Computational Python

Introduction

Python (version > 3.6) has been integrated with the VOR Stream platform to provide flexibility in the choice of programming languages.

The glue between the nodes is queues. Previously, code was generated for the Golang interface to queues; similarly, code is generated for the Python interface to queues. Follow the same procedure as for Golang queues, using the vor create queue command with the --lang python option to generate Python-compatible queues.

vor create queue --lang python

To create a Python computational node, use the lang=python option on the node statement in a .strm file:

node node1(input)(output) lang=python

Python has different module import requirements than Golang, so Python nodes live in a different directory in the playpen source tree. Specifically, Python code lives in the <playpen>/src/python directory. As with Golang nodes, two Python files are created for each computational node. The driver Python file is called <nodename>.py and the user-editable Python file is called <nodename>U.py. Unlike Golang computational nodes, however, Python computational nodes are not in separate directories.
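
For example, a node named node1 results in the following two files (illustrative layout):

<playpen>/src/python/node1.py     # generated driver
<playpen>/src/python/node1U.py    # user-editable code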

User Code Basics

The following is an example of a user-editable Python file:

import math
import frgutil
from queues import Cashflow
from queues import Detailresults
from queues import Finalcashflow
import time

class genresults:
    def __init__(self, handle, Detailresults, Finalcashflow):
        self.Detailresults = Detailresults
        self.Finalcashflow = Finalcashflow

        # perform one time initializations if necessary
        self.hh = handle
        self.fx = handle.GetMatrixMap("exchangerates")
        print(self.fx)

    # worker is called for each observation sent on the input queue
    # input is a named tuple
    def worker(self, threadNum, input):
        self.Detailresults.Npv = 25
        self.Detailresults.Pnl = 5
        time.sleep(1)

        # You don't have to post to any or all output queues
        self.Detailresults.Post()
        self.Finalcashflow.Post()


        # update statistics
        ## self.hh.computeStat( "mean", "value", input.Value)
        ## self.hh.computeStat( "min", "value", input.Value)
        ## self.hh.computeStat( "max", "LoanAmt", input.LoanAmt)
        ## self.hh.computeStat( "topN", "otherValue", input.Other)
        ## self.hh.computeStat( "bottomN", "newValue", input.New)

    # called when done processing input
    def term(self):
        # Not required to do anything here
        return

The first part of the program is a typical Python import section, plus any other imports your program will need. Remember to use pip to ensure that each imported module is installed. This example adds the time module for use in the worker method (described below).

The rest of the code is a Python class, named as the lowercase of the node name. The __init__ method, which is called when the class is instantiated, can be used for setup; in this example, a matrix map is loaded there. Next is the worker method, where most of the code should be placed. Each output queue result is placed in a Python dictionary, which is created and named automatically. This code can be modified freely.

Also inserted are the commands for sending data to the output queues. These commands are in the form <Queue name>.Post(). In this example, a call to time.sleep() was made to simulate computations being performed.

Finally, a term method is called when the node completes processing. Additional code can be added here as necessary for cleanup.

Run a Node in Solo Mode

To run a Python node in solo mode, use the -t option:

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$STREAMRROOT/lib
python3 <playpen>/src/python/<node name>.py -t 

The remaining optional arguments are:

-input <path to test file>
    Specify an input file other than the default. The input file should be in the same JSON format as the generated test file. Do not forget the termination record (provided automatically with vor generate). By default, the input is read from <name of node>.json in the <playpen>/test/<name of node>/ directory. The file is searched for first at the provided path, then in the <playpen>/test directory; the ".json" suffix is appended if it isn't specified.

-output <queue name>,<path to output file> …
    Specify the output files. A node can produce multiple output queues, so the specification is a pair of the queue name followed by the path to the output file for that queue.

-options <configuration file>
    Specify overrides to the configuration file in the input directory of the playpen.
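
For example, a solo run of the node1 node that overrides the input file and captures the output queue might look like the following (file paths are illustrative):

python3 <playpen>/src/python/node1.py -t -input mytest.json -output output,/tmp/output.json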

Merging input queues

Typically, a computational node takes in a single input queue and creates one or more output queues. Sometimes it is desirable to read in multiple queues and process them generically; the idea is that the input queues have common fields that can be used to derive the output. To merge a set of queues, specify them as a space-separated list as the first entry in the input parentheses.

For example, the following specification:

node mergedinpy(queue1 queue2 queue3)(out) lang=python

creates a Python node that has three merged queues as input: queue1, queue2, and queue3. The generated template for mergedinpy, by default, will read from the three queues and output the matching fields to out. The order of the observations in the output is indeterminate. The code in the worker function does not have to be thread-safe unless more than one thread is requested for the node.

The generated template for mergedinpy has one difference from a node without merged input: the type of the input variable passed to the worker function changes depending on the queue from which the data was sourced.

If different actions need to be performed depending on the input queue, use the Python match statement to determine which queue the data came from. The following is an example match statement:

def sample(input):
    match input:

        case Types.Types():
            # reading from the types queue
            pass

        case Typesin.Typesin():
            # reading from the typesin queue
            pass

        case Fanout.Fanout():
            # reading from the fanout queue
            pass

Inside each case statement, the variable input is of the specified type.

Multiple Input Queues

Computational nodes can accept multiple input queues. These queues are read separately and are not merged together. The first queue (or queues, in the merged case) is treated as the main input queue, which the node processes one observation at a time. All remaining input queues are read into pandas data frames before the __init__ function is called. Copies of these data frames are passed to each thread the Python node uses.

Example

node multiInPy(merge1 merge2, addQueue1, addQueue2)(typesout) lang=python

In this example, the node multiInPy gets one main queue that is the merger of merge1 and merge2. Additionally, two data frames are created, named addqueue1_df and addqueue2_df, as sketched below.
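
The following is a minimal sketch of how one of these data frames might be used inside the node. The constructor signature and the id and value column names are assumptions for illustration:

class multiinpy:
    def __init__(self, handle, Typesout, addqueue1_df, addqueue2_df):
        self.hh = handle
        self.Typesout = Typesout
        # index the supplementary data once for fast lookups ("id" column is assumed)
        self.lookup = addqueue1_df.set_index("id")

    def worker(self, threadNum, input):
        # enrich the main-queue observation with a value from the data frame
        if input.Id in self.lookup.index:
            self.Typesout.Value = self.lookup.loc[input.Id, "value"]
        self.Typesout.Post()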

Important Notes About Multiple Input Queues in Python

  • The additional queues will be contained entirely in memory. The largest queue should be the main queue.
  • A copy of the additional queues is made for each thread.
  • The arrays containing the additional queues are thread-safe for reads and writes.
  • GroupKey settings on additional queues are ignored.

Python Threads

Python is not a thread-safe language, so threading is done by starting multiple Python processes; in VOR Stream this is handled with the multiprocessing package. As a result, sharing objects between processes is difficult, and objects like matrix maps and scenarios are duplicated for each Python thread.

Using Matrix Maps

The code example in the previous section included an example of loading and printing a matrix map. Matrix maps are represented in Python as a dictionary of dictionaries: the outermost dictionary is keyed by the row keys of the matrix map, and the innermost dictionaries are keyed by its column keys. For more details on the structure and use of matrix maps, please see the Data Sources - Matrix Maps section. To access a specific row and column of the matrix, use:

matrix["row key"]["col key"]

In the example above, that would be:

fx = self.fx["USD"]["EUR"]

It is good programming practice to check for existence of keys before requesting a value to avoid premature program termination:

if "USD" in self.fx:
    if "EUR" in self.fx["USD"]:
        fx = self.fx["USD"]["EUR"]
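
Equivalently, nested dict.get() calls collapse the existence checks into a single expression; the fallback value here is an assumption for illustration:

fx = self.fx.get("USD", {}).get("EUR")
if fx is None:
    fx = 1.0  # assumed default when the rate is missing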

Here is an example of what the full *U.py file might look like:

from queues import Output
from queues import Input
from frgutil import frgutil
import datetime
import math

class py_mm_usernode:
    options = dict()

    def __init__(self, handle, Output):
        self.Output = Output

        if 'processoptions' in handle.options['JobOptions']:
            self.options = handle.options['JobOptions']['processoptions']
        else:
            self.options = None
        # perform one time initializations if necessary
        self.hh = handle
        self.fx = handle.GetMatrixMap("exchangerates")

    # worker is called for each observation sent on the input queue
    # input is a named tuple
    def worker(self, threadNum, input):

        # fall back to a rate of 1.0 if the keys are missing (assumption for this example)
        fx = 1.0
        if "USD" in self.fx and "EUR" in self.fx["USD"]:
            fx = self.fx["USD"]["EUR"]

        self.Output.Value2 = input.Value*fx

        # You don't have to post to any or all output queues
        self.Output.Post()

        # update statistics
        ## self.hh.computeStat( "mean", "value", input.Value)
        ## self.hh.computeStat( "min", "value", input.Value)
        ## self.hh.computeStat( "max", "LoanAmt", input.LoanAmt)
        ## self.hh.computeStat( "topN", "otherValue", input.Other)
        ## self.hh.computeStat( "bottomN", "newValue", input.New)

    def term(self):
        # Not required to do anything here
        return

Node Statistics

Every node, by default, counts the total number of observations it processes from an input queue. This count is used by the UI to display progress and for general process monitoring. For Python computational nodes, additional statistics can be created and output. Here is the format of the computeStat() function for Python:

self.hh.computeStat("stat", "<Label>", value)

Where the available statistics are:

  1. Count – Tracks the number of times this statistic function is called.
  2. Mean – Computes the mean of the provided values.
  3. Min – Determines the minimum of the values provided.
  4. Max – Determines the maximum of the values provided.
  5. TopN – Determines the top 10 of the values provided.
  6. BottomN – Determines the bottom 10 of the values provided.
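
For example, to track the mean and maximum of a value processed in worker (the labels and field name are illustrative):

self.hh.computeStat("mean", "value", input.Value)
self.hh.computeStat("max", "value", input.Value)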

Using Scenarios and Simulations

Scenario usage for Python follows the same pattern as scenarios for Golang with the following differences:

  1. Multiple copies of scenarios are loaded for Python threads. See Python Threads for more explanation.
  2. In Python you call the routine scenario.GetScenarios(runID, scenarioSetName).
  3. GetScenarios() returns a dictionary of scenarios. Each scenario is a list of dictionaries, one for each date. The dictionary at each date is a name/value pair of the variables and values of the scenario.

An example of processing a scenario in Python is the following:

from sdk import scenario
import logging

try:
    scenarioSetName = handle.options["JobOptions"]["system"]["scenario"]
except KeyError:
    scenarioSetName = ""
try:
    runID = handle.options["RunID"]
except (TypeError, KeyError):
    runID = 0
scen = scenario.GetScenarios(runID, scenarioSetName)
logging.info("There are %d scenarios", len(scen))

## loop over scenarios
for scenarioName, s in scen.items():
    # loop over horizons
    logging.info(scenarioName)
    for horizonNumber, horiz in enumerate(s):
        for name, value in horiz.items():
            logging.info("%s = %s", name, value)

Using Simulations

Simulation usage for Python follows the same pattern as simulations for Golang with the following differences:

  1. Multiple copies of simulations are loaded for Python threads. See the Python Threads section for more explanation.
  2. In Python the routine self.hh.GetSims() is called.
  3. GetSims() returns dict[<variable name>][<value index>], and a three-dimensional list [<simNum>][<date horizon>][<values>].
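
The following is a hedged sketch of walking the returned structure, assuming each dictionary entry holds the three-dimensional lists described above:

import logging

sims = self.hh.GetSims()
for varName, byValueIndex in sims.items():
    for valueIndex, cube in enumerate(byValueIndex):
        # cube is assumed to be indexed as [simNum][date horizon][values]
        logging.info("%s[%d]: %d simulations", varName, valueIndex, len(cube))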

Using Process Options in a Python Computational Node

Process options are available at run time in the user structure of the computational node. The structure entry is called options and has the type dict().

To access the value of a specific option:

value = self.options["<category>"]["<sub-category>"]["<name>"]

It is good coding practice to check for the existence of a key in a dictionary before dereferencing it. Here is an example from the Python initialization block:

if 'processoptions' in handle.options['JobOptions']:
    self.options = handle.options['JobOptions']['processoptions']
else:
    self.options = None
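
Building on that pattern, a specific option can then be read defensively. The category and option names below are hypothetical:

value = None
if self.options is not None:
    # hypothetical <category>/<sub-category>/<name> path
    value = self.options.get("pricing", {}).get("curves", {}).get("discountRate")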

Sending and Waiting on Signals

If there is non-queue-based input required from another node, then using a wait signal will be necessary to avoid missing information. An example of non-queue-based input is a CSV file or a database table that is updated as part of the process. All output nodes produce signals when they are done writing. The signal sent is the name of the output node.

To wait on a signal in a computational node, the signal must be declared at node creation time with the getsig=<signal name> option on the node statement in a .strm file. For example, to wait on the signal godot in the node life, use:

node life(input)(output) getsig=godot

To wait on multiple signals, add them as a comma-separated list of signal names. This syntax only implies that the node could wait on these signals; it does not require that it actually wait. To actually wait on a signal, a call to self.hh.WaitSignals() must be added to the user node code. For the example above, the call is:

self.hh.WaitSignals(["godot"])

When executed in test mode, WaitSignals() will return without waiting. When the node is part of a process, WaitSignals() will block the current thread execution until all the signals in the list are received.

A user node can send signals of its own to other nodes. These signals do not have to be declared by the producing node or be consumed by any node. Take care when choosing a signal name: it should not correspond to any existing node name. A signal can be sent using the SendSignal() function. Usage:

self.hh.SendSignal("godot")

SendSignal() checks whether the node waiting on the signal is ready to receive it before the signal is sent, and will wait up to 20 seconds for the receiving node to start accepting signals.
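
The following sketch shows where these calls might live in a user class, using the node and signal names from the example above; the outgoing signal name is an arbitrary choice:

class life:
    def __init__(self, handle, Output):
        self.hh = handle
        self.Output = Output
        # block until the godot signal arrives (returns immediately in test mode)
        self.hh.WaitSignals(["godot"])

    def worker(self, threadNum, input):
        self.Output.Post()

    def term(self):
        # notify any interested node; the name must not match an existing node name
        self.hh.SendSignal("lifedone")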

Setting and Getting Dynamic Facts

Setting and getting dynamic facts for Python follows the same pattern as for Golang with the following differences:

  1. In Python, to set a fact, use the routine self.hh.DynFactSet(). The argument is a dictionary containing the fact or facts to be shared.
  2. In Python, to get a fact, use the routine self.hh.DynFactGet(<name>). The argument is a string containing the name of the desired fact. self.hh.DynFactGet() returns a dictionary or None.

You must declare the name of the facts being requested by the node with the getdyn= option when creating the node. Similarly, declare the facts created by a node using the setdyn= option.
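
A minimal sketch of the two calls follows; the fact name cutoff is illustrative:

# producer node (declared with setdyn=cutoff)
self.hh.DynFactSet({"cutoff": "2024-12-31"})

# consumer node (declared with getdyn=cutoff)
fact = self.hh.DynFactGet("cutoff")
if fact is not None:
    cutoff = fact["cutoff"]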

See additional examples in Functions for Signals and Dynamic Facts.

Sending Email from Python nodes

See the SDK documentation on sending email.

Reading Excel (.xlsx) files in Python

See the SDK documentation on reading Excel files.

Reading CSV (.csv) files in Python

Streaming is for big data, that is, data with more than 1k observations. For smaller data sets there is the matrix map capability, but matrix maps are only for numeric fields and have only two dimensions. Generalized CSV file input helps to bridge this gap.

CSV files can be in the input directory, the look through directory, or be uploaded and added to a run. CSV files in the playpen and look through directories are considered overrides for CSV files that are uploaded.

CSV files are requested using the sdk.csv.ReadCSV() function. They are searched for first in the local directory and then in the upload list for the run. CSV files can be requested by type, which implies that the CSV file will only be found in the upload list.

For Python, there isn't a helper function to get the runID for the user, so the syntax is:

from sdk import csv

cols = {"class1": "char", "class2": "char", "value": "num"}
## a runID of 0 implies no upload files to search
csv2, warn, err = csv.ReadCSV("input.csv", cols, 0, "class2", "class1")  # optional: delim=','
print(csv2, warn, err, csv2["North Carolina"]["Raleigh"])

The optional keyNames are a list of fields in the CSV file that are used to create a nested map of the data. The limit on the number of keys is three (3). The delimiter defaults to ",". The other available delimiters are: " ", ";", "|", and "\t".

The column description is used to describe the types and field names that are expected in the file. For example:

columns = { "date":"date",
        "x":"num",
        "*","char"}

specifies that the CSV file should contain the variables date and x, with types date and number respectively. Any other fields found in the CSV file will have the type char. Without the "*" field name, only the fields date and x would be returned. The returned warning string will contain warnings about missing fields and conversion errors; it is up to the user to report or ignore these.

Precompiled Libraries

Python allows users to employ compiled libraries that can be imported into computational nodes. To use these libraries at run time, the shared object files (*.so files) must be placed in the playpen lib directory. The lib directory can contain the actual shared object or a link to the shared object.
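
For example, a hypothetical shared object fastcalc.so placed in (or linked into) the playpen lib directory is imported like any other module:

import fastcalc  # hypothetical library resolved from the playpen lib directory

result = fastcalc.price(100.0)  # hypothetical function exported by the library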