Input / Output (IO)
Introduction
Input and output nodes (i.e., IO nodes) are used to read data from disk into a process or write data from a process to disk. IO nodes therefore appear at the beginning, end, and synchronization points of a process. Supported sources and destinations are comma-separated value (CSV) files, SAS datasets, PostgreSQL and MSSQL database tables, and S3 storage. Each IO node can read or write only one table at a time.
SAS datasets are only supported for reading directly with VOR; reading a SAS dataset does not require a SAS installation or license. Writing to SAS datasets requires a licensed SAS product called through a SAS node.
Output nodes send out signals when they are done writing. The name of the signal is the name of the output node. Use the name= option on the node to avoid an autogenerated name. Users can use the WaitSignals() functionality to wait for an output to finish writing. Consider the following three output statements:
out mid1 -> csv_output.csv name=out1
out mid2 -> django.db_output db = pg name=pg2
out mid3 -> var.MID3 name=subs
The three signals sent for this example are "out1", "pg2", and "subs". Computational nodes can then wait on a specific set of signals if they need to run a batch process that requires a set of output files to be available before running. See signal features in computational nodes for more details.
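For example, a computational node that needs those outputs on disk before it runs could wait on the signals directly. A minimal sketch, assuming the computational node accepts the getsig option described under Common Options below; the node name batchNode and its queues are hypothetical:

node batchNode(trigger)(results) getsig=out1,subs descr="Runs only after out1 and subs have finished writing."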
Common Options
All forms of the input and output nodes have a set of common options. These options are:
Option | Description |
---|---|
name | A name assigned to the node. Used to name the log file, the documentation, and the UI. A default name is provided if not specified |
descr | A quoted string describing the node. This is put in the documentation. |
label | A quoted string providing a short description of the node. Used in displaying the process in the documentation and UI. This is optional and defaults to the name of the node. |
getdyn | Optional comma separated list of dynamic facts the node needs. The node will wait for the facts to be available before running. |
getsig | Optional comma separated list of signals. The node will wait for the signals before running. |
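For example, an input node can combine several of these options to wait on a signal before reading. A sketch; the file, queue, and signal names are hypothetical:

in prices.csv -> prices name=ReadPrices label="read prices.csv" descr="Read the daily price file." getsig=pricesStaged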
CSV Input and Output
The first line in a CSV file is a comma-delimited list of field names. For an example, see the Data Sources section. The CSV file can be specified without the ".csv" suffix, which allows the file to be either compressed (".csv.gz") or uncompressed (".csv"). Input CSV files are located in the input directory, or a subdirectory of it, in the playpen for the process.
Alternatively, the lookthroughpath can be specified in the system options, which directs the program to look first in the playpen input directory for the data. If the data is not found in the playpen, the lookthroughpath location is searched next. This allows for an inheritance structure for the data.
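For example, a sketch of a lookthroughpath entry in joboptions.json, assuming it sits directly under the system block (the exact key placement may vary by release; the path shown is hypothetical):

{
  "system": {
    "lookthroughpath": "/shared/playpens/base/input"
  }
}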
The output CSV files are put in a subdirectory of the playpen output directory. The output subdirectory has the same name as the executed process or the value of the --name option. Output is not produced until the process is run with the vor run command. When using CSV files, an IO node is defined via a .strm file. See the Getting Started - Run the Process section for an example of how to create and run an IO node process using a .strm file.
IO nodes are created as part of a process definition in a stream file (*.strm). The syntax for an input node is:
in | input <csvfile> -> <queue> [delim= "," | " " | ";" | "|" | "\t"] [db = sas | csv ]
[where="<where clause>"]
[<common options>]
where
Option | Description |
---|---|
csvfile | Can use a relative path inside the input directory, e.g., mydir/foo.csv. See the section below on substitution variables for a more dynamic way to declare input files. |
delim | Optional delimiter for the input CSV file. The default is "," |
db | The target database. The default for ".csv" files is CSV and for ".sas7bdat" files is SAS. For other two-part names the default is PostgreSQL (pg). |
where | A quoted Go syntax string for filtering input data after it is loaded. It supports Go templating and must evaluate to a boolean value. |
common options | See common options. |
Examples of valid input node declarations are:
in my.csv -> myQueue
in my -> myQueue descr="Don't need to specify '.csv' explicitly"
input your.csv -> yourQueue name=ReadYour descr="Read in your input CSV file." label="read your.csv"
in mySchema.myTable -> queue7
in transactions.Port -> port db=mssql select="select * from transactions.Port where asOfDate > '10/4/2022';"
input tabs -> theirQueue delim='\t' descr="Read in a tab delimited file"
in input_csv2.csv -> input_csv2 where="lower(id) != 'daniel'"
in sastypes.sas7bdat -> types name=sasinput
The syntax for an output node is:
out | output <queue> -> <csvfile | database path>
[db = pg | csv | mssql ]
[where="<where clause>"]
[compress=true|false]
[exec_when= comma separated list of flags]
[<common options>]
where
Option | Description |
---|---|
compress | Indicate whether to compress the CSV output or not. The default is false. |
db | The target database. The default for ".csv" files is CSV. For other two-part names the default is PostgreSQL (pg). |
where | A quoted Go syntax string for filtering data before it is written. It supports Go templating and must evaluate to a boolean value. |
exec_when | A comma separated list of flags. The flags indicate whether the node produces output. |
common options | See common options. |
The defaults for the bulk load options on MS SQL are the defaults of the MS SQL instance. Where clauses can use all the functions and syntax available to the Eval() function (see Evaluating Run Time Expressions in SQL).
If an output node has one or more exec_when= flags set, the output node will only write to its destination if the corresponding flag is set in the joboptions.json file, in the options tab on the run studies screen, or with the --exec-when command line option. There are currently some differences in how spaces are handled in options, so the spelling is:
- exec_when - in a stream file
- --exec-when - on the command line
- execwhen - in joboptions.json
exec_when allows for output nodes that only operate under certain conditions, for example during debugging or testing. For nodes that output to a database, the database does not have to exist if execution of the output node is disabled; that is, the node will produce no error.
Examples of valid output node declarations are:
out answers -> ans.csv
output answers -> ans // Don't need the .csv extension
out mine -> mine.csv compress=true name=MineCompressed descr="Compressed output is smaller but slower to write"
label="Output mine.csv"
out yours -> reporting.final db=pg
out mine -> datalake.reports.mine db=mssql(keep_nulls=true check_constraints=true)
out output_join1 -> join.csv where="upper(id) != 'CLAIRE'"
out debugQueue -> debug.csv exec_when = debug
The last output node will create debug.csv when the following is set in the system options in joboptions.json:
{
  "system": {
    "output": {
      "usedate": false,
      "execwhen": {"audit": false, "debug": true}
    }
  }
}
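Alternatively, the same flag could be enabled from the command line when running the process, a sketch using the --exec-when option described above:

vor run --exec-when debug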
S3 Input and Output
S3 Input Node
To read input directly from S3, you can use the following syntax for an input node:
in | input <s3Path> -> <queue>
[compress=true|false]
[where="<where clause>"]
[<common options>]
where
Option | Description |
---|---|
s3Path | The S3 path to read input from, formatted as s3://<bucket-name>/<object/key/name>. |
compress | Indicate whether the CSV input is compressed or not. The default is false. |
where | A quoted Go syntax string for filtering input data after it is loaded. It supports Go templating and must evaluate to a boolean value. |
common options | See common options. |
Here are some examples of valid S3 input node declarations:
in s3://some-bucket/some-input-file.csv -> input name=MyS3Input descr="Input from S3"
input s3://my-bucket/scenario-input.csv -> input name=ScenarioInput descr="Scenario input from S3"
in s3://myBucket/myObjectKey.csv.gz -> input name=GZippedFile descr="Compressed input from S3"
in s3://logs-bucket/error-logs -> input compress name=CompressedErrorLogs descr="Compressed logs from S3"
in s3://project-directory/folder/subfolder/file.csv -> input name=NestedPathInput descr="Input with nested path from S3"
in s3://vor-stream-bucket/vor-stream-input.csv -> input where="value<100" name=S3InputFiltered descr="Filtered input from S3"
S3 Output Node
To write output directly to S3, you can use the following syntax for an output node:
out | output <queue> -> <s3Path>
[compress=true|false]
[mode=append|replace]
[where="<where clause>"]
[exec_when= comma separated list of flags]
[<common options>]
where
Option | Description |
---|---|
s3Path | The S3 path where the output will be written, formatted as s3://<bucket-name>/<object/key/name>. |
compress | Indicate whether to compress the CSV output or not. The default is false. |
mode | Indicates the write mode. Accepts only replace (default) or append. |
where | A quoted Go syntax string for filtering data before it is written. It supports Go templating and must evaluate to a boolean value. |
exec_when | A comma separated list of flags. The flags indicate whether the node produces output. |
common options | See common options. |
Here are some examples of valid S3 output node declarations:
out output -> s3://my-bucket/my-output name=MyS3Output descr="Output to S3"
output output -> s3://someBucket/someObjectKey name=S3Output descr="Output to S3"
out output -> s3://output-bucket/output.csv name=CSVS3Output descr="Output CSV to S3"
out output -> s3://logs/errors.csv.gz compress name=CompressedErrorLogs descr="Output compressed error logs to S3"
out output -> s3://output-data/some/nested/path/output.csv name=NestedPathOutput descr="Writing data with nested path to S3"
out output -> s3://config-bucket/latest-config mode=replace name=ReplacedS3Output descr="Replacing data in S3"
out output -> s3://data-bucket/combined-output-data mode=append name=AppendedS3Output descr="Appending data to existing CSV in S3"
out output -> s3://data-bucket/compressed-output-data compress mode=append name=AppendedGZippedS3Output descr="Appending data to existing compressed file in S3"
out output -> s3://output-data/filtered-data where="value<100" name=S3FilteredOutput descr="Filtered output to S3"
out output -> s3://debug-bucket/debug-output exec_when=debug name=S3DebugOutput descr="Output to S3 for Debug"
Write Modes: Append/Replace
Users can control how data is written to S3 using the mode option:
- replace (default): Overwrites the existing file, if it exists.
- append: Appends data to an existing file, creating a new one if it does not exist.
Variable substitution
Users can use variable substitution when defining an S3 node in a stream file. This feature can be applied to both S3 input and output nodes.
The following example demonstrates how variable substitution works with S3 nodes:
in s3://myBucket/playpen_${var.playpen_name}/input${var.s3_input_file} -> input
node usernode(input)(output) setdyn=run_name
out output -> s3://myBucket/playpen_${var.playpen_name}/run${dyn.run_name}/myOutput.csv getdyn=run_name
This offers flexibility, enabling users to specify dynamic S3 paths without editing the stream file and recreating processes.
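For example, the ${var.*} references above could be resolved by defining the variables in the joboptions.json file. A sketch; the values are hypothetical:

{
  "system": {
    "var": {"playpen_name": "risk2024", "s3_input_file": "/positions.csv"}
  }
}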
For more details on variable substitution, please refer to the section Dynamic Substitution.
Database Input and Output
VOR supports reading from and writing to relational databases. Currently, PostgreSQL and MSSQL are supported. Before running a process, ensure the database server is running and the required tables exist with appropriate permissions (read for input, write for output).
Database Input Node
To read from a database, the syntax is as follows:
in | input <database path> -> <queue> db=pg | mssql
(connection=<path to key value store>)
[where="<where clause>"]
[select= "sql select"]
[<common options>]
where
Option | Description |
---|---|
database path | Specified as [<database name>.]<schema name>.<table name>. |
db | The target database. Accepted values are pg for PostgreSQL and mssql for MSSQL. The default is PostgreSQL. |
connection | The path to the key-value store for the database connection. For more information, see Setting up Database Connections |
where | A quoted Go syntax string for filtering input data after it is loaded. It supports Go templating and must evaluate to a boolean value. This DOES NOT replace an SQL WHERE clause. For better performance, use filtering directly in the select option when possible. |
select | This is a valid select statement passed to the database to select specific records for input. |
common options | See common options. |
Examples of valid input node declarations are:
// Read from PostgreSQL
in public.myTable -> myQueue db=pg
// Read from MSSQL
in dbo.myTable -> myQueue db=mssql(connection="playpen/mssql")
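When only a subset of rows is needed, it is usually better to push the filter into the database with select than to filter after loading with where, as the where option description above notes. A sketch; the table and column names are hypothetical:

// Filter in the database (preferred for performance)
in public.trades -> trades db=pg select="SELECT * FROM public.trades WHERE amount > 0"
// Filter after the rows are loaded, using a Go syntax where clause
in public.trades -> trades db=pg where="amount > 0"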
Database Output Node
Writing to the database has a similar syntax:
out | output <queue> -> <database path>
db = pg | mssql (connection=<path to key value store> | mssql_options )
[where="<where clause>"]
[exec_when= comma separated list of flags]
[<common options>]
where
Option | Description |
---|---|
db | The target database. Accepted values are pg for PostgreSQL and mssql for MSSQL. The default is PostgreSQL. |
connection | The path to the key-value store for the database connection. For more information, see Setting up Database Connections |
mssql_options | The options for MSSQL bulk load. See MSSQL Bulk Load Options for more details. |
where | A quoted Go syntax string for filtering data before it is written. It supports Go templating and must evaluate to a boolean value. This DOES NOT replace an SQL WHERE clause. |
exec_when | A comma separated list of flags. The flags indicate whether the node produces output. |
common options | See common options. |
Examples of valid output node declarations are:
// Write to PostgreSQL
out myQueue -> public.myTable db=pg(connection="playpen/risk/postgres")
// Write to MSSQL
out myQueue -> dbo.myTable db=mssql
(connection="test/mssql"
check_constraints=true|false
fire_triggers=true|false
keep_nulls=true|false
kb=<kilo Bytes Per Batch>
rows=<rows Per Batch>
tablock=true|false
order=<name>[,<name>])
Note
Output tables must already exist so that values may be appended to the table.
MSSQL Bulk Load Options
When writing to MSSQL, you can use the following options to control the bulk load behavior:
Option | Description |
---|---|
check_constraints | Check all constraints on the target table. |
fire_triggers | Execute insert triggers. |
keep_nulls | Retain null values instead of setting default values. |
kb | Number of kilobytes of data per batch. |
rows | Number of rows per batch. |
order | Comma-separated list of columns specifying how the data is sorted. |
tablock | Request a table-level lock for the duration of the bulk insert operation. |
Setting up Database Connections
Warning
Starting with the v0.11 release, database connection parameters can no longer be defined using environment variables. Instead, configure these parameters in the Vault key-value store and provide the KV store path to the database node. This approach enhances security and simplifies the management of database connections. For details on configuring database connections in Vault, see Configure Database Connection in the Admin Guide.
At startup, the database node will look for a path to the Vault key-value (KV) store, which contains all the connection information (e.g., hostname, port, username) needed to connect to the database. The easiest way to set up a new database connection in Vault is through the vor create secret command.
When setting up a database node, you can use either the default database connection or configure a custom connection for the node. Configuration can be set at three levels, listed below in order of precedence (from lowest to highest):
1. Global configuration: Contact your VOR administrator to configure the database connection path at the system-wide level. This applies the configuration to all database nodes by default.

2. Playpen level: Set the database-conn-path option in the config.yaml file located in the playpen root directory. This applies the configuration to all database nodes within that playpen. The config.yaml file should look like this:

   PLAY: "/opt/playpens/my-playpen"
   database-conn-path: "postgres"

3. Node level: Use the connection option to specify the path to the KV store for a specific database node. This overrides the global and playpen-level configurations for that node only. For example:

   in my_schema.my_table -> myQueue db=pg(connection="postgres")
If none of the above configurations are set, the create process command will fail.
Supported Data Types
The following is a mapping of the VOR Stream data types to the PostgreSQL data type:
VOR Stream | PostgreSQL | Notes |
---|---|---|
num | double precision | Supports NaN, infinity, -infinity |
char | varchar, text | Truncation will occur if the varchar length is not long enough |
bool | boolean | n/a |
array | double precision array | n/a |
date | date | n/a |
datetime | timestamp, time | UTC is assumed if no time zone is specified |
int | bigint | Suitable for sequences and keys |
Tip
The mappings for MSSQL are the same except for array types, which aren't supported by MSSQL.
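Because output tables must already exist (see the note in Database Output Node), it helps to create them with column types that follow this mapping. A sketch of a matching PostgreSQL table; the table and column names are hypothetical:

CREATE TABLE public.results (
    id     bigint,              -- int
    value  double precision,    -- num
    label  varchar(64),         -- char: longer values are truncated
    active boolean,             -- bool
    curve  double precision[],  -- array
    asof   date,                -- date
    loaded timestamp            -- datetime: UTC assumed if no time zone is specified
);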
Advanced Usage
Use the select option to provide a custom SQL query for input nodes, enabling filtering, joins, or computed columns.
in mySchema.myTable -> myQueue db=pg
select="SELECT * FROM mySchema.myTable WHERE id > 100"
SAS dataset input
The following is a mapping of the VOR Stream data types to the SAS dataset type:
VOR Stream | SAS | Notes |
---|---|---|
num | number | Supports missing values |
char | character | Fixed-length SAS strings; trailing blank padding is removed |
bool | number | 0 = false, true otherwise |
array | SAS array | The smaller of the two array sizes is used |
date | number | Dates in SAS should have a format of DATE or MMDDYY |
datetime | number | Datetimes in SAS should have a format of DATETIME |
int | number | Truncated to a 64 bit integer value |
Using Substitution Variables for input CSV files
Input files used by a process can be modified without recreating or changing the process. This functionality is available for input nodes that use CSV files. Instead of writing something like this in a stream file:
in portfolio.csv -> portfolio
in subdir/rules.csv -> rules
a user can write
in var.port -> portfolio
in var.rules -> rules
The user can define these variables in the joboptions.json file:
{
  "system": {
    "scenario": "testscen",
    "var": {"input": "/home/derdman/playpen/input/input.csv", "port": "portfolio.csv", "rules": "input/rules.csv"}
  }
}
and/or add an upload type corresponding to the substitution name. For example, add an upload type called port and then upload a portfolio using that type and associate it with the study.
Done this way, the user can use the default portfolio portfolio.csv defined in the joboptions.json file or add a portfolio file to their study.
Substitutions in the joboptions.json file can either resolve to files relative to the input directory or use absolute paths.