Computational Go

Introduction

If queues are the arteries of the stream system, then computational nodes are the lungs and beating heart.

Golang computational nodes result in fast, compiled binaries that can execute an array of different operations on your streaming data.

Creating the Node

The method of creating computational nodes is through Stream files.

When a computational node named <node name> is created, the following actions are performed:

  1. A directory is created named lowercase <node name> in the <playpen>/src/nodes directory.
  2. A main program file called <node name>.go is created in the <playpen>/src/nodes/<node name> directory.
  3. A user-modifiable program file called <node name>U.go is created in the <playpen>/src/nodes/<node name> directory. If this file already exists, it is not overwritten unless the force option is used when creating the node.
  4. The node is registered to the mid-tier as being associated with the current playpen.

Customizations of the computational node should be made only in the *U.go file. Within this file, there are three main sections, or blocks: init, worker, and term.

  • The init block is run once per thread before the worker block, and is where initializations should be placed.
  • The worker block is run for every observation in the input data, and follows a SAS DATA step-like process.
  • The term block is run at the end of the *U.go file, once per thread.
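The life cycle of the three blocks can be illustrated with a small stand-alone program. This is only a sketch under stated assumptions: the userState type, the initBlock, workerBlock, and termBlock methods, and the run function are hypothetical stand-ins, not the generated *U.go code. Each simulated thread runs its init block once, its worker block once per observation, and its term block once at the end.

```go
package main

import (
	"fmt"
	"sync"
)

// userState is a hypothetical stand-in for the per-thread user structure.
type userState struct {
	rows int     // observations seen by this thread
	sum  float64 // running total for this thread
}

func (u *userState) initBlock()            { u.rows, u.sum = 0, 0 }
func (u *userState) workerBlock(v float64) { u.rows++; u.sum += v }
func (u *userState) termBlock() (int, float64) {
	return u.rows, u.sum
}

// run distributes the observations over nThreads goroutines and
// combines the per-thread results produced by the term block.
func run(obs []float64, nThreads int) (int, float64) {
	in := make(chan float64)
	var mu sync.Mutex
	var wg sync.WaitGroup
	totalRows, totalSum := 0, 0.0

	for t := 0; t < nThreads; t++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			u := &userState{}
			u.initBlock() // once per thread, before any observation
			for v := range in {
				u.workerBlock(v) // once per observation
			}
			r, s := u.termBlock() // once per thread, after the last observation
			mu.Lock()
			totalRows += r
			totalSum += s
			mu.Unlock()
		}()
	}
	for _, v := range obs {
		in <- v
	}
	close(in)
	wg.Wait()
	return totalRows, totalSum
}

func main() {
	rows, sum := run([]float64{1, 2, 3, 4}, 2)
	fmt.Println(rows, sum)
}
```

State kept in the per-thread structure needs no locking in this sketch; only the final combination step is guarded by a mutex.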

💡 Tip: For more details on the structure of *U.go files, see the *U.go File Structure section.

To test and debug the customizations, run the main program. It is recommended that an IDE be used for editing and debugging this code.

Testing Computational Nodes

To make it easy to test and develop computational nodes:

  1. Create the queue definition for the inputs and outputs using the vor create queue command.
  2. Create a sample set of JSON data using the vor generate <table> --json command.

This will allow the node to be run independently of the other nodes in a process. In a playpen, navigate to the src directory. Run this node with the following steps:

export GOPATH=$VOR_STREAM_PLAY
export PATH=$PATH:<playpen>/bin
go install ./nodes/usernode
usernode -t

By default, the node looks in the playpen/test directory for the input file. If -input is not specified, the node will expect to find data with file name <queue name>.csv in the directory playpen/test.

Additional built-in flags to all computational nodes:

Option | Argument | Description
--- | --- | ---
-input | path to test file | Specify an input file other than the default. The input file should be in the same JSON format as the generated test file. Do not forget the termination record (provided automatically with vor generate). By default, the input is read from <node name>.json in the <playpen>/test/<node-name>/ directory. The node looks for the file first at the provided path, then in the <playpen>/test directory, and appends the ".json" suffix if it is not specified.
-output | <queue name>,<path to output file> … | Specify the output files. A node can produce multiple output queues, so each specification is a pair: the queue name followed by the path to the output file for that queue.
-options | configuration file | Specify overrides to the configuration file in the input directory of the playpen.
-t | n/a | Run the node in test mode.

Merging input queues

Typically, a computational node takes in a single input queue and creates one or more output queues. Sometimes it is desirable to read in multiple queues and process them generically. The idea is that the input queues have common fields that can be used to derive the output. To merge a set of queues, specify them as a space-separated list as the first entry in the input parentheses.

For example, the following specification:

node mergedin(queue1 queue2 queue3)(out)

creates a node that has three merged queues as input: queue1, queue2, and queue3. The generated template for mergedin, by default, reads from the three queues and outputs the matching fields to out. The order of the observations in the output is indeterminate. The code in the worker function does not have to be thread-safe unless more than one thread is requested for the node.

The generated template for mergedin differs from that of a node without merged input in three ways:

  • The input variable for the worker function has the any type rather than the type for an input queue.
  • An example switch statement is provided that can be used to determine which queue an observation came from.
  • The fields in the input structure are not accessible until the correct input structure type is determined.

The following is an example switch statement:

func sample(input any) {
    switch in := input.(type) {

    case *Types.Types:
        _ = in // in is now type *Types.Types

    case *Typesin.Typesin:
        _ = in // in is now type *Typesin.Typesin

    case *Fanout.Fanout:
        _ = in // in is now type *Fanout.Fanout

    }
}

Inside each case statement, the variable in is automatically cast to a structure of the correct type.

Here is an example use case: the worker function multiplies the input field Num by 0.1 and assigns the result to the output field X. If a single input queue Types were used, this would simply be:

u.Typesout.X = input.Num * 0.1

If instead, three queues were used (Types, Typesin, and Fanout), this operation would look like the following:

func sample(input any) {
    switch in := input.(type) {

    case *Types.Types:
        u.Typesout.X = in.Num * 0.1 // in is now type *Types.Types

    case *Typesin.Typesin:
        u.Typesout.X = in.Num * 0.1 // in is now type *Typesin.Typesin

    case *Fanout.Fanout:
        u.Typesout.X = in.Num * 0.1 // in is now type *Fanout.Fanout

    }
}

This code assumes that all three queues had the field named Num in them.
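When every case performs the same computation, the type switch can be isolated in a small helper so the arithmetic is written once. The following is a sketch only: the three structures are simplified, hypothetical stand-ins for the generated queue types, and numOf is an illustrative helper name that is not part of the generated template.

```go
package main

import "fmt"

// Hypothetical stand-ins for the generated queue structures.
type Types struct{ Num float64 }
type Typesin struct{ Num float64 }
type Fanout struct{ Num float64 }

// numOf returns the Num field of a recognized input structure.
// The second return value is false for any other type.
func numOf(input any) (float64, bool) {
	switch in := input.(type) {
	case *Types:
		return in.Num, true
	case *Typesin:
		return in.Num, true
	case *Fanout:
		return in.Num, true
	default:
		return 0, false
	}
}

func main() {
	observations := []any{&Types{Num: 10}, &Fanout{Num: 20}, "unexpected"}
	for _, obs := range observations {
		if num, ok := numOf(obs); ok {
			fmt.Println(num * 0.1) // the shared computation, written once
		} else {
			fmt.Println("unrecognized input type")
		}
	}
}
```

The default case makes an unexpected input type visible instead of silently skipping the observation.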

Multiple Input Queues

Computational nodes can accept multiple input queues. These queues are read separately and are not merged together. The first queue, or queues in the merged case, are treated as the main input queue. The main input queue is what the node processes one observation at a time. All remaining input queues are read, in parallel, into arrays. These arrays are available before the _init() function is called.

Example

node multiIn(merge1 merge2, addQueue1, addQueue2)(typesout)

In this example, the node multiIn gets one main queue that is the merger of merge1 and merge2. Additionally, two arrays are created, named addqueue1 and addqueue2. Each array consists of structures with fields and types matching the queue's fields and types.

Important Notes About Multiple Input Queues

  • The additional queues will be contained entirely in memory. The largest queue should be the main queue.
  • A single copy of the additional queue is shared between the threads.
  • The arrays containing the additional queues are thread-safe for reads, allowing multiple threads to read these arrays simultaneously.
  • The where= clause is ignored for the additional queues.
  • GroupKey settings on additional queues are ignored.
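Because the additional queues are available as in-memory arrays, a common pattern is to build a lookup table from them once and then only read it while processing the main queue. The sketch below assumes a hypothetical Rate structure and an indexByCurrency helper; neither name comes from the generated code.

```go
package main

import "fmt"

// Rate is a hypothetical stand-in for the structure generated
// for an additional input queue.
type Rate struct {
	Currency string
	Value    float64
}

// indexByCurrency builds a lookup table from the in-memory array.
// After it is built, the map is only read, which is safe from
// several goroutines at once as long as nothing writes to it.
func indexByCurrency(rates []Rate) map[string]float64 {
	idx := make(map[string]float64, len(rates))
	for _, r := range rates {
		idx[r.Currency] = r.Value
	}
	return idx
}

func main() {
	addqueue1 := []Rate{{"EUR", 0.9}, {"GBP", 0.8}}
	idx := indexByCurrency(addqueue1)
	if v, ok := idx["EUR"]; ok {
		fmt.Println("EUR rate:", v)
	}
}
```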

Node Statistics

Every node, by default, will count the total number of observations it processes from an input queue. This is used by the UI to display progress and for general information for monitoring the running process. For computational nodes, additional statistics can be created and output. Here is the format of the ComputeStat() function:

u.ComputeStat("stat", "<Label>", value)

Where “stat” is one of the following:

  1. Count – Tracks the number of times this statistic function is called.
  2. Mean – Computes the mean of the provided values.
  3. Min – Determines the minimum of the values provided.
  4. Max – Determines the maximum of the values provided.
  5. TopN – Determines the top 10 of the values provided.
  6. BottomN – Determines the 10 smallest values provided.

The <Label> is a user-provided string that is displayed in the UI and run log. Value is the variable or number for which the statistic is computed. For all statistics besides count, a value must be provided to reflect the value of the specific field of the input, or a computed value. Index is a positive sequential integer indicating which statistic is being updated. Up to 50 statistics may be specified per node. All statistics are computed as an aggregate of all threads.

Some examples:

u.ComputeStat("mean", "value", input.Value)
u.ComputeStat("min", "value", input.Value)
u.ComputeStat("max", "LoanAmt", input.LoanAmt)
u.ComputeStat("topN", "otherValue", input.Other)
u.ComputeStat("bottomN", "newValue", input.New)
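To make the idea of aggregating statistics across threads concrete, the following stand-alone sketch keeps a partial count, sum, minimum, and maximum per thread and merges them at the end. It illustrates the general technique only; the partial type and the merge function are hypothetical and say nothing about how ComputeStat() is implemented internally.

```go
package main

import (
	"fmt"
	"math"
)

// partial holds the statistics accumulated by one thread.
type partial struct {
	count    int
	sum      float64
	min, max float64
}

// newPartial starts with neutral elements for min and max.
func newPartial() partial {
	return partial{min: math.Inf(1), max: math.Inf(-1)}
}

// add folds one value into the partial result.
func (p *partial) add(v float64) {
	p.count++
	p.sum += v
	p.min = math.Min(p.min, v)
	p.max = math.Max(p.max, v)
}

// mean returns NaN when no values were added.
func (p partial) mean() float64 {
	if p.count == 0 {
		return math.NaN()
	}
	return p.sum / float64(p.count)
}

// merge combines the per-thread partial results into one aggregate.
func merge(parts ...partial) partial {
	out := newPartial()
	for _, p := range parts {
		out.count += p.count
		out.sum += p.sum
		out.min = math.Min(out.min, p.min)
		out.max = math.Max(out.max, p.max)
	}
	return out
}

func main() {
	a, b := newPartial(), newPartial()
	a.add(1)
	a.add(2)
	b.add(3)
	b.add(6)
	m := merge(a, b)
	fmt.Println(m.count, m.mean(), m.min, m.max)
}
```

Carrying the sum and the count, rather than a per-thread mean, is what lets the merged mean come out right when threads process different numbers of observations.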

Using Matrix Maps

Matrix maps are used for data not ideal for processing one observation at a time. For more details on the structure and use of matrix maps, please see the Data Sources - Matrix Maps section.

To access this data in a computational node, add the following code to the init function of the user code:

u.mat = u.GetMatrixMap("exchangerates")

and add this declaration to the user structure:

mat map[string]map[string]float64

If this node is multi-threaded, the matrix map will only be read once for all threads and there will be only one copy of this data in memory.

To access this matrix map in the worker() function use:

usd_eur_fx := u.mat["USD"]["EUR"]

Standard Golang syntax for looping over this matrix is:

for row, rowValue := range u.mat {
    for col, value := range rowValue {
        log.Println(row, col, value )
    }
}
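Two details of Go maps matter when working with a nested map of this shape: a missing key silently yields 0, and the iteration order of range is not defined. The sketch below shows a lookup that reports whether the entry exists and a way to iterate rows in sorted order. The lookup and sortedRows helpers and the sample rates are illustrative assumptions, not part of the product API.

```go
package main

import (
	"fmt"
	"sort"
)

// lookup returns the value at [row][col] and whether it exists.
// Indexing a missing row yields a nil inner map, and reading from
// a nil map is safe, so ok is simply false in that case.
func lookup(mat map[string]map[string]float64, row, col string) (float64, bool) {
	v, ok := mat[row][col]
	return v, ok
}

// sortedRows returns the row keys in ascending order so that
// iteration is reproducible from run to run.
func sortedRows(mat map[string]map[string]float64) []string {
	rows := make([]string, 0, len(mat))
	for row := range mat {
		rows = append(rows, row)
	}
	sort.Strings(rows)
	return rows
}

func main() {
	// Sample data for illustration only.
	mat := map[string]map[string]float64{
		"USD": {"EUR": 0.92},
		"EUR": {"USD": 1.09},
	}
	if fx, ok := lookup(mat, "USD", "EUR"); ok {
		fmt.Println("USD->EUR:", fx)
	}
	if _, ok := lookup(mat, "USD", "JPY"); !ok {
		fmt.Println("USD->JPY is not in the matrix map")
	}
	for _, row := range sortedRows(mat) {
		fmt.Println(row, mat[row])
	}
}
```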

Using Scenarios

As with matrix maps, scenario data is not ideal for a single-observation pass of data. For more details on the structure and use of scenarios, please see the Data Sources - Scenarios section.

As with matrix maps, scenarios are loaded only once and shared across threads. The scenarios need to be in the input/scenario directory or uploaded through the UI. To load scenario data in a node, put the scenario name in the joboptions.json file as "scenario": "<scenario name>", and use the u.GetScenarios() function in the *U.go file. This function takes no arguments because it gets the name of the scenario to load from the joboptions.json file.

The u.GetScenarios() function returns two values: a three-dimensional structure and an error value:

import sdk
var scenarios    []*sdk.VORScen
var err error
scenarios, err = u.GetScenarios()

The first dimension of scenarios is the scenario. The second dimension is the dates. The third is a map of values for the scenario and date. In this case, it is a map of the scenario names.

for _, scen := range scenarios {
    log.Println("scenario:", scen.Name)
    for _, d := range scen.Dates {
        log.Println(d.Date)
        for name, value := range d.Scen {
            log.Println(name, value)
        }
    }
}

If you don't have equal numbers of horizons in each scenario, it is the user's responsibility to check for values at each horizon.
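One way to perform that check is to test the horizon index and the variable name before using a value. The sketch below uses simplified, hypothetical stand-ins for the scenario structures: the field names Name, Dates, Date, and Scen follow the loop above, but the concrete types (a string date and float64 values) are assumptions, as is the valueAt helper.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for the scenario structures.
type scenDate struct {
	Date string             // assumed representation of the date
	Scen map[string]float64 // assumed value type
}

type scenario struct {
	Name  string
	Dates []scenDate
}

// valueAt returns the named value at a horizon index, and false when
// the scenario has no such horizon or no such variable.
func valueAt(s *scenario, horizon int, name string) (float64, bool) {
	if horizon < 0 || horizon >= len(s.Dates) {
		return 0, false
	}
	v, ok := s.Dates[horizon].Scen[name]
	return v, ok
}

func main() {
	base := &scenario{
		Name: "base",
		Dates: []scenDate{
			{Date: "2024-03-31", Scen: map[string]float64{"gdp": 2.1}},
		},
	}
	for h := 0; h < 2; h++ {
		if v, ok := valueAt(base, h, "gdp"); ok {
			fmt.Println(base.Name, h, v)
		} else {
			fmt.Println(base.Name, h, "no value at this horizon")
		}
	}
}
```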

Using Simulations

Simulations are like scenarios but are usually much more numerous, and each simulation draw is unnamed. For more details on the structure and uses of simulations, please see the Data Sources - Simulations section.

Like matrix maps, simulations are loaded only once and shared across threads. To load simulation data in a node, use the u.GetSims() function. This function takes no arguments because it gets the name of the simulation to load from the run options.

The u.GetSims() function returns two components:

var varMap map[string]int
var sims [][][]interface{}
varMap, sims = u.GetSims()

Sims, in this case, is a multi-dimensional array. The first dimension is the draw number starting at 0. The second dimension is the horizon index starting at zero. The following code loops over a set of simulations and prints their values:

for simNum, simValue := range sims {
    for horizonIndex, values := range simValue {
        log.Println(simNum, horizonIndex,
            values[varMap["date"]].(date.Date),
            values[varMap["num1"]].(float64))
    }
}
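A direct type assertion such as .(float64) panics if the stored value has a different type. The following sketch averages one variable across all draws at a given horizon and uses the two-value form of the assertion to skip anything unexpected. It is an illustration with a hypothetical meanAt helper and made-up data; only the shapes of varMap and sims are taken from the declarations above.

```go
package main

import "fmt"

// meanAt averages the variable called name over all draws at the given
// horizon index. Draws that lack the horizon, or whose value is not a
// float64, are skipped. The second result is false if nothing was averaged.
func meanAt(varMap map[string]int, sims [][][]interface{}, name string, horizon int) (float64, bool) {
	col, ok := varMap[name]
	if !ok || horizon < 0 {
		return 0, false
	}
	sum, n := 0.0, 0
	for _, draw := range sims {
		if horizon >= len(draw) || col >= len(draw[horizon]) {
			continue
		}
		if v, ok := draw[horizon][col].(float64); ok {
			sum += v
			n++
		}
	}
	if n == 0 {
		return 0, false
	}
	return sum / float64(n), true
}

func main() {
	varMap := map[string]int{"num1": 0}
	sims := [][][]interface{}{
		{{1.0}}, // draw 0, horizon 0
		{{3.0}}, // draw 1, horizon 0
	}
	if m, ok := meanAt(varMap, sims, "num1", 0); ok {
		fmt.Println("mean of num1 at horizon 0:", m)
	}
}
```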

Setting and Waiting on Signals

Signals are used to synchronize activities between nodes. If a node, A, needs to wait for another node, B, to finish some task before doing its own task, A can wait on a signal from B. Node A would use u.WaitSignals("<signal_name1>", "<signal_name2>" ...) to wait on a signal from B. You must use the getsig= option on node A.

A user node can send signals of its own to other nodes. These signals do not have to be declared by the producing node or be consumed by any node. Care must be taken in choosing a signal name, as it should not match any existing node name or signal name. A signal can be sent using the u.SendSignal() function. Usage:

u.SendSignal("godot")

When executed in Test Mode (running a node individually with the -t flag), u.WaitSignals() returns without waiting. When the node is run as part of a process, u.WaitSignals() blocks the current thread until all the signals in the list are received.

Setting and Getting Dynamic Facts

Signals are a simple way to communicate between nodes. To pass more information between nodes, the concept of dynamic facts can be used. One node can Set a fact and other nodes can then Get that fact. Facts themselves can be any Golang type including maps and arrays. The following basic types are supported:

  • map[string]float64
  • []float64
  • []map[string]any
  • map[string]map[string]any
  • map[string][]map[string]any
  • float64
  • int
  • int64
  • string

All other types are returned as a map[string]any or just any. Structures can be used but are discouraged, because the Get function returns a map instead of a structure. Use the u.DynFactGobSet() function if preserving the structure is important. Avoid facts that require large amounts of memory, because facts are sent back and forth via the mid-tier.

A fact is associated with a name. The name is case-insensitive.

To set a fact use the u.DynFactSet() function:

fact := []string {"john","paul","ringo","george"}
u.DynFactSet("Beatles", fact)

In this example, a single fact named "Beatles" is set. This fact is an array of strings. Setting a dynamic fact can be done at any time. Use the setdyn=/setfact= options on a node in the strm file to inform the system of the source of the facts. The receiving node waits automatically using wait signals on the user's behalf.

fact := map[string]interface{}{
      "dyn_fact1":    500,
      "dyn_software": "StreamR is great",
      "dyn_duo":      []float64{5.3, 6.2, 125.6},
}
u.DynFactSet(fact)

In this example, three facts were set at once. One fact was named dyn_fact1 and it was numeric, and another was named dyn_software and it was a string. The final fact is a double precision array. These could be set separately using the first form of the DynFactSet() function.

The u.DynFactSet() function sends a signal that the dynamic fact is ready to be consumed. The node that is consuming the dynamic fact must declare the list of dynamic facts it is expecting. This is done through the getdyn=/getfact= option on a node in the strm file. If a node Vladimir is expecting to use the above facts it would be declared as:

node Vladimir (in)(out) getdyn=beatles,dyn_fact1,dyn_software,dyn_duo

The u.DynFactGet() function is used to retrieve a fact. u.DynFactGet() returns nil if it cannot find the requested fact, for example when the fact was not listed in the getdyn/getfact statement. To get a fact using the u.DynFactGet() function:

f1 := u.DynFactGet("dyn_fact1")
if f1 != nil {
    total := 100 + f1.(int)
}

The u.DynFactGet() will automatically wait for a signal that the fact is ready before retrieving it from the mid-tier. Like parameter maps, each thread shares one copy of the dynamic fact.

Dependencies can be created between nodes with dynamic facts that cannot be completed. This will cause the running process to hang. An example would be a dynamic fact that is generated at the end of a process, but nodes at the beginning depend on it. The node at the beginning will be waiting for the fact to be sent and the node at the end of the process may be waiting on those nodes to complete before it sends the fact.

Dynamic fact requests in test mode will always return nil.
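Because a fact can come back as nil (always in test mode) or with a different numeric type than expected, it can help to wrap the type assertion in a small helper with a fallback value. The asInt function below is a hypothetical helper written for illustration; it is not part of the SDK, and the set of numeric types it accepts is an assumption.

```go
package main

import "fmt"

// asInt converts a retrieved fact to an int. It returns fallback when the
// fact is nil (as in test mode) or has an unsupported type, instead of
// panicking on a failed type assertion.
func asInt(fact any, fallback int) int {
	switch v := fact.(type) {
	case int:
		return v
	case int64:
		return int(v)
	case float64:
		return int(v) // truncates any fractional part
	default:
		return fallback // nil and all other types land here
	}
}

func main() {
	var missing any // simulates the nil returned in test mode
	fmt.Println(100 + asInt(missing, 0))
	fmt.Println(100 + asInt(500, 0))
	fmt.Println(100 + asInt(float64(500), 0))
}
```

With a fallback in place, the same worker code can run both in test mode and as part of a full process.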

Exchanging information with Golang Nodes

If facts are being sent only to other Golang nodes, you can use an alternate routine, u.DynFactGobSet(), that preserves the Golang structure of the fact. These facts are encoded in the GOB format of Golang. Like any encoding in Golang, this routine needs access to the fields in the structure, so the field names must be capitalized. Here is an example:

type complexStruct struct {
    Num     float64
    IntT    int
    Char    string
    Boolean bool
}
// Add values to the struct
cS := complexStruct{
    Num:     0.5,
    IntT:    -10,
    Char:    "This is a string",
    Boolean: true,
}
u.DynFactGobSet("gob", cS)

To retrieve a fact stored as a GOB use the u.DynFactGobGet() routine. The following example gets a structure from a dynamic fact:

type complexStruct struct {
    Num     float64
    IntT    int
    Char    string
    Boolean bool
}
// Decode the fact into an empty struct
cS := complexStruct{}
u.DynFactGobGet("gob", &cS)
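The capitalization requirement comes from Go's standard encoding/gob package, which only transmits exported fields. The stand-alone sketch below round-trips a structure through gob in memory, without the mid-tier, to show that a lowercase field is dropped. The hidden field and the roundTrip helper are added here purely for illustration.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

type complexStruct struct {
	Num     float64
	IntT    int
	Char    string
	Boolean bool
	hidden  string // unexported: gob skips this field
}

// roundTrip encodes the value with gob and decodes it into a new struct.
func roundTrip(in complexStruct) (complexStruct, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return complexStruct{}, err
	}
	var out complexStruct
	err := gob.NewDecoder(&buf).Decode(&out)
	return out, err
}

func main() {
	in := complexStruct{Num: 0.5, IntT: -10, Char: "This is a string", Boolean: true, hidden: "lost"}
	out, err := roundTrip(in)
	if err != nil {
		fmt.Println("gob error:", err)
		return
	}
	fmt.Printf("%+v\n", out) // the hidden field comes back empty
}
```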

Date Functions

DateAdd and DateDiff are two built-in date functions for Golang computational nodes. These functions are in addition to the functions available from the date package. DateAdd and DateDiff are identical to the SQL node functions of the same name.

u.DateAdd(interval, n intervals, date) – Add n intervals to a date type and return a date.Date.

  • n can be negative.
  • Allowed intervals – year, quarter, month, week, day, weekday.
  • The function supports an optional argument for date alignment:
    • "begin", aligns to the beginning of the interval;
    • "end", aligns to the end of the interval;
    • "same", aligns to the exact amount specified by n.

Note: the weekday and week functions are based on weeks that start on Sunday and end on Saturday, where Monday-Friday are weekdays.

import "github.com/rickb777/date"
d, _ := date.Parse("02Jan2006", "31DEC2021") // 12/31/2021 -> d (error ignored for brevity)
newdate := u.DateAdd(`quarter`, 3, d, `end`) // 09/30/2022 -> newdate
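To make the "end" alignment concrete, the following sketch reproduces the quarter example with only the standard time package. It is an illustration of the arithmetic under the assumption that "end" means the last day of the resulting quarter; addQuartersEnd and ymd are hypothetical helpers, not the implementation of u.DateAdd.

```go
package main

import (
	"fmt"
	"time"
)

// ymd builds a UTC date at midnight.
func ymd(year, month, day int) time.Time {
	return time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
}

// addQuartersEnd shifts d by n quarters (n may be negative) and aligns the
// result to the last day of the quarter it lands in.
func addQuartersEnd(d time.Time, n int) time.Time {
	// Start from the first of the month so that adding months cannot
	// spill over into the following month (for example, "September 31").
	first := ymd(d.Year(), int(d.Month()), 1)
	shifted := first.AddDate(0, 3*n, 0)
	// Last month of the quarter containing shifted: 3, 6, 9, or 12.
	qEnd := ((int(shifted.Month())-1)/3)*3 + 3
	// Day 0 of the following month is the last day of month qEnd.
	return ymd(shifted.Year(), qEnd+1, 0)
}

func main() {
	d := ymd(2021, 12, 31)
	fmt.Println(addQuartersEnd(d, 3).Format("01/02/2006"))
}
```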

u.DateDiff(interval, date, date) – Computes the number of intervals between two dates. The number of intervals can be negative. The type of the return is a float64.

  • Allowed intervals - year, quarter, month, week, day, weekday.

Note: the weekday and week functions are based on weeks that start on Sunday and end on Saturday, where Monday-Friday are weekdays.

d1, _ := date.Parse("02Jan2006", "28Jun2021") // 6/28/2021 -> d1 (error ignored for brevity)
d2 := date.Today()
diff := u.DateDiff("weekday", d2, d1)

Evaluating Run Time Expressions in Golang

It is often desirable to evaluate expressions that are not known until run time. These could be expressions made dynamically or provided by the user at run time.

For Golang computational nodes you can use the following function to evaluate an expression stored in a string:

u.EvalExpr( variables, expr) -> interface{} // either a bool, float64, or a string

where variables is a structure containing the input variables to the expression, expr. See Evaluating Run Time Expressions in SQL for more details on the expressions and functions supported.

The following example evaluates an expression using variables from the input queue:

express := "ln(d1) + 1"
ans := u.EvalExpr(input, express).(float64) // assert that it returns a double

The following example evaluates an expression using variables from the input queue and some added variables.

type combo struct {
   Coef1     float64
   Coef2     float64
   Intercept float64
   Input     *Testsql.Testsql
}
vars := combo{Coef1: 7.,
   Coef2:     3.,
   Intercept: 1.,
   Input:     input}
express := "coef1 * x**2  + coef2 * x + intercept"
ans, ok := u.EvalExpr(vars, express).(float64) // ok is false if the result is not a double

Note the case of the declared structure variables. Golang considers lowercase variables as private, so if the variables had been declared lowercase in the structure the EvalExpr() function would not be able to see the values.
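The visibility rule can be demonstrated with the standard reflect package, which is the usual way for a library to discover structure fields at run time. Whether EvalExpr() itself uses reflection is an assumption here; the demo structure and the visibleFields helper are hypothetical and exist only to show which fields code outside the package can see.

```go
package main

import (
	"fmt"
	"reflect"
)

// demo mixes an exported and an unexported field.
type demo struct {
	Coef1 float64 // exported: visible to other packages
	coef2 float64 // unexported: invisible to other packages
}

// visibleFields lists the exported field names of a struct or of a
// pointer to a struct. It returns nil for any other kind of value.
func visibleFields(v any) []string {
	t := reflect.TypeOf(v)
	if t == nil {
		return nil
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil
	}
	var names []string
	for i := 0; i < t.NumField(); i++ {
		if f := t.Field(i); f.IsExported() {
			names = append(names, f.Name)
		}
	}
	return names
}

func main() {
	fmt.Println(visibleFields(demo{Coef1: 7, coef2: 3}))
}
```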

Sending Email from Go nodes

See the SDK documentation on sending email.

Reading Excel (.xlsx ) files in Golang

See the SDK documentation on reading Excel files.

Reading CSV files in Golang

Streaming is for big data, that is, data with more than 1,000 observations. For smaller data sets, the matrix map capability is available, but matrix maps support only numeric fields and only two dimensions. Generalized CSV file input helps to bridge this gap.

CSV files can be in the input directory, the look through directory, or be uploaded and added to a run. CSV files in the playpen and look through directories are considered fallbacks for CSV files that are uploaded.

CSV files are requested using the u.ReadCSV() function. CSV files are first searched for in the upload list for the run and then the local directory. CSV files can be requested by type which implies that the csv file will only be found in the upload list.

Syntax for Golang is:

columns := map[string]string { "name":"type", "name":"type", ...}
res, warn, err := u.ReadCSV("name | type", columns [, delimiter] [, keyName1 [, keyName2 [, keyName3]]])

The delimiter defaults to ",". The other delimiters available are " ", ";", "|", and "\t". The optional keyNames are a list of fields in the CSV file that are used to create a nested map of the data. The limit on the number of keys is three (3). The warn and err return values are similar to the values returned by u.GetExcel().

The column description is used to describe the types and field names that are expected in the file. For example:

columns := map[string]string{"date": "date",
        "x": "num",
        "*": "char"}

specifies that the CSV file should contain the variables date and x, with types date and number. Any other fields found in the CSV file will have the type char. Without the "*" field name, only the fields date and x would be returned. The returned warning string contains warnings about missing fields and conversion errors. It is up to the user to report or ignore them.

The following demonstrates the types returned for different numbers of keys:

columns := map[string]string { "date":"date", "class1":"char", "class2":"char", "value1":"num"}
var res interface{}
var warn string
var err error
res, warn, err = u.ReadCSV("fileName.csv", columns)
// ([]map[string] interface{})
noKeys := res.(csv0Keys)

// One key map[key] map[fieldName] -> value
res, warn, err = u.ReadCSV("fileName.csv", columns, "class2")
// map[interface{}] map[string] interface{}
oneKey := res.(csv1Keys)

// Two keys
res, warn, err = u.ReadCSV("fileName.csv", columns, "class2", "date")
// map[interface{}]map[interface{}]map[string]interface{}
twoKeys := res.(csv2Keys)

// Three keys
res, warn, err = u.ReadCSV("fileName.csv", columns, "class2", "class1", "date")
// map[interface{}]map[interface{}]map[interface{}]map[string]interface{}
threeKeys := res.(csv3Keys)

// extract a value from each form
class1 := "Raleigh"
class2 := "North Carolina"
dt, _ := frgutil.ParseDate("07jun1997")

log.Println("5th observation in the CSV file", noKeys[4] )
log.Println("The observation keyed to class2=\"North Carolina\"", oneKey[class2] )
log.Println("The observation keyed to class2=\"North Carolina\" and date=\"07jun1997\"", twoKeys[class2][dt] )
log.Println("The observation keyed to class2=\"North Carolina\", class1=\"Raleigh\", and date=\"07jun1997\"", threeKeys[class2][class1][dt] )
log.Println(err,warn)

Note the 4 helper types used to declare the returned values: csv0Keys, csv1Keys, csv2Keys, and csv3Keys.
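The one-key form can be pictured as a map from the key column's value to a map of field names and values. The sketch below builds that shape from CSV text with the standard encoding/csv package. It is a simplified illustration, not the u.ReadCSV() implementation: readKeyed is a hypothetical helper, all values stay strings, and no type conversion or warning handling is attempted.

```go
package main

import (
	"encoding/csv"
	"errors"
	"fmt"
	"strings"
)

// readKeyed parses CSV text with a header row and returns
// map[key value] -> map[field name] -> field value.
func readKeyed(data, key string) (map[string]map[string]string, error) {
	rows, err := csv.NewReader(strings.NewReader(data)).ReadAll()
	if err != nil {
		return nil, err
	}
	if len(rows) == 0 {
		return nil, errors.New("empty CSV input")
	}
	header := rows[0]
	keyCol := -1
	for i, name := range header {
		if name == key {
			keyCol = i
		}
	}
	if keyCol < 0 {
		return nil, fmt.Errorf("key column %q not found", key)
	}
	out := make(map[string]map[string]string)
	for _, row := range rows[1:] {
		// The csv reader guarantees every row has as many fields as the header.
		rec := make(map[string]string, len(header))
		for i, name := range header {
			rec[name] = row[i]
		}
		out[row[keyCol]] = rec // a later row with the same key replaces an earlier one
	}
	return out, nil
}

func main() {
	data := "class1,class2,value1\nRaleigh,North Carolina,1.5\nAustin,Texas,2.5\n"
	byState, err := readKeyed(data, "class2")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(byState["North Carolina"]["value1"])
}
```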

Database Interactions

A computational node can query or write to a database. Ideally these actions are performed in the initialization function or term block of the computational node. To get a connection to the installed database, use:

db := u.ConnectPsql(u.hh)
defer db.Close()

or

db := frgutil.ConnectMSSQL(u.hh)
defer db.Close()

If another database is needed, the connection will need to be made in the code. The following information describing the connection will need to be known:

  1. User name
  2. Password
  3. Port
  4. Host
  5. Database name
  6. SSL mode - optional

Then, use the connection to submit queries or insert statements. Refer to the Golang SQL documentation for details.
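The six items above are typically combined into a single connection string. The following sketch assembles a PostgreSQL-style connection URL with the standard net/url package so that special characters in the password are escaped. It is an example under stated assumptions: postgresURL is a hypothetical helper, the credentials are placeholders, and actually opening the connection would additionally require database/sql and a driver for the target database.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// postgresURL assembles a connection URL from its parts.
// sslMode is optional; pass "" to leave it out.
func postgresURL(user, password, host string, port int, dbName, sslMode string) string {
	u := url.URL{
		Scheme: "postgres",
		User:   url.UserPassword(user, password), // escapes special characters
		Host:   net.JoinHostPort(host, strconv.Itoa(port)),
		Path:   "/" + dbName,
	}
	if sslMode != "" {
		q := url.Values{}
		q.Set("sslmode", sslMode)
		u.RawQuery = q.Encode()
	}
	return u.String()
}

func main() {
	// Placeholder values for illustration only.
	fmt.Println(postgresURL("app", "s3cret", "db.example.com", 5432, "risk", "require"))
}
```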

Controlling Information Displayed in the Log

Use the import log "github.com/sirupsen/logrus" for log functions.

There are six levels of logging available, listed in order of increasing output:

  1. fatal - only fatal messages are printed. This corresponds to log.Fatal() instances in the Golang code.
  2. error - only fatal and error messages are printed. Error messages correspond to log.Error() instances in the Golang code and logging.error() instances in Python.
  3. warn - Additionally warning messages are printed. Warning messages correspond to log.Warning() instances in the Golang code and logging.warning() instances in Python.
  4. info - Additionally informational messages are printed. Informational messages correspond to log.Info() instances in the Golang code and logging.info() instances in Python.
  5. debug - Additionally debug messages are printed. Debug messages correspond to log.Debug() instances in the Golang code and logging.debug() instances in Python.
  6. trace - Additionally tracing messages are printed. Tracing messages correspond to log.Trace() instances in the Golang code.

The user controls what is displayed by setting the LOG_LEVEL environment variable to one of the listed values. For example:

export LOG_LEVEL=debug

The default LOG_LEVEL is info.
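The cumulative behavior of the levels can be modeled as a simple ranking: a message is printed when its level ranks at or below the configured level. The sketch below uses only the standard library and is an illustration of that rule, not the logging library's code. The enabled function is hypothetical, and treating an unrecognized LOG_LEVEL value as info is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// levelRank orders the levels from least to most output.
var levelRank = map[string]int{
	"fatal": 0, "error": 1, "warn": 2, "info": 3, "debug": 4, "trace": 5,
}

// enabled reports whether a message at msgLevel is printed when LOG_LEVEL
// is set to setting. An empty or unrecognized setting falls back to info
// (the fallback for unrecognized values is an assumption).
func enabled(setting, msgLevel string) bool {
	limit, ok := levelRank[strings.ToLower(setting)]
	if !ok {
		limit = levelRank["info"]
	}
	rank, ok := levelRank[strings.ToLower(msgLevel)]
	return ok && rank <= limit
}

func main() {
	setting := os.Getenv("LOG_LEVEL")
	for _, lvl := range []string{"error", "info", "debug"} {
		fmt.Println(lvl, enabled(setting, lvl))
	}
}
```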