Computational Go
Introduction¶
If queues are the arteries of the stream system, then computational nodes are the lungs and beating heart.
Golang computational nodes result in fast, compiled binaries that can execute an array of different operations on your streaming data.
Creating the Node¶
Computational nodes are created through Stream files.
When a computational node named <node name> is created, the following actions are performed:
- A directory is created named lowercase <node name> in the <playpen>/src/nodes directory.
- A main program file called <node name>.go is created in the <playpen>/src/nodes/<node name> directory.
- A user-modifiable program file called <node name>U.go is created in the <playpen>/src/nodes/<node name> directory. If this file already exists, it will not be overwritten unless the force option is used in the creation of the node.
- The node is registered to the mid-tier as being associated with the current playpen.
Customizations of the computational node should be made only in the *U.go file.
Within this file, there are three main sections, or blocks: init, worker, and term. There are also two additional sections: initThreads and termThreads.
- The init block is run once per thread, before the worker block, and is where initializations should be placed.
- The worker block is run for every observation in the input data, and follows a SAS DATA step-like process.
- The term block is run once per thread, at the end of the *U.go file.
- The initThreads function is run once, before any threads are started.
- The termThreads function is run when all the threads complete.
Tip: For more details on the structure of *U.go files, see the *U.go File Structure section.
No additional code is required in any of these functions. It is recommended that an IDE be used for editing and debugging this code.
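This lifecycle can be sketched in plain Go. The channel fan-out and function names below are illustrative only, not the SDK API: each thread initializes its own state, processes observations one at a time, and posts an aggregate at the end.

```go
package main

import (
	"fmt"
	"sync"
)

// processThread mimics one node thread: per-thread init, a worker step
// per observation, and a term step that posts a per-thread aggregate.
func processThread(obs <-chan int, results chan<- int) {
	sum := 0 // init: per-thread state
	for o := range obs {
		sum += o // worker: one observation at a time
	}
	results <- sum // term: post the per-thread aggregate
}

// runNode mimics the whole node: initThreads starts the threads once,
// and the wait after the input closes plays the role of termThreads.
func runNode(observations []int, nThreads int) int {
	obs := make(chan int)
	results := make(chan int, nThreads)

	var wg sync.WaitGroup
	for i := 0; i < nThreads; i++ { // initThreads: start all threads once
		wg.Add(1)
		go func() {
			defer wg.Done()
			processThread(obs, results)
		}()
	}
	for _, o := range observations {
		obs <- o
	}
	close(obs)
	wg.Wait() // termThreads: runs when all threads complete
	close(results)

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(runNode([]int{1, 2, 3, 4}, 2)) // prints 10
}
```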
Details on the init function¶
Typical operations performed in the init function are:
- Initialization of user-added fields in the User structure
- Reading from static files
- Processing in-memory queues
- Sending or receiving dynamic facts
The User structure instance is unique to each thread so fields added to this
structure are thread safe. The init function will be called once for
each thread requested for the node. When the init function is called,
the output queues are open and
available for posting data.
Details on the worker function¶
Typical operations performed in the worker function are:
- Processing of data from an input queue, one observation at a time
- Executing a model
- Posting new data to one or more output queues
- Rewinding a queue that is using a group key
Details on the term function¶
Typical operations performed in the term function are:
- Posting aggregate values
- Sending dynamic facts
When the term function is called,
the output queues are still open and
available for posting data and all the input queues
are empty (the data has been consumed).
Details on the initThreads function¶
The initThreads function can be used when the node
is multi-threaded. When initThreads is called,
in-memory queues have been created but no threads
have been started and the output queues are not
available for writing. Since the User structure is thread
specific (one per thread), any global data created
in initThreads should be stored in u.hh.User
which is of type any. The u.hh is shared between
threads. In Golang, reading from a structure is
threadsafe so no synchronizations are required for
reading. Writing to a structure shared across threads
does require synchronization as does looping over
a map. The initThreads function is called only
once, so operations in this function don't need
to be synchronized with other threads.
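The need for that synchronization can be shown in plain Go (the struct and names below are illustrative, not the SDK API): a map created once and shared across threads must be guarded by a mutex when worker threads write to it, because concurrent unsynchronized map writes panic at run time.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedCounts mimics global data created once in initThreads and stored
// where every thread can reach it. All writes go through a mutex.
type sharedCounts struct {
	mu     sync.Mutex
	counts map[string]int
}

func (s *sharedCounts) add(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counts[key]++
}

// countConcurrently runs nThreads writers that each add perThread entries.
func countConcurrently(nThreads, perThread int) int {
	shared := &sharedCounts{counts: map[string]int{}} // created once, as in initThreads
	var wg sync.WaitGroup
	for t := 0; t < nThreads; t++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perThread; i++ {
				shared.add("obs") // synchronized write from a worker thread
			}
		}()
	}
	wg.Wait()
	return shared.counts["obs"]
}

func main() {
	fmt.Println(countConcurrently(4, 1000)) // prints 4000
}
```

Without the mutex in `add`, the Go runtime would detect the concurrent map writes and crash the program.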
Details on the termThreads function¶
The termThreads function can be used when the node
is multi-threaded. This function is called when all
the term functions of the threads have been called.
It could be used to close objects created in the
initThreads function or write final results to
an output queue.
Testing Computational Nodes¶
To make it easy to test and develop computational nodes:
- Create the queue definition for the inputs and outputs using the vor create queue command.
- Create a sample set of JSON data using the vor generate <table> --json command.
This will allow the node to be run independently of the other nodes in a process. In a playpen, navigate to the src directory. Run this node with the following steps:
export GOPATH=$VOR_STREAM_PLAY
export PATH=$PATH:<playpen>/bin
go install ./nodes/usernode
usernode -t
By default, the node looks in the playpen/test directory for the input file. If -input is not specified, the node will expect to find data with file name <queue name>.csv in the directory playpen/test.
Additional built-in flags to all computational nodes:
| Option | Argument | Description |
|---|---|---|
| -input | path to test file | Specify an input file other than the default. The input file should be in the same JSON format as the generated test file. Do not forget the termination record (provided automatically with vor generate). By default, the input will be obtained from <node name>.json in the <playpen>/test/<node name>/ directory. The input file is resolved by look-through: first the provided file path is tried, then the <playpen>/test directory, and the ".json" suffix is appended if it isn't specified |
| -output | <queue name>,<path to output file> … | Specify the output files. A node can produce multiple output queues, so the specification is a pair of the queue name followed by the path to the output file for that queue. |
| -options | Configuration file | Specify overrides to the configuration file in the input directory of the playpen. |
| -t | n/a | Run node in test mode |
Merging input queues¶
Typically, a computational node takes in a single input queue and creates one or more output queues. Sometimes it is desired to read in multiple queues and process them generically. The idea is that the input queues have common fields that can be used to derive the output. Specify a set of queues that are to be merged by specifying them as a space separated list as the first entry in the input parentheses.
For example, the following specification:
node mergedin(queue1 queue2 queue3)(out)
creates a node that has three merged queues as input, queue1,
queue2, and queue3. The generated template for mergedin
, by default, will read from the three queues and output the
matching fields to out. The order of the observations in the
output is indeterminate. The code in the worker function
does not have to be thread-safe unless more than one thread
is requested for the node.
The generated template for mergedin
has two differences from a node without merged input:
- The input variable for the worker function will have the any type rather than the type for an input queue.
- An example switch statement will be provided that can be used to determine which queue an observation came from.
- The fields in the input structure are not accessible until the correct input structure is declared.
The following is an example switch statement:
func sample(input any) {
switch in := input.(type) {
case *Types.Types:
_ = in // in is now type *Types.Types
case *Typesin.Typesin:
_ = in // in is now type *Typesin.Typesin
case *Fanout.Fanout:
_ = in // in is now type *Fanout.Fanout
}
}
Inside each case statement, the variable in
is automatically cast to a structure of the
correct type.
Here is an example use case:
The worker function is going to multiply the input field Num times 0.1
and assign it to the output field X. If a single input queue Types was
used, this would simply be:
u.Typesout.X = input.Num * 0.1
If instead, three queues were used (Types, Typesin, and Fanout),
this operation would look like the following:
func sample(input any) {
switch in := input.(type) {
case *Types.Types:
u.Typesout.X = in.Num * 0.1 // in is now type *Types.Types
case *Typesin.Typesin:
u.Typesout.X = in.Num * 0.1 // in is now type *Typesin.Typesin
case *Fanout.Fanout:
u.Typesout.X = in.Num * 0.1 // in is now type *Fanout.Fanout
}
}
This code assumes that all three queues have a field named Num.
Multiple Input Queues¶
Computational nodes can accept multiple input queues.
These queues are read separately and are not merged together.
The first queue (or queues, in the merged case) is treated as
the main input queue. The main input queue is what the node
processes one observation at a time. All remaining input
queues are read, in parallel, into arrays. These arrays are
available before the _init() function is called.
Example¶
node multiIn(merge1 merge2, addQueue1, addQueue2)(typesout)
In this example, the node multiIn will get one main queue that is the merger of
merge1 and merge2. Additionally, two arrays will be created
named addqueue1 and addqueue2. Each array will consist
of structures with fields and types matching the
corresponding queue's fields and types.
Important Notes About Multiple Input Queues¶
- The additional queues will be contained entirely in memory. The largest queue should be the main queue.
- A single copy of the additional queue is shared between the threads.
- The arrays containing the additional queues are thread-safe for reads, allowing multiple threads to read these arrays simultaneously.
- The where= clause is ignored for the additional queues.
- GroupKey settings on additional queues are ignored.
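The read-only sharing described in these notes can be illustrated in plain Go (the struct and field names are invented for the example): once an additional queue is loaded into a slice and never written again, any number of threads may scan it simultaneously without locks.

```go
package main

import (
	"fmt"
	"sync"
)

// obsRow mimics one record of an additional input queue held in memory.
type obsRow struct {
	Key   string
	Value float64
}

// sumFor scans the shared slice for rows matching key. It is safe to call
// from many goroutines because the slice is never written after loading.
func sumFor(rows []obsRow, key string) float64 {
	total := 0.0
	for _, r := range rows {
		if r.Key == key {
			total += r.Value
		}
	}
	return total
}

func main() {
	rows := []obsRow{{"a", 1}, {"b", 2}, {"a", 3}} // loaded once, before any thread starts

	var wg sync.WaitGroup
	results := make([]float64, 4)
	for t := 0; t < 4; t++ { // four threads read the single shared copy
		wg.Add(1)
		go func(t int) {
			defer wg.Done()
			results[t] = sumFor(rows, "a") // lock-free concurrent read
		}(t)
	}
	wg.Wait()
	fmt.Println(results) // prints [4 4 4 4]
}
```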
Node Statistics¶
Every node, by default, will count the total number of observations it processes
from an input queue. This is used by the UI to display progress and for general
information for monitoring the running process. For computational nodes,
additional statistics can be created and output. Here is the format of the
ComputeStat() function:
u.ComputeStat("stat", "<Label>", value)
Where “stat” pertains to one of the following:
- Count – Tracks the number of times this statistic function is called.
- Mean – Computes the mean of the provided values.
- Min – Determines the minimum of the values provided.
- Max – Determines the maximum of the values provided.
- TopN – Determines the top 10 of the values provided.
- BottomN – Determines the 10 smallest values provided.
The <Label> is a user-provided string that is displayed in the UI and run log. Value is the variable or number for which the statistic is computed. For all statistics besides Count, a value must be provided, reflecting the value of a specific input field or a computed value. Up to 50 statistics may be specified per node. All statistics are computed as an aggregate across all threads.
Some examples:
u.ComputeStat( "mean", "value", input.Value)
u.ComputeStat( "min", "value", input.Value)
u.ComputeStat( "max", "LoanAmt", input.LoanAmt)
u.ComputeStat( "topN", "otherValue", input.Other)
u.ComputeStat( "bottomN", "newValue", input.New)
Using Matrix Maps¶
Matrix maps are used for data not ideal for processing one observation at a time. For more details on the structure and use of matrix maps, please see the Data Sources - Matrix Maps section.
To access this data in a computational node, add the
following code to the init function of the user code:
u.mat = u.GetMatrixMap("exchangerates")
and add this declaration to the user structure:
mat map[string]map[string]float64
If this node is multi-threaded, the matrix map will only be read once for all threads and there will be only one copy of this data in memory.
To access this matrix map in the worker() function use:
usd_eur_fx := u.mat["USD"]["EUR"]
Standard Golang syntax for looping over this matrix is:
for row, rowValue := range u.mat {
for col, value := range rowValue {
log.Println(row, col, value )
}
}
Using Scenarios¶
As with matrix maps, scenario data is not ideal for a single-observation pass of the data. For more details on the structure and use of scenarios, please see the Data Sources - Scenarios section.
As with matrix maps, scenarios are loaded only once and shared across
threads. The scenarios need to be in the input/scenario directory
or uploaded through the UI. To load scenario
data in a node, put the scenario name in the joboptions.json file,
"scenario": "<scenario name>", and use the u.GetScenarios() function in the *U.go file.
This function takes no arguments, as it gets the name of the scenario to load
from the joboptions.json file.
The u.GetScenarios() function returns two values: a three-dimensional
structure (scenarios) and an error value:
import sdk
var scenarios []*sdk.VORScen
var err error
scenarios, err = u.GetScenarios()
The first dimension of scenarios is the scenario. The second dimension
is the dates. The third is a map of the values for that scenario and date,
keyed by name.
for _, scen := range scenarios {
scenarioName := scen.Name
for _, d := range scen.Dates {
log.Println(d.Date)
for name, value := range d.Scen {
log.Println(name, value)
}
}
}
If the scenarios do not have equal numbers of horizons, it is the user's responsibility to check for values at each horizon.
Using Simulations¶
Simulations are like scenarios but are usually much more numerous, and each simulation draw is unnamed. For more details on the structure and uses of simulations, please see the Data Sources - Simulations section.
Like matrix maps, simulations are loaded only once and shared across threads. To
load simulation data in a node, use the u.GetSims() function. This function
takes no arguments, as it gets the name of the simulation to load from the run
options.
The u.GetSims() function returns two components:
var varMap map[string]int
var sims [][][]any
varMap, sims = u.GetSims()
Sims, in this case, is a multi-dimensional array. The first dimension is the draw number, starting at 0. The second dimension is the horizon index, starting at 0. The third dimension holds the values; use varMap to look up a variable's index by name. The following code loops over a set of simulations and prints their values:
for simNum, simValue := range sims {
for horizonIndex, values := range simValue {
log.Println(horizonIndex, values[varMap["date"]].(date.Date),
    values[varMap["num1"]].(float64))
}
}
Setting and Waiting on Signals¶
Signals are used to synchronize activities between nodes. If a node, A, needs
to wait for another node, B, to finish some task before doing its own
task, A can wait on a signal from B. Node A would use
u.WaitSignals("<signal_name1>", "<signal_name2>" ...) to wait on a signal
from B. The getsig= option must be specified on node A.
A user node can send signals of its own to other nodes. These signals do not
have to be declared by the producing node or be consumed by any node. Care must
be taken in choosing a signal name, as it should not correspond with any existing
node names or signal names. A signal can be sent
using the u.SendSignal() function. Usage:
u.SendSignal("godot")
When executed in Test Mode (running a node individually with the -t flag),
u.WaitSignals() will return without waiting. When the node runs as part of a
process, u.WaitSignals() will block the current thread execution until all the
signals in the list are received.
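The wait/send semantics can be sketched with a plain Go channel. This is an analogy, not the SDK implementation: closing a channel is a one-shot broadcast, and a receive blocks until that broadcast arrives, much as u.WaitSignals() blocks until every listed signal is received.

```go
package main

import "fmt"

// waitForSignal mimics two nodes: B performs a task and sends a signal;
// A blocks on the signal before starting its own task.
func waitForSignal() string {
	signal := make(chan struct{})
	done := make(chan string)

	go func() { // node B: finish some task, then signal
		close(signal) // closing the channel broadcasts to every waiter
	}()

	go func() { // node A: wait on B's signal before proceeding
		<-signal
		done <- "A ran after B signalled"
	}()

	return <-done
}

func main() {
	fmt.Println(waitForSignal()) // prints "A ran after B signalled"
}
```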
Setting and Getting Dynamic Facts¶
Signals are a simple way to communicate between nodes. To pass more information between nodes, the concept of dynamic facts can be used. One node can Set a fact and other nodes can then Get that fact. Facts themselves can be any Golang type, including maps and arrays. There are some basic types that are supported:
- map[string]float64
- []float64
- []map[string]any
- map[string]map[string]any
- map[string][]map[string]any
- float64
- int
- int64
- string
All other types will be returned as a map[string]any or just any.
Structures can be used but are discouraged, as a map is returned from the Get
function instead of a structure. Use the u.DynFactGobSet function if
preservation of the structure is important. The use of facts that require large
amounts of memory should be avoided, as the facts are sent back and forth via
the mid-tier.
A fact is associated with a name. The name is case-insensitive.
To set a fact use the u.DynFactSet() function:
fact := []string {"john","paul","ringo","george"}
u.DynFactSet("Beatles", fact)
In this example, a single fact named "Beatles" is set. This fact is an array of strings. Setting a dynamic fact can be done at any time. Use the setdyn=/setfact= options on a node in the strm file to inform the system of the source of the facts. The receiving node waits automatically, using wait signals on the user's behalf.
fact := map[string]any {
"dyn_fact1": 500,
"dyn_software": "StreamR is great",
"dyn_duo": []float64{5.3, 6.2, 125.6},}
u.DynFactSet(fact)
In this example, three facts were set at once. One fact was named dyn_fact1 and it was numeric, and another was named dyn_software and it was a string. The final fact is a double precision array. These could be set separately using the first form of the DynFactSet() function.
The u.DynFactSet() function sends a signal that the dynamic fact is ready to be
consumed. The node that is consuming the dynamic fact must declare the list of
dynamic facts it is expecting. This is done through the getdyn=/getfact=
option on a node in the strm file. If a node Vladimir is expecting to
use the above facts it would be declared as:
node Vladimir (in)(out) getdyn=beatles,dyn_fact1,dyn_software,dyn_duo
The u.DynFactGet() function is used to retrieve a fact. u.DynFactGet() will
return nil if it can't find a requested fact (i.e., the fact was not listed
in the getdyn/getfact statement). To get a fact using the
u.DynFactGet() function:
f1 := u.DynFactGet("dyn_fact1")
if f1 != nil {
total := 100 + f1.(int)
}
or
var f1 int
u.DynFactGet("dyn_fact1", &f1)
total := 100 + f1
The u.DynFactGet() will automatically wait for a signal that the fact is ready
before retrieving it from the mid-tier. Like parameter maps, each thread
shares one copy of the dynamic fact. The second form of the call to
u.DynFactGet() will error and terminate the job if the type of dyn_fact1
doesn't match the type of the second argument.
Dependencies can be created between nodes with dynamic facts that cannot be completed. This will cause the running process to hang. An example would be a dynamic fact that is generated at the end of a process, but nodes at the beginning depend on it. The node at the beginning will be waiting for the fact to be sent and the node at the end of the process may be waiting on those nodes to complete before it sends the fact.
Dynamic fact requests in test mode will always return nil.
Exchanging information with Golang Nodes¶
If facts are being sent only to other Golang nodes, you can use an alternate
routine, u.DynFactGobSet(), that preserves the Golang structure of the fact.
These facts are encoded in the GOB format of Golang.
Like any encoding in Golang, this routine needs access to the fields in
the structure, so the field names must be capitalized (exported). Here is an example:
type complexStruct struct {
Num float64
IntT int
Char string
Boolean bool
}
// Add values to the struct
cS := complexStruct{
Num: 0.5,
IntT: -10,
Char: "This is a string",
Boolean: true,
}
u.DynFactGobSet("gob", cS)
To retrieve a fact stored as a GOB use the u.DynFactGobGet() routine.
The following example gets a structure from a dynamic fact:
type complexStruct struct {
Num float64
IntT int
Char string
Boolean bool
}
// Add values to the struct
cS := complexStruct{}
u.DynFactGobGet("gob", &cS)
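The exported-field requirement can be verified with the standard encoding/gob package alone, with no SDK involved. In this sketch, an unexported field is silently dropped in the round trip, which is exactly why the fact structure's fields must be capitalized.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

// complexStruct mirrors the fact structure above, plus one unexported
// field added to show that gob silently drops it.
type complexStruct struct {
	Num     float64
	IntT    int
	Char    string
	Boolean bool
	hidden  string // unexported: ignored by gob
}

// roundTrip encodes the struct to GOB and decodes it back.
func roundTrip(in complexStruct) (complexStruct, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return complexStruct{}, err
	}
	var out complexStruct
	err := gob.NewDecoder(&buf).Decode(&out)
	return out, err
}

func main() {
	cS := complexStruct{Num: 0.5, IntT: -10, Char: "This is a string", Boolean: true, hidden: "lost"}
	got, err := roundTrip(cS)
	if err != nil {
		log.Fatal(err)
	}
	// Exported fields survive; the unexported field comes back empty.
	fmt.Println(got.Num, got.hidden == "") // prints 0.5 true
}
```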
Date Functions¶
DateAdd and DateDiff are two built-in date functions for Golang
computational nodes. These functions are in addition to the functions available
from the date package. DateAdd
and DateDiff are identical to the SQL node functions of the same name.
u.DateAdd(interval, n intervals, date) – Add n intervals to a date type and return a date.Date.
- n can be negative.
- Allowed intervals – year, quarter, month, week, day, weekday.
- The function supports an optional argument for date alignment:
- "begin", aligns to the beginning of the interval;
- "end", aligns to the end of the interval;
- "same", aligns to the exact amount specified by n.
Note: the weekday and week functions are based on weeks that start on Sunday and end on Saturday, where Monday-Friday are weekdays.
import "github.com/rickb777/date"
d := date.Parse("02Jan2006", "31DEC2019") // 12/31/2019 -> d
newdate := u.DateAdd(`quarter`, 3, d, `end`) // 09/30/2020 -> newdate
u.DateDiff(interval, date, date) – Computes the number of intervals between two dates. The number of intervals can be negative. The type of the return is a float64.
- Allowed intervals - year, quarter, month, week, day, weekday.
Note: the weekday and week functions are based on weeks that start on Sunday and end on Saturday, where Monday-Friday are weekdays.
d1 := date.Parse("02Jan2006", "28Jun2021") // 6/28/2021 -> d1
d2 := date.Today()
diff := u.DateDiff("weekday", d2, d1)
Evaluating Run Time Expressions in Golang¶
It is often desirable to evaluate expressions that are not known until run time. These could be expressions made dynamically or provided by the user at run time.
For Golang computational nodes you can use the following function to evaluate an expression stored in a string:
u.EvalExpr( variables, expr) -> any // either a bool, float64, or a string
where variables is a structure containing the input variables to the expression, expr. See Evaluating Run Time Expressions in SQL for more details on the expressions and functions supported.
The following example evaluates an expression using variables from the input queue:
express := "ln(d1) + 1"
ans := u.EvalExpr( input, express).(float64) // assert that it returns a double
The following example evaluates expression using variables from the input queue and some added variables.
type combo struct {
Coef1 float64
Coef2 float64
Intercept float64
Input *Testsql.Testsql
}
vars := combo{Coef1: 7.,
Coef2: 3.,
Intercept: 1.,
Input: input}
express := "coef1 * x**2 + coef2 * x + intercept"
ans, ok := u.EvalExpr(vars, express).(float64) // comma-ok form: ok reports whether the result is a float64
Note the case of the declared structure variables. Golang considers lowercase variables
as private, so if the variables had been declared lowercase in the structure
the EvalExpr() function would not be able to see the values.
Sending Email from Go nodes¶
See the SDK documentation on sending email.
Reading Excel (.xlsx ) files in Golang¶
See the SDK documentation on reading Excel files.
Reading CSV files in Golang¶
Streaming is for big data; that is, data with more than 1k observations. For smaller data sets we have the matrix map capability, but matrix maps are only for numeric fields and only have two dimensions. Generalized CSV file input helps to bridge this gap.
CSV files can be in the input directory, the look through directory, or be uploaded and added to a run. CSV files in the playpen and look through directories are considered fallbacks for CSV files that are uploaded.
CSV files are requested using the u.ReadCSV() function.
CSV files are first searched for in the upload list for the run
and then the local directory.
CSV files can be requested by type, which implies that the CSV file
will only be found in the upload list.
Syntax for Golang is:
columns := map[string]string { "name":"type", "name":"type", ...}
res, warn, err := u.ReadCSV("name | type", columns [, delimiter] [, keyName1 [, keyName2, ...]])
The delimiter defaults to ",". The other delimiters available are: " ", ";", "|", and "\t". The optional keyNames are a list of fields in the CSV file that are used to create a nested map of the data. The limit on the number of keys is three (3). The warn and err return values are similar to the values returned by u.GetExcel().
The column description is used to describe the types and field names that are expected in the file. For example:
columns := map[string]string { "date":"date",
                               "x":"num",
                               "*":"char"}
specifies that the CSV file should contain the variables date and x, with types date and number. Any other fields found in the CSV file will have the type char. Without the "*" field name, only the fields date and x would be returned. The returned warning string will contain warnings about missing fields and conversion errors. It is up to the user to report or ignore these.
The following demonstrates the types returned for different numbers of keys:
columns := map[string]string { "date":"date", "class1":"char", "class2":"char", "value1":"num"}
var res any
var warn string
var err error
// No keys
res, warn, err = u.ReadCSV("fileName.csv", columns)
noKeys := res.(csv0Keys) // []map[string]any

// One key: map[key]map[fieldName] -> value
res, warn, err = u.ReadCSV("fileName.csv", columns, "class2")
oneKey := res.(csv1Keys) // map[any]map[string]any

// Two keys
res, warn, err = u.ReadCSV("fileName.csv", columns, "class2", "date")
twoKeys := res.(csv2Keys) // map[any]map[any]map[string]any

// Three keys
res, warn, err = u.ReadCSV("fileName.csv", columns, "class2", "class1", "date")
threeKeys := res.(csv3Keys) // map[any]map[any]map[any]map[string]any
// extract a value from each form
class1 := "Raleigh"
class2 := "North Carolina"
dt, _ := frgutil.ParseDate("07jun1997")
log.Println("5th observation in the CSV file", noKeys[4] )
log.Println("The observation keyed to class2=\"North Carolina\"", oneKey[class2] )
log.Println("The observation keyed to class2=\"North Carolina\" and date=\"07jun1997\"", twoKeys[class2][dt] )
log.Println("The observation keyed to class2=\"North Carolina\", class1=\"Raleigh\", and date=\"07jun1997\"", threeKeys[class2][class1][dt] )
log.Println(err,warn)
Note the 4 helper types used to declare the returned values: csv0Keys, csv1Keys, csv2Keys, and csv3Keys.
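The keyed forms can be approximated with the standard encoding/csv package. The sketch below is not the SDK implementation; it assumes that numeric-looking fields become float64 and everything else stays char, and it builds the one-key form: a map from the key column's value to that row's field map.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"log"
	"strconv"
	"strings"
)

// readKeyed parses a small CSV and keys each row's field map by one column,
// approximating the one-key form of ReadCSV.
func readKeyed(data, key string) map[string]map[string]any {
	rows, err := csv.NewReader(strings.NewReader(data)).ReadAll()
	if err != nil {
		log.Fatal(err)
	}
	header := rows[0]
	out := map[string]map[string]any{}
	for _, rec := range rows[1:] {
		fields := map[string]any{}
		for i, name := range header {
			if v, err := strconv.ParseFloat(rec[i], 64); err == nil {
				fields[name] = v // numeric field
			} else {
				fields[name] = rec[i] // character field
			}
		}
		out[fields[key].(string)] = fields
	}
	return out
}

func main() {
	data := "class2,value1\nNorth Carolina,12.5\nVirginia,7.25\n"
	oneKey := readKeyed(data, "class2")
	fmt.Println(oneKey["North Carolina"]["value1"]) // prints 12.5
}
```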
Database Interactions¶
For most use cases, it is recommended to use the database input and output nodes to interact with databases. With options for custom SQL queries and post-filtering via Golang templates, these nodes provide a powerful and flexible way to work with databases in your processes. The database input and output nodes are designed to handle common database operations such as reading from and writing to databases without the need for extensive custom code. Refer to the Database Input and Output section for detailed instructions on setting up and using these nodes.
If your use case requires an implementation beyond what the database input and output nodes provide, you can establish a direct connection to the database within your computational node. Ideally, these actions are performed in the init or term block of the computational node. To get a connection to the installed database, use:
db, err := u.hh.ConnectPSQL( [vault path] )
if err != nil {
log.Errorln("cannot connect to the database:", err)
frgutil.EndJob(u.hh)
}
defer db.Close()
or
db, err := u.hh.ConnectMSSQL( [vault path] )
if err != nil {
log.Errorln("cannot connect to the database:", err)
frgutil.EndJob(u.hh)
}
defer db.Close()
Where <vault path> is an optional path to a key-value store containing the database connection details. If none is provided, the default vault path in the global or playpen configuration will be used. This approach allows for flexibility in connecting to various databases. For more information on setting up the vault path, refer to Setting up Database Connections.
Once connected, you can execute queries or insert statements as needed. Refer to the Golang SQL documentation for details on how to work with SQL databases in Golang.
Controlling Information Displayed in the Log¶
Use import log "github.com/sirupsen/logrus" to access the log functions.
There are six levels of logging available, listed in increasing amount of output:
- fatal - only fatal messages are printed. These correspond to log.Fatal() instances in the Golang code.
- error - fatal and error messages are printed. Error messages correspond to log.Error() instances in the Golang code and logging.error() instances in Python.
- warn - additionally, warning messages are printed. Warning messages correspond to log.Warning() instances in the Golang code and logging.warning() instances in Python.
- info - additionally, informational messages are printed. Informational messages correspond to log.Info() instances in the Golang code and logging.info() instances in Python.
- debug - additionally, debug messages are printed. Debug messages correspond to log.Debug() instances in the Golang code and logging.debug() instances in Python.
- trace - additionally, tracing messages are printed. Tracing messages correspond to log.Trace() instances in the Golang code.
The user controls what is displayed by using the LOG_LEVEL environmental variable and setting it to one of the listed values. For example,
export LOG_LEVEL=debug
The default LOG_LEVEL is info.
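The level gating can be sketched in a few lines of plain Go. logrus implements this for you; the ranking below is only an illustration of the ordering, with info as the fallback when LOG_LEVEL is unset or unrecognized.

```go
package main

import (
	"fmt"
	"os"
)

// Numeric rank for each level, in increasing amount of output.
var levels = map[string]int{"fatal": 0, "error": 1, "warn": 2, "info": 3, "debug": 4, "trace": 5}

// shouldLog reports whether a message at msgLevel is printed when the
// configured level is cfgLevel.
func shouldLog(cfgLevel, msgLevel string) bool {
	return levels[msgLevel] <= levels[cfgLevel]
}

func main() {
	cfg := os.Getenv("LOG_LEVEL")
	if _, ok := levels[cfg]; !ok {
		cfg = "info" // the default LOG_LEVEL is info
	}
	fmt.Println(shouldLog(cfg, "error")) // error messages print at the default level
	fmt.Println(shouldLog(cfg, "trace")) // trace messages are suppressed at the default level
}
```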