Computational Python
Introduction¶
Python (version > 3.6) has been integrated with the VOR Stream platform to provide flexibility in the choice of programming languages.
The glue between the nodes is queues. Previously, code was generated for the Golang interfaces to queues. Similarly, code is generated for the Python interface to queues. Follow the same procedure as for Golang queues, using the vor create queue command with the --lang python option to generate Python-compatible queues.
vor create queue --lang python
To create a Python computational node, use the lang=python option on the node statement in a strm file:
node node1(input)(output) lang=python
Python has different module import requirements than Golang, so Python nodes live in a different directory in the playpen source tree. Specifically, Python code lives in the <playpen>/src/python directory. As with Golang nodes, two Python files are created for each computational node. The driver Python file is called <nodename>.py and the user-editable Python file is called <nodename>U.py. Unlike Golang computational nodes, however, Python computational nodes are not in separate directories.
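For a node named node1, the layout is:
<playpen>/src/python/node1.py     # generated driver
<playpen>/src/python/node1U.py    # user-editable code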
User Code Basics¶
The following is an example of a user-editable Python file:
import math
import frgutil
from queues import Cashflow
from queues import Detailresults
from queues import Finalcashflow
import time

class genresults:
    def __init__(self, handle, Detailresults, Finalcashflow):
        self.Detailresults = Detailresults
        self.Finalcashflow = Finalcashflow
        # perform one-time initializations if necessary
        self.hh = handle
        self.fx = handle.GetMatrixMap("exchangerates")
        print(self.fx)

    # worker is called for each observation sent on the input queue
    # input is a named tuple
    def worker(self, threadNum, input):
        self.Detailresults.Npv = 25
        self.Detailresults.Pnl = 5
        time.sleep(1)
        # You don't have to post to any or all output queues
        self.Detailresults.Post()
        self.Finalcashflow.Post()
        # update statistics
        ## self.hh.computeStat( "mean", "value", input.Value)
        ## self.hh.computeStat( "min", "value", input.Value)
        ## self.hh.computeStat( "max", "LoanAmt", input.LoanAmt)
        ## self.hh.computeStat( "topN", "otherValue", input.Other)
        ## self.hh.computeStat( "bottomN", "newValue", input.New)

    # called when done processing input
    def term(self):
        # Not required to do anything here
        return
The first part of the program is a typical Python import section, including any other imports your program needs. Remember to use pip to ensure that each module is installed. This example adds the time module for use in the worker method (described below).
The rest of the code is a Python class, named as the lower case of the node name. The __init__ method, which is called when the class is instantiated, can be used for setup; in this example, a matrix map is loaded in the __init__ method.
Next is the worker method, where most of the code should be placed. Each output
queue result is placed in a Python dictionary, which is created and named
automatically. This code can be modified freely.
Also inserted are the commands for sending data to the output queues. These commands are in the form <Queue name>.Post(). In this example, a call to time.sleep() was made to simulate computations being performed.
Finally, a term method is called when the node completes processing. Additional code can be added here as necessary for cleanup.
Run a Node in Solo Mode¶
To run a Python node in solo mode, use the -t option:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$STREAMRROOT/lib
python3 <playpen>/src/python/<node name>.py -t …
The remaining optional arguments are:
Option | Argument | Description
---|---|---
-input | path to test file | Specify an input file other than the default. The input file should be in the same JSON format as the generated test file. Do not forget the termination record (provided automatically with vor generate). By default, input is read from <name of node>.json in the <playpen>/test/<name of node>/ directory. The lookup checks the provided file path first, then the <playpen>/test directory, and appends the ".json" suffix if it isn't specified.
-output | <queue name>,<path to output file> … | Specify the output files. A node can produce multiple output queues, so the specification is a pair of the queue name followed by the path to the output file for that queue.
-options | configuration file | Specify overrides to the configuration file in the input directory of the playpen.
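For example, a hypothetical solo-mode run of a node named node1, reading a custom input file and writing its output queue to a file:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$STREAMRROOT/lib
python3 <playpen>/src/python/node1.py -t -input mytest.json -output output,/tmp/output.json
Here mytest.json is resolved through the look-through described above, and the pair output,/tmp/output.json routes the queue named output to /tmp/output.json.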
Merging input queues¶
Typically, a computational node takes in a single input queue and creates one or more output queues. Sometimes it is desirable to read multiple queues and process them generically; the idea is that the input queues have common fields that can be used to derive the output. Specify a set of queues to be merged by listing them, space separated, as the first entry in the input parentheses.
For example, the following specification:
node mergedinpy(queue1 queue2 queue3)(out) lang=python
creates a Python node that has three merged queues as input: queue1, queue2, and queue3. The generated template for mergedinpy, by default, will read from the three queues and output the matching fields to out. The order of the observations in the output is indeterminate. The code in the worker function does not have to be thread-safe unless more than one thread is requested for the node.
The generated template for mergedinpy has one difference from a node without merged input: the type of the input variable for the worker function changes depending on the queue from which the data was sourced.
If different actions need to be performed depending on the input queue, use the Python match statement to determine from which queue the data was sourced. The following is an example match statement:
def sample(input):
    match input:
        case Types.Types():
            # reading from the types queue
            pass
        case Typesin.Typesin():
            # reading from the typesin queue
            pass
        case Fanout.Fanout():
            # reading from the fanout queue
            pass
Inside each case statement, the variable input is of the specified type.
Multiple Input Queues¶
Computational nodes can accept multiple input queues.
These queues are read separately and are not merged together.
The first queue, or queues in the merged case, is treated as the main input queue; the node processes the main input queue one observation at a time. All remaining input queues are read into pandas data frames before the __init__ function is called. Copies of these data frames are passed to each thread the Python node uses.
Example¶
node multiInPy(merge1 merge2, addQueue1, addQueue2)(typesout) lang=python
In this example, the node multiInPy will get one main queue that is the merger of merge1 and merge2. Additionally, two data frames will be created, named addqueue1_df and addqueue2_df.
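The exact generated signature may differ; the following is a minimal sketch assuming the data frames are passed to __init__ under the names above and that the queues share hypothetical Id and Value fields:
class multiinpy:
    def __init__(self, handle, Typesout, addqueue1_df, addqueue2_df):
        self.Typesout = Typesout
        self.hh = handle
        # the additional queues arrive as pandas data frames
        self.addqueue1_df = addqueue1_df
        self.addqueue2_df = addqueue2_df

    def worker(self, threadNum, input):
        # find the rows of the in-memory data frame matching this observation
        matches = self.addqueue1_df[self.addqueue1_df["Id"] == input.Id]
        if not matches.empty:
            self.Typesout.Value = matches["Value"].iloc[0]
            self.Typesout.Post()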
Important Notes About Multiple Input Queues in Python¶
- The additional queues will be contained entirely in memory. The largest queue should be the main queue.
- A copy of the additional queues is made for each thread.
- The arrays containing the additional queues are thread-safe for reads and writes.
- GroupKey settings on additional queues are ignored.
Python Threads¶
Python is not a thread-safe language, so threading in VOR Stream is done by starting multiple Python processes using the multiprocessing package. This means that sharing objects between processes is difficult; objects like matrix maps and scenarios are therefore duplicated for each Python thread.
Using Matrix Maps¶
The code example in the previous section had an example of loading and printing a matrix map. Matrix maps are represented in Python as a dictionary of dictionaries. The outermost dictionary is keyed off of the row keys of the matrix map. The innermost dictionaries are keyed off of the column keys of the matrix map. For more details on the structure and use of matrix maps, please see the Data Sources - Matrix Maps section. To access a specific row and column of the matrix use:
matrix["row key"]["col key"]
In the example above, that would be:
fx = self.fx["USD"]["EUR"]
It is good programming practice to check for existence of keys before requesting a value to avoid premature program termination:
if "USD" in self.fx:
if "EUR" in self.fx["USD"]:
fx = self.fx["USD"]["EUR"]
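An equivalent, more compact lookup uses dict.get() with an empty-dictionary default (the names are from the example above; the use of the rate is hypothetical):
fx = self.fx.get("USD", {}).get("EUR")   # None if either key is missing
if fx is not None:
    value = input.Value * fx             # hypothetical use of the rate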
Here is an example of what the full *U.py file might look like:
from queues import Output
from queues import Input
from frgutil import frgutil
import datetime
import math

class py_mm_usernode:
    options = dict()

    def __init__(self, handle, Output):
        self.Output = Output
        if 'processoptions' in handle.options['JobOptions']:
            self.options = handle.options['JobOptions']['processoptions']
        else:
            self.options = None
        # perform one time initializations if necessary
        self.hh = handle
        self.fx = handle.GetMatrixMap("exchangerates")

    # worker is called for each observation sent on the input queue
    # input is a named tuple
    def worker(self, threadNum, input):
        if "USD" in self.fx:
            if "EUR" in self.fx["USD"]:
                fx = self.fx["USD"]["EUR"]
                self.Output.Value2 = input.Value*fx
        # You don't have to post to any or all output queues
        self.Output.Post()
        # update statistics
        ## self.hh.computeStat( "mean", "value", input.Value)
        ## self.hh.computeStat( "min", "value", input.Value)
        ## self.hh.computeStat( "max", "LoanAmt", input.LoanAmt)
        ## self.hh.computeStat( "topN", "otherValue", input.Other)
        ## self.hh.computeStat( "bottomN", "newValue", input.New)

    def term(self):
        # Not required to do anything here
        return
Node Statistics¶
Every node, by default, computes the total number of observations it processes from an input queue. This is used by the UI to display progress and for general process-monitoring information. For Python computational nodes, additional statistics can be created and output. Here is the format of the computeStat() function for Python:
self.hh.computeStat("<stat>", "<Label>", value)
Where the available statistics are:
- Count – Tracks the number of times this statistic function is called.
- Mean – Computes the mean of the provided values.
- Min – Determines the minimum of the values provided.
- Max – Determines the maximum of the values provided.
- TopN – Determines the top 10 of the values provided.
- BottomN – Determines the bottom 10 of the values provided.
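For example, inside worker a node might accumulate several statistics over its input fields (the labels and field names here are illustrative, mirroring the commented examples in the generated template):
self.hh.computeStat("mean", "value", input.Value)
self.hh.computeStat("min", "value", input.Value)
self.hh.computeStat("max", "LoanAmt", input.LoanAmt)
self.hh.computeStat("topN", "otherValue", input.Other)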
Using Scenarios and Simulations¶
Scenario usage for Python follows the same pattern as scenarios for Golang with the following differences:
- Multiple copies of scenarios are loaded for Python threads. See Python Threads for more explanation.
- In Python, you call the routine scenario.GetScenarios(runID, scenarioSetName). GetScenarios() returns a dictionary of scenarios. Each scenario is a list of dictionaries, one for each date. The dictionary at each date holds the name, value pairs of the variables and values of the scenario.
An example of processing a scenario in Python is the following:
from sdk import scenario
import logging
try:
    scenarioSetName = handle.options["JobOptions"]["system"]["scenario"]
except KeyError:
    scenarioSetName = ""
try:
    runID = handle.options["RunID"]
except (TypeError, KeyError):
    runID = 0
scen = scenario.GetScenarios(runID, scenarioSetName)
logging.info("There are %d scenarios", len(scen))
## loop over scenarios
for scenarioName, s in scen.items():
    logging.info(scenarioName)
    # loop over horizons
    for horizonNumber, horiz in enumerate(s):
        for name, value in horiz.items():
            logging.info("%s = %s", name, value)
Using Simulations¶
Simulation usage for Python follows the same pattern as simulations for Golang with the following differences:
- Multiple copies of simulations are loaded for Python threads. See the Python Threads section for more explanation.
- In Python, the routine self.hh.GetSims() is called. GetSims() returns a dictionary, dict[<variable name>] = <value index>, and a three-dimensional list indexed as [<simNum>][<date horizon>][<values>].
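A minimal sketch of walking these structures, assuming GetSims() returns the dictionary and the list in that order, and that a variable named rate exists (both assumptions for illustration):
varIndex, sims = self.hh.GetSims()          ## assumed return order
rateIdx = varIndex["rate"]                  ## "rate" is a hypothetical variable name
for simNum, horizons in enumerate(sims):    ## one entry per simulation
    for horizonNum, values in enumerate(horizons):  ## one entry per date horizon
        rate = values[rateIdx]              ## value of "rate" for this sim and horizon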
Using Process Options in a Python Computational Node¶
Process options are available at run time in the user structure of the computational node. The structure entry is called options and has the type dict().
To access the value of a specific option:
value = self.options["<category>"]["<sub-category>"]["<name>"]
It is good coding practice to check the existence of a key in a dictionary before dereferencing. Here is an example from the Python initialization block:
if 'processoptions' in handle.options['JobOptions']:
self.options = handle.options['JobOptions']['processoptions']
else:
self.options = None
Sending and Waiting on Signals¶
If there is non-queue-based input required from another node, then using a wait signal will be necessary to avoid missing information. An example of non-queue-based input is a CSV file or a database table that is updated as part of the process. All output nodes produce signals when they are done writing. The signal sent is the name of the output node.
To wait on a signal in a computational node, the signal must be declared at node creation time with the getsig=<signal name> option on the node statement in a .strm file. For example, to wait on the signal godot in the node life use:
node life(input)(output) getsig=godot
To wait on multiple signals, add them as a list of comma-separated signal names.
This syntax only implies that this node could wait on these signals; it does not require that the node actually waits. To actually wait on a signal, a call to self.hh.WaitSignals() must be added to the user node code. An example call for the above node is:
self.hh.WaitSignals(["godot"])
When executed in test mode, WaitSignals() will return without waiting. When the node is part of a process, WaitSignals() will block the current thread's execution until all the signals in the list are received.
A user node can send signals of its own to other nodes. These signals do not have to be declared by the producing node or be consumed by any node. Care must be taken in choosing a signal name, as it should not correspond to any existing node names. A signal can be sent using the SendSignal() function. Usage:
self.hh.SendSignal("godot")
SendSignal() checks whether the node waiting on the signal is ready to receive it before the signal is sent. SendSignal() will wait up to 20 seconds for the receiving node to start accepting signals.
Setting and Getting Dynamic Facts¶
Setting and getting dynamic facts for Python follows the same pattern as for Golang with the following differences:
- To set a fact in Python, use the routine self.hh.DynFactSet(). The argument is a dictionary containing the fact or facts that are to be shared.
- To get a fact, use the routine self.hh.DynFactGet(<name>). The argument is a string containing the name of the desired fact. self.hh.DynFactGet() returns a dictionary or None.
You must declare the name of the facts being requested by the node with the getdyn= option when creating the node. Similarly, declare the facts created by a node using the setdyn= option.
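A minimal sketch, using a hypothetical fact name cutoff and assuming the returned dictionary is keyed by fact name:
## in the producing node (declared with setdyn=cutoff)
self.hh.DynFactSet({"cutoff": "2024-12-31"})
## in the consuming node (declared with getdyn=cutoff)
fact = self.hh.DynFactGet("cutoff")
if fact is not None:
    cutoff = fact["cutoff"]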
See additional examples in Functions for Signals and Dynamic Facts.
Sending Email from Python nodes¶
See the SDK documentation on sending email.
Reading Excel (.xlsx) files in Python¶
See the SDK documentation on reading Excel files.
Reading CSV (.csv) files in Python¶
Streaming is for big data, that is, data with more than 1k observations. For smaller data sets there is the matrix map capability, but matrix maps are only for numeric fields and have only two dimensions. Generalized CSV file input helps to bridge this gap.
CSV files can be in the input directory, the look-through directory, or be uploaded and added to a run. CSV files in the playpen and look-through directories are considered overrides for CSV files that are uploaded.
CSV files are requested using the sdk.csv.ReadCSV() function. CSV files are searched for first in the local directory and then in the upload list for the run. CSV files can be requested by type, which implies that the CSV file will only be found in the upload list.
For Python, there isn't a helper function to get the runID for the user, so the syntax is:
from sdk import csv
cols = {"class1": "char", "class2": "char", "value": "num"}
## a runID of 0 implies no upload files to search
csv2, warn, err = csv.ReadCSV("input.csv", cols, 0, "class2", "class1")  ## optional: delim=','
print(csv2, warn, err, csv2["North Carolina"]["Raleigh"])
The optional keyNames are a list of fields in the CSV file that are used to create a nested map of the data. The limit on the number of keys is three (3). The delimiter defaults to ",". The other available delimiters are " ", ";", "|", and "\t".
The column description is used to describe the types and field names that are expected in the file. For example:
columns = { "date": "date",
            "x": "num",
            "*": "char"}
specifies that the CSV file should contain the variables date and x, of types date and number respectively. Any other fields found in the CSV file will have the type char. Without the "*" field name, only the fields date and x would be returned. The returned warning string will contain warnings about missing fields and conversion errors; it is up to the user to report or ignore these.
Precompiled Libraries¶
Python allows users to employ compiled libraries that can be imported into computational nodes. To use these libraries at run time, the shared object files (*.so files) must be placed in the playpen lib directory. The lib directory can contain the actual shared object or a link to the shared object.
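For example, assuming a compiled module mylib.so has been placed in, or linked into, the playpen lib directory (mylib and fastpv() are hypothetical names):
import mylib                     # resolved from the playpen lib directory at run time
pv = mylib.fastpv(input.Value)   # call into the compiled code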