
Computational Go

Introduction

If queues are the arteries of the stream system, then computational nodes are the lungs and beating heart.

Golang computational nodes compile to fast binaries that can execute an array of different operations on your streaming data.

Creating the Node

Computational nodes are created through Stream files.

When a computational node named <node name> is created, the following actions are performed:

  1. A directory named lowercase <node name> is created in the <playpen>/src/nodes directory.
  2. A main program file called <node name>.go is created in the <playpen>/src/nodes/<node name> directory.
  3. A user-modifiable program file called <node name>U.go is created in the same directory. If this file already exists, it is not overwritten unless the force option is used when creating the node.
  4. The node is registered to the mid-tier as being associated with the current playpen.

Customizations of the computational node should be made only in the *U.go file. Within this file, there are three main sections, or blocks: init, worker, and term. There are also two additional sections: initThreads and termThreads.

  • The init block is run once per thread before the worker block, and is where initializations should be placed.
  • The worker block is run for every observation in the input data, and follows a SAS DATA step-like process.
  • The term block is run once per thread at the end of processing.
  • The initThreads function is run once before any threads are started.
  • The termThreads function is run once after all the threads complete.

💡 Tip: For more details on the structure of *U.go files, see the *U.go File Structure section.

No additional code is required in any of these functions. It is recommended that an IDE be used for editing and debugging this code.
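As a rough mental model, the block order can be sketched in plain Go. The names below are illustrative only, not the generated API:

```go
package main

import "fmt"

// Toy model of a single-threaded node lifecycle: init runs once,
// worker runs once per observation, term runs once at the end.
type node struct{ sum float64 }

func (n *node) init()              { n.sum = 0 }    // per-thread setup
func (n *node) worker(obs float64) { n.sum += obs } // one observation at a time
func (n *node) term() float64      { return n.sum } // post aggregates

func run(observations []float64) float64 {
	n := &node{}
	n.init()
	for _, obs := range observations {
		n.worker(obs)
	}
	return n.term()
}

func main() {
	fmt.Println(run([]float64{1, 2, 3})) // prints 6
}
```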

Details on the init function

Typical operations performed in the init function are:

  • Initialization of user added fields in the User structure
  • Reading from static files
  • Processing in-memory queues
  • Sending or receiving dynamic facts

The User structure instance is unique to each thread, so fields added to this structure are thread-safe. The init function will be called once for each thread requested for the node. When the init function is called, the output queues are open and available for posting data.

Details on the worker function

Typical operations performed in the worker function are:

  • Processing of data from an input queue, one observation at a time
  • Executing a model
  • Posting new data to one or more output queues
  • Rewinding a queue that is using a group key

Details on the term function

Typical operations performed in the term function are:

  • Posting aggregate values
  • Sending dynamic facts

When the term function is called, the output queues are still open and available for posting data and all the input queues are empty (the data has been consumed).

Details on the initThreads function

The initThreads function can be used when the node is multi-threaded. When initThreads is called, in-memory queues have been created, but no threads have been started and the output queues are not yet available for writing. Since the User structure is thread-specific (one per thread), any global data created in initThreads should be stored in u.hh.User, which is of type any; u.hh is shared between threads. In Golang, reading from a structure is thread-safe, so no synchronization is required for reading. Writing to a structure shared across threads does require synchronization, as does looping over a map that may be written concurrently. The initThreads function is called only once, so operations in this function don't need to be synchronized with other threads.
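The synchronization rule above can be illustrated with plain Go: concurrent writes to a shared map need a mutex, while reads after all writing is finished do not. This is a stand-alone sketch, not SDK code:

```go
package main

import (
	"fmt"
	"sync"
)

// Build a shared map from several goroutines, guarding writes with a
// mutex. Once building is complete, the map is safe for concurrent
// reads without any locking.
func buildShared(n int) map[string]float64 {
	var mu sync.Mutex
	rates := map[string]float64{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			mu.Lock() // concurrent map writes must be synchronized
			rates[fmt.Sprintf("k%d", i)] = float64(i)
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return rates // safe for concurrent reads once writing is done
}

func main() {
	fmt.Println(len(buildShared(4))) // prints 4
}
```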

Details on the termThreads function

The termThreads function can be used when the node is multi-threaded. This function is called when all the term functions of the threads have been called. It could be used to close objects created in the initThreads function or write final results to an output queue.

Testing Computational Nodes

To make it easy to test and develop computational nodes:

  1. Create the queue definition for the inputs and outputs using the vor create queue command.
  2. Create a sample set of JSON data using the vor generate <table> --json command.

This will allow the node to be run independently of the other nodes in a process. In a playpen, navigate to the src directory. Run this node with the following steps:

export GOPATH=$VOR_STREAM_PLAY
export PATH=$PATH:<playpen>/bin
go install ./nodes/usernode
usernode -t

By default, the node looks in the <playpen>/test directory for the input file. If -input is not specified, the node expects to find data with the file name <queue name>.csv in that directory.

Additional built-in flags to all computational nodes:

  • -input <path to test file> – Specify an input file other than the default. The input file should be in the same JSON format as the generated test file. Do not forget the termination record (provided automatically by vor generate). By default, the input is read from <node name>.json in the <playpen>/test/<node-name>/ directory. The lookup tries the provided file path first, then the <playpen>/test directory, appending the ".json" suffix if it is not specified.
  • -output <queue name>,<path to output file> … – Specify the output files. A node can produce multiple output queues, so the specification is a pair of the queue name followed by the path to the output file for that queue.
  • -options <configuration file> – Specify overrides to the configuration file in the input directory of the playpen.
  • -t – Run the node in test mode.

Merging input queues

Typically, a computational node takes in a single input queue and creates one or more output queues. Sometimes it is desirable to read in multiple queues and process them generically. The idea is that the input queues have common fields that can be used to derive the output. Specify a set of queues to be merged by listing them, space separated, as the first entry in the input parentheses.

For example, the following specification:

node mergedin(queue1 queue2 queue3)(out)

creates a node that has three merged queues as input: queue1, queue2, and queue3. The generated template for mergedin, by default, will read from the three queues and output the matching fields to out. The order of the observations in the output is indeterminate. The code in the worker function does not have to be thread-safe unless more than one thread is requested for the node.

The generated template for mergedin has three differences from a node without merged input:

  • The input variable for the worker function will have the any type rather than the type for an input queue.
  • An example switch statement will be provided that could be used to determine which queue an observation came from.
  • The fields in the input structure are not accessible until the input is asserted to the correct type.

The following is an example switch statement:

func sample(input any) {
    switch in := input.(type) {

    case *Types.Types:
        _ = in // in is now type *Types.Types

    case *Typesin.Typesin:
        _ = in // in is now type *Typesin.Typesin

    case *Fanout.Fanout:
        _ = in // in is now type *Fanout.Fanout

    }
}

Inside each case statement, the variable in automatically has the correct concrete type.

Here is an example use case: The worker function is going to multiply the input field Num by 0.1 and assign it to the output field X. If a single input queue, Types, were used, this would simply be:

u.Typesout.X = input.Num * 0.1

If instead, three queues were used (Types, Typesin, and Fanout), this operation would look like the following:

func sample(input any) {
    switch in := input.(type) {

    case *Types.Types:
        u.Typesout.X = in.Num * 0.1 // in is now type *Types.Types

    case *Typesin.Typesin:
        u.Typesout.X = in.Num * 0.1 // in is now type *Typesin.Typesin

    case *Fanout.Fanout:
        u.Typesout.X = in.Num * 0.1 // in is now type *Fanout.Fanout

    }
}

This code assumes that all three queues have a field named Num.

Multiple Input Queues

Computational nodes can accept multiple input queues. These queues are read separately and are not merged together. The first queue (or queues, in the merged case) is treated as the main input queue. The main input queue is what the node processes one observation at a time. All remaining input queues are read, in parallel, into arrays. These arrays are available before the init function is called.

Example

node multiIn(merge1 merge2, addQueue1, addQueue2)(typesout)

In this example, the node multiIn will get one main queue that is the merger of merge1 and merge2. Additionally, two arrays will be created, named addqueue1 and addqueue2. Each array will consist of structures with fields and types matching the queue's fields and types.

Important Notes About Multiple Input Queues

  • The additional queues will be contained entirely in memory. The largest queue should be the main queue.
  • A single copy of the additional queue is shared between the threads.
  • The arrays containing the additional queues are thread-safe for reads, allowing multiple threads to read these arrays simultaneously.
  • The where= clause is ignored for the additional queues.
  • GroupKey settings on additional queues are ignored.

Node Statistics

Every node, by default, will count the total number of observations it processes from an input queue. This is used by the UI to display progress and for general information for monitoring the running process. For computational nodes, additional statistics can be created and output. Here is the format of the ComputeStat() function:

u.ComputeStat("stat", "<Label>", value)

Where "stat" is one of the following:

  1. Count – Tracks the number of times this statistic function is called.
  2. Mean – Computes the mean of the provided values.
  3. Min – Determines the minimum of the values provided.
  4. Max – Determines the maximum of the values provided.
  5. TopN – Determines the top 10 of the values provided.
  6. BottomN – Determines the 10 smallest values provided.

The <Label> is a user-provided string that is displayed in the UI and run log. Value is the variable or number for which the statistic is computed. For all statistics besides Count, a value must be provided, reflecting a specific field of the input or a computed value. Up to 50 statistics may be specified per node. All statistics are computed as an aggregate across all threads.

Some examples:

u.ComputeStat("mean", "value", input.Value)
u.ComputeStat("min", "value", input.Value)
u.ComputeStat("max", "LoanAmt", input.LoanAmt)
u.ComputeStat("topN", "otherValue", input.Other)
u.ComputeStat("bottomN", "newValue", input.New)
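For intuition, the kinds of aggregates these statistics maintain can be sketched with a small stand-alone accumulator. This is illustrative only, not the ComputeStat implementation:

```go
package main

import (
	"fmt"
	"math"
)

// Toy accumulator mirroring the Count/Mean/Min/Max statistics.
type stat struct {
	n             int
	sum, min, max float64
}

func (s *stat) add(v float64) {
	if s.n == 0 {
		s.min, s.max = v, v // first value seeds min and max
	}
	s.n++
	s.sum += v
	s.min = math.Min(s.min, v)
	s.max = math.Max(s.max, v)
}

func (s *stat) mean() float64 { return s.sum / float64(s.n) }

func main() {
	var s stat
	for _, v := range []float64{2, 4, 6} {
		s.add(v)
	}
	fmt.Println(s.n, s.mean(), s.min, s.max) // prints 3 4 2 6
}
```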

Using Matrix Maps

Matrix maps are used for data not ideal for processing one observation at a time. For more details on the structure and use of matrix maps, please see the Data Sources - Matrix Maps section.

To access this data in a computational node, add the following code to the init function of the user code:

u.mat = u.GetMatrixMap("exchangerates")

and add this declaration to the user structure:

mat map[string]map[string]float64

If this node is multi-threaded, the matrix map will only be read once for all threads and there will be only one copy of this data in memory.

To access this matrix map in the worker() function use:

usd_eur_fx := u.mat["USD"]["EUR"]

Standard Golang syntax for looping over this matrix is:

for row, rowValue := range u.mat {
    for col, value := range rowValue {
        log.Println(row, col, value)
    }
}

Using Scenarios

As with matrix maps, scenario data is not ideal for a single-observation pass of the data. For more details on the structure and use of scenarios, please see the Data Sources - Scenarios section.

As with matrix maps, scenarios are loaded only once and shared across threads. The scenarios need to be in the input/scenario directory or uploaded through the UI. To load scenario data in a node, put the scenario name in the joboptions.json file ("scenario": "<scenario name>") and use the u.GetScenarios() function in the *U.go file. This function takes no arguments, as it gets the name of the scenario to load from the joboptions.json file.

The u.GetScenarios() function returns two values: a slice of scenario structures (scenarios) and an error value:

var scenarios []*sdk.VORScen // sdk is the Stream SDK package
var err error
scenarios, err = u.GetScenarios()


The first dimension of scenarios is the scenario. The second dimension is the dates. The third is a map of values for that scenario and date, keyed by name.

for _, scen := range scenarios {
    scenarioName := scen.Name
    for _, d := range scen.Dates {
        log.Println(scenarioName, d.Date)
        for name, value := range d.Scen {
            log.Println(name, value)
        }
    }
}

If you don't have equal numbers of horizons in each scenario, it is the user's responsibility to check for values at each horizon.

Using Simulations

Simulations are like scenarios but are usually much more numerous, and each simulation draw is unnamed. For more details on the structure and uses of simulations, please see the Data Sources - Simulations section.

Like matrix maps, simulations are loaded only once and shared across threads. To load simulation data in a node, use the u.GetSims() function. This function takes no arguments, as it gets the name of the simulation to load from the run options.

The u.GetSims() function returns two components:

var varMap map[string]int
var sims [][][]any
varMap, sims = u.GetSims()

Sims, in this case, is a multi-dimensional array. The first dimension is the draw number, starting at 0. The second dimension is the horizon index, starting at 0. The third dimension is the variable, indexed through varMap. The following code loops over a set of simulations and prints their values:

for simNum, simValue := range sims {
    for horizonIndex, values := range simValue {
        log.Println(simNum, horizonIndex, values[varMap["date"]].(date.Date),
            values[varMap["num1"]].(float64))
    }
}

Setting and Waiting on Signals

Signals are used to synchronize activities between nodes. If a node, A, needs to wait for another node, B, to finish some task before doing its own task, A can wait on a signal from B. Node A would use u.WaitSignals("<signal_name1>", "<signal_name2>", ...) to wait on a signal from B. You must use the getsig= option on node A.

A user node can send signals of its own to other nodes. These signals do not have to be declared by the producing node or be consumed by any node. Care must be taken in choosing a signal name, as it should not collide with any existing node names or signal names. A signal can be sent using the u.SendSignal() function. Usage:

u.SendSignal("godot")

When executed in test mode (running a node individually with the -t flag), u.WaitSignals() returns without waiting. When the node runs as part of a process, u.WaitSignals() blocks the current thread until all the signals in the list are received.
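The blocking behavior can be modeled in plain Go with a channel that the producer closes once; any number of waiters block on a receive until the close. This sketches the semantics of SendSignal/WaitSignals only, not their actual implementation:

```go
package main

import "fmt"

// Toy model of a signal: a channel closed exactly once by the producer.
// waitSignal blocks until the signal arrives, like u.WaitSignals.
func waitSignal(sig <-chan struct{}) bool {
	<-sig // blocks until the channel is closed
	return true
}

func main() {
	godot := make(chan struct{})
	done := make(chan bool)
	go func() { done <- waitSignal(godot) }() // node A waits
	close(godot)                              // node B "sends" the signal
	fmt.Println(<-done)                       // prints true once received
}
```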

Setting and Getting Dynamic Facts

Signals are a simple way to communicate between nodes. To pass more information between nodes, the concept of dynamic facts can be used. One node can Set a fact and other nodes can then Get that fact. Facts themselves can be any Golang type, including maps and arrays. There are some basic types that are supported:

  • map[string]float64
  • []float64
  • []map[string]any
  • map[string]map[string]any
  • map[string][]map[string]any
  • float64
  • int
  • int64
  • string

All other types will be returned as a map[string]any or just any. Structures can be used but are discouraged, as the Get function returns a map instead of a structure; use the u.DynFactGobSet function if preserving the structure is important. Facts that require large amounts of memory should be avoided, as facts are sent back and forth via the mid-tier.

A fact is associated with a name. The name is case-insensitive.

To set a fact use the u.DynFactSet() function:

fact := []string {"john","paul","ringo","george"}
u.DynFactSet("Beatles", fact)

In this example, a single fact named "Beatles" is set. This fact is an array of strings. Setting a dynamic fact can be done at any time. Use the setdyn=/setfact= options on a node in the strm file to inform the system of the source of the facts. The receiving node automatically waits on the corresponding signals on the user's behalf.

fact := map[string]any{
      "dyn_fact1":    500,
      "dyn_software": "StreamR is great",
      "dyn_duo":      []float64{5.3, 6.2, 125.6},
}
u.DynFactSet(fact)

In this example, three facts were set at once: dyn_fact1, a numeric; dyn_software, a string; and dyn_duo, a double-precision array. These could also be set separately using the first form of the DynFactSet() function.

The u.DynFactSet() function sends a signal that the dynamic fact is ready to be consumed. The node that is consuming the dynamic fact must declare the list of dynamic facts it is expecting. This is done through the getdyn=/getfact= option on a node in the strm file. If a node Vladimir is expecting to use the above facts it would be declared as:

node Vladimir (in)(out) getdyn=beatles,dyn_fact1,dyn_software,dyn_duo

The u.DynFactGet() function is used to retrieve a fact. It returns nil if it can't find a requested fact, i.e., the fact was not listed in the getdyn=/getfact= option. To get a fact using the u.DynFactGet() function:

f1 := u.DynFactGet("dyn_fact1")
if f1 != nil {
    total := 100 + f1.(int)
}

or

var f1 int
u.DynFactGet("dyn_fact1", &f1)

total := 100 + f1

The u.DynFactGet() call will automatically wait for a signal that the fact is ready before retrieving it from the mid-tier. Like parameter maps, all threads share one copy of the dynamic fact. The second form of u.DynFactGet() will error and terminate the job if the type of dyn_fact1 doesn't match the second argument.

Dependencies can be created between nodes with dynamic facts that cannot be completed. This will cause the running process to hang. An example would be a dynamic fact that is generated at the end of a process, but nodes at the beginning depend on it. The node at the beginning will be waiting for the fact to be sent and the node at the end of the process may be waiting on those nodes to complete before it sends the fact.

Dynamic fact requests in test mode will always return nil.

Exchanging information with Golang Nodes

If facts are being sent only to other Golang nodes, you can use an alternate routine, u.DynFactGobSet(), that preserves the Golang structure of the fact. These facts are encoded in Golang's GOB format. Like any encoding in Golang, this routine needs access to the fields in the structure, so the field names must be capitalized (exported). Here is an example:

type complexStruct struct {
    Num     float64
    IntT    int
    Char    string
    Boolean bool
}
// Add values to the struct
cS := complexStruct{
    Num:     0.5,
    IntT:    -10,
    Char:    "This is a string",
    Boolean: true,
}
u.DynFactGobSet("gob", cS)

To retrieve a fact stored as a GOB use the u.DynFactGobGet() routine. The following example gets a structure from a dynamic fact:

type complexStruct struct {
    Num     float64
    IntT    int
    Char    string
    Boolean bool
}
// Add values to the struct
cS := complexStruct{}
u.DynFactGobGet("gob", &cS)
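The capitalization rule can be demonstrated with the standard library's encoding/gob package, which defines the GOB format: a round trip preserves the struct, but unexported (lowercase) fields are silently dropped. This is a stand-alone illustration, not the SDK routines:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

type complexStruct struct {
	Num    float64
	Char   string
	hidden int // unexported: silently dropped by gob
}

// roundTrip encodes a struct to GOB and decodes it back.
func roundTrip(in complexStruct) complexStruct {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		log.Fatal(err)
	}
	var out complexStruct
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		log.Fatal(err)
	}
	return out
}

func main() {
	got := roundTrip(complexStruct{Num: 0.5, Char: "s", hidden: 7})
	fmt.Println(got.Num, got.Char, got.hidden) // prints 0.5 s 0
}
```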

Date Functions

DateAdd and DateDiff are two built-in date functions for Golang computational nodes. These functions are in addition to the functions available from the date package. DateAdd and DateDiff are identical to the SQL node functions of the same name.

u.DateAdd(interval, n intervals, date) – Add n intervals to a date type and return a date.Date.

  • n can be negative.
  • Allowed intervals – year, quarter, month, week, day, weekday.
  • The function supports an optional argument for date alignment:
    • "begin", aligns to the beginning of the interval;
    • "end", aligns to the end of the interval;
    • "same", aligns to the exact amount specified by n.

Note: the weekday and week functions are based on weeks that start on Sunday and end on Saturday, where Monday-Friday are weekdays.

import "github.com/rickb777/date"
d := date.Parse("02Jan2006", "31DEC2019")    // 12/31/2019 -> d
newdate := u.DateAdd(`quarter`, 3, d, `end`) // 09/30/2020 -> newdate

u.DateDiff(interval, date, date) – Computes the number of intervals between two dates. The number of intervals can be negative. The type of the return is a float64.

  • Allowed intervals - year, quarter, month, week, day, weekday.

Note: the weekday and week functions are based on weeks that start on Sunday and end on Saturday, where Monday-Friday are weekdays.

d1 := date.Parse("02Jan2006", "28Jun2021") // 6/28/2021 -> d1
d2 := date.Today()
diff := u.DateDiff("weekday", d2, d1)

Evaluating Run Time Expressions in Golang

It is often desirable to evaluate expressions that are not known until run time. These could be expressions made dynamically or provided by the user at run time.

For Golang computational nodes you can use the following function to evaluate an expression stored in a string:

u.EvalExpr( variables, expr) -> any // either a bool, float64, or a string

where variables is a structure containing the input variables to the expression, expr. See Evaluating Run Time Expressions in SQL for more details on the expressions and functions supported.

The following example evaluates an expression using variables from the input queue:

 express := "ln(d1) + 1"
 ans := u.EvalExpr( input, express).(float64) // assert that it returns a double

The following example evaluates an expression using variables from the input queue and some added variables.

type combo struct {
   Coef1     float64
   Coef2     float64
   Intercept float64
   Input     *Testsql.Testsql
}
vars := combo{
   Coef1:     7.,
   Coef2:     3.,
   Intercept: 1.,
   Input:     input,
}
express := "coef1 * x**2  + coef2 * x + intercept"
ans := u.EvalExpr(vars, express).(float64) // assert that it returns a double

Note the case of the declared structure variables. Golang treats lowercase identifiers as unexported, so if the variables had been declared lowercase in the structure, the EvalExpr() function would not be able to see their values.
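This visibility rule can be demonstrated directly with the standard reflect package, which expression evaluators typically rely on. A stand-alone illustration (visibleFields is a hypothetical helper, not part of the SDK):

```go
package main

import (
	"fmt"
	"reflect"
)

type combo struct {
	Coef1 float64 // exported: its value can be extracted
	coef2 float64 // unexported: its value cannot be extracted
}

// visibleFields lists the struct fields whose values reflection can read.
func visibleFields(v any) []string {
	t := reflect.TypeOf(v)
	val := reflect.ValueOf(v)
	var out []string
	for i := 0; i < t.NumField(); i++ {
		if val.Field(i).CanInterface() { // false for unexported fields
			out = append(out, t.Field(i).Name)
		}
	}
	return out
}

func main() {
	fmt.Println(visibleFields(combo{Coef1: 7, coef2: 3})) // prints [Coef1]
}
```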

Sending Email from Go nodes

See the SDK documentation on sending email.

Reading Excel (.xlsx) files in Golang

See the SDK documentation on reading Excel files.

Reading CSV files in Golang

Streaming is for big data, that is, data with more than 1k observations. For smaller data sets there is the matrix map capability, but matrix maps hold only numeric fields and have only two dimensions. Generalized CSV file input helps to bridge this gap.

CSV files can be in the input directory, the look through directory, or be uploaded and added to a run. CSV files in the playpen and look through directories are considered fallbacks for CSV files that are uploaded.

CSV files are requested using the u.ReadCSV() function. CSV files are first searched for in the upload list for the run and then the local directory. CSV files can be requested by type, which implies that the CSV file will only be found in the upload list.

Syntax for Golang is:

columns := map[string]string{"name": "type", "name": "type", ...}
res, warn, err := u.ReadCSV("name | type", columns [, delimiter] [, keyName1 [, keyName2, ...]])

The delimiter defaults to ",". The other delimiters available are: " ", ";", "|", and "\t". The optional keyNames are a list of fields in the CSV file that are used to create a nested map of the data. The limit on the number of keys is three. The warn and err return values are similar to the values returned by u.GetExcel().

The column description is used to describe the types and field names that are expected in the file. For example:

columns := map[string]string{
        "date": "date",
        "x":    "num",
        "*":    "char",
}

specifies that the CSV file should contain the variables date and x, with types date and number. Any other fields found in the CSV file will have the type char. Without the "*" field name, only the fields date and x would be returned. The returned warning string will contain warnings about missing fields and conversion errors; it is up to the user to report or ignore these.
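A sketch of how such a column spec could be applied, using only the standard library. This is illustrative of the described behavior, not the ReadCSV implementation; applySpec is a hypothetical helper:

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strconv"
	"strings"
)

// applySpec parses CSV text and applies a column spec like
// {"x": "num", "*": "char"}: listed fields get their declared type,
// "*" gives every other field the char (string) type, and without "*"
// unlisted fields are dropped.
func applySpec(text string, columns map[string]string) ([]map[string]any, error) {
	recs, err := csv.NewReader(strings.NewReader(text)).ReadAll()
	if err != nil {
		return nil, err
	}
	header := recs[0]
	_, wildcard := columns["*"]
	var out []map[string]any
	for _, rec := range recs[1:] {
		row := map[string]any{}
		for i, name := range header {
			typ, listed := columns[name]
			if !listed {
				if !wildcard {
					continue // without "*", unlisted fields are dropped
				}
				typ = columns["*"]
			}
			switch typ {
			case "num":
				v, _ := strconv.ParseFloat(rec[i], 64)
				row[name] = v
			default: // "char" (and the wildcard) fall back to strings
				row[name] = rec[i]
			}
		}
		out = append(out, row)
	}
	return out, nil
}

func main() {
	rows, _ := applySpec("x,note\n1.5,hello\n", map[string]string{"x": "num", "*": "char"})
	fmt.Println(rows[0]["x"], rows[0]["note"]) // prints 1.5 hello
}
```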

The following demonstrates the types returned for different numbers of keys:

columns := map[string]string { "date":"date", "class1":"char", "class2":"char", "value1":"num"}
var res any
var warn string
var err error
res, warn, err = u.ReadCSV("fileName.csv", columns)
// ([]map[string] any)
noKeys := res.(csv0Keys)

// One key map[key] map[fieldName] -> value
res, warn, err = u.ReadCSV("fileName.csv", columns, "class2")
// map[any] map[string] any
oneKey := res.(csv1Keys)

// Two keys
res, warn, err = u.ReadCSV("fileName.csv", columns, "class2", "date")
// map[any]map[any]map[string]any
twoKeys := res.(csv2Keys)

// Three keys
res, warn, err = u.ReadCSV("fileName.csv", columns, "class2", "class1", "date")
// map[any]map[any]map[any]map[string]any
threeKeys := res.(csv3Keys)

// extract a value from each form
class1 := "Raleigh"
class2 := "North Carolina"
dt, _ := frgutil.ParseDate("07jun1997")

log.Println("5th observation in the CSV file", noKeys[4] )
log.Println("The observation keyed to class2=\"North Carolina\"", oneKey[class2] )
log.Println("The observation keyed to class2=\"North Carolina\" and date=\"07jun1997\"", twoKeys[class2][dt] )
log.Println("The observation keyed to class2=\"North Carolina\", class1=\"Raleigh\", and date=\"07jun1997\"", threeKeys[class2][class1][dt] )
log.Println(err,warn)

Note the 4 helper types used to declare the returned values: csv0Keys, csv1Keys, csv2Keys, and csv3Keys.

Database Interactions

For most use cases, it is recommended to use the database input and output nodes to interact with databases. With options for custom SQL queries and post-filtering using Golang templates, these nodes provide a powerful and flexible way to work with databases in your processes. The database input and output nodes are designed to handle common database operations such as reading from and writing to databases without the need for extensive custom code. Refer to the Database Input and Output section for detailed instructions on setting up and using these nodes.

If your use case requires an implementation beyond what the database input and output nodes provide, you can establish a direct connection to the database within your computational node. Ideally, these actions are performed in the init or term block of the computational node. To get a connection to the installed database, use:

db, err := u.hh.ConnectPSQL( [vault path] )
if err != nil {
    log.Errorln("cannot connect to the database:", err)
    frgutil.EndJob(u.hh)
}
defer db.Close()

or

db, err := u.hh.ConnectMSSQL( [vault path] )
if err != nil {
    log.Errorln("cannot connect to the database:", err)
    frgutil.EndJob(u.hh)
}
defer db.Close()

Where <vault path> is an optional path to a key-value store containing the database connection details. If none is provided, the default vault path in the global or playpen configuration will be used. This approach allows for flexibility in connecting to various databases. For more information on setting up the vault path, refer to Setting up Database Connections.

Once connected, you can execute queries or insert statements as needed. Refer to the Golang SQL documentation for details on how to work with SQL databases in Golang.

Controlling Information Displayed in the Log

Import the logging package with import log "github.com/sirupsen/logrus" to use the log functions.

There are six levels of logging available, listed in increasing order of output:

  1. fatal - only fatal messages are printed. This corresponds to log.Fatal() instances in the Golang code.
  2. error - only fatal and error messages are printed. Error messages correspond to log.Error() instances in the Golang code and logging.error() instances in Python.
  3. warn - Additionally warning messages are printed. Warning messages correspond to log.Warning() instances in the Golang code and logging.warning() instances in Python.
  4. info - Additionally informational messages are printed. Informational messages correspond to log.Info() instances in the Golang code and logging.info() instances in Python.
  5. debug - Additionally debug messages are printed. Debug messages correspond to log.Debug() instances in the Golang code and logging.debug() instances in Python.
  6. trace - Additionally tracing messages are printed. Tracing messages correspond to log.Trace() instances in the Golang code.

The user controls what is displayed by setting the LOG_LEVEL environment variable to one of the listed values. For example,

export LOG_LEVEL=debug

The default LOG_LEVEL is info.