Skip to content

Input / Output (IO)

Introduction

Input and output nodes (i.e., IO nodes) are used to read or write data to disk from a process. This makes IO nodes appear at the beginning, end, and sync points in a process. The format of the files is the universal comma-separated value (CSV) format, SAS datasets, PostgreSQL, MSSQL data tables, or S3 storage. Each IO node can read or write only one table at a time.

SAS datasets are only supported for reading directly with VOR. Writing to SAS datasets requires the use of licensed SAS product called through a SAS node. Reading a SAS dataset doesn't require a SAS install or a license.

Output nodes send out signals when they are done writing. The name of the signal is the name of the output node. Use the name= option on the node to avoid an autogenerated name for the node. Users can use the WaitSignals() functionality to wait on an output to finish writing. Consider the following three output statements:

out mid1 -> csv_output.csv name=out1
out mid2 -> django.db_output db = pg  name=pg2
out mid3 -> var.MID3  name=subs

The three signals sent for this example are "out1", "pg2", and "subs". Computational nodes can then wait on a specific set of signals if they need to run a batch process that requires a set of output files to be available before running. See signal features in computational nodes for more details.

CSV Input and Output

The first line in a CSV file is a comma-delimited list of field names. For an example, see the Data Sources section. The CSV file is specified without the ".csv" suffix. This allows the CSV file to either be compressed with the ".csv.gz" extension or uncompressed with the ".csv" extension. The location of the input CSV files is in the input directory, or subdirectory, of the playpen for the process.

Alternately, the lookthroughpath can be specified in the system options which directs the program to first look in the playpen input directory for the data. If the data is not found in the playpen, then it may be in the lookthroughpath path location for the data. This allows for an inheritance structure for the data.

The output CSV files are put in a subdirectory of the playpen output directory. The output subdirectory is the same name as the executed process or the value of the --name option. Output is not produced until the process is run with the vor run command. When using CSV files, an IO node via a .strm file. See the Getting Started - Run the Process section for an example of how to create and run an IO node process using a .strm file.

IO nodes are created as part of a process definition in a stream file (*.strm). The syntax for an input node is:

in | input <csvfile> | <database path> -> <queue>  [delim= "," | " " | ";" | "|" | "\t"] [db = pg | mssql | sas | csv ]
         [name=<node name>] [descr="<description of node>]
         [where="<where clause>"]
         [select= "sql select"]

where

Option Description
csvfile Can use a relative path inside the input directory, *mydir/foo.csv. See section below on substitution variables for a more dynamic way to declare input files.
database path Specified as [\<database name>.]\<schema name>.\<table name>.
delim Optional delimiter for the input CSV file. The default is ","
db The target database. The default for ".csv" files is CSV and for ".sas7bdat" is SAS. For other two part names the default is PostgreSQL, PG.
name A name assigned to the node. Used to name the log file, the documentation, and the UI. A default name is provided if not specified
descr A quoted string describing the node. This is put in the documentation.
where A quoted string containing a WHERE clause to apply to the input. The WHERE clause needs to evaluate to a Boolean.
select Only for PostgreSQL and MS SQL. This is a valid select statement passed to the database to select specific records for input.

Examples of valid input node declarations are :

in my.csv -> myQueue
in my -> myQueue   descr="Don't need to specify '.csv' explicitly"
input your.csv -> yourQueue name=ReadYour descr="Read in your input CSV file."

in mySchema.myTable -> queue7
in transactions.Port -> port db=mssql select="select * where asOfDate > '10/4/2022';"

input tabs -> theirQueue delim='\t'  descr="Read in a tab delimited file"

in input_csv2.csv -> input_csv2 where="lower(id) != 'daniel'"
in sastypes.sas7bdat -> types name=sasinput

The syntax for an output node is:

out | output <queue> -> <csvfile | database path>
        [db = pg | csv | mssql [{ check_constraints=true|false
        fire_triggers=true|false keep_nulls=true|false
        kb=<kilo Bytes Per Batch>  rows=<rows Per Batch>
        tablock=true|false order=<name>[,<name>]] }]
        [where="<where clause>"]
        [compress=true|false]
        [exec_when= comma separated list of flags]
        [name=<node name>] [descr="<description of node>]

where

Option Description
compress Indicate whether to compress the CSV output or not. The default is false.
db The target database. The default for "*.csv" files is CSV. For other two part names the default ist PostgreSQL. PG.
Check_Constraints Bulk load option. Only for MSSQL. Check all constraints on the target table.
Fire_Triggers Bulk load option. Only for MSSQL. Execute insert triggers.
Keep_Nulls Bulk load option. Only for MSSQL. Retain null value instead of setting default values.
kb Bulk load option. Only for MSSQL. Number of kilobytes of data per batch.
Rows Bulk load option. Only for MSSQL. Number of rows per batch.
Order Bulk load option. Only for MSSQL. Comma-separated list of how data is sorted.
Tablock Bulk load option. Only for MSSQL. Request a table level lock during bulk insert operation
name A name assigned to the node. Used to name the log file, the documentation, and the UI. A default name is provided if not specified
descr A quoted string describing the node. This is put in the documentation.
where A quoted string containing a WHERE clause to apply to the input. The WHERE clause needs to evaluate to a Boolean.
exec_when A comma separated list of flags. The flags indicate whether the node produces output.
select Only for PostgreSQL and MS SQL. This is a valid select statement pass to the database to select specific records for input.

The defaults for the bulk load options on MS SQL are whatever the defaults are for the MS SQL instance. Where clauses can use all the functions and syntax that is available to the Eval() function (See Evaluating Run Time Expressions in SQL

If an output node has set one or more exec_when= flags, the output node will only output to the destination if the corresponding flag is set in the joboptions.json file, or in the options tab on the run studies screen, or using the --exec-when command line option. There is currently some differences on how spaces are handled in options so:

  • exec_when - in a stream file
  • --exec-when - on the command line
  • execwhen - in joboptions.json.

Exec when allows for output nodes that only operate under certain conditions. For example for debugging or testing. For nodes that output to a database, the database doesn't have to exist if the execution of the output node is disabled. That is, no error will be produced from the node.

Examples of valid output node declarations are :

out answers -> ans.csv
output answers -> ans   // Don't need the .csv extension

out mine -> mine.csv compress=true name=MineCompressed desc="Compressed output is smaller but slower to write"

out yours -> reporting.final db=pg

out mine -> datalake.reports.mine db=mssql {keep_nulls=true check_constraints=true}

out output_join1 -> join.csv  where="upper(id) != 'CLAIRE'"

out debugQueue -> debug.csv exec_when = debug

The last output node will create debug.csv when the following is set in the system options in joboptions.json:

{
   "system": {
   "output": {
         "usedate": false,
         "execwhen":{"audit":false, "debug":true}
      }
   }
}

S3 Input and Output

S3 Input Node

To read input directly from S3, you can use the following syntax for an input node:

in | input <s3Path> -> <queue>
        [name=<node name>]
        [compress=true|false]
        [descr="<description of node>"]
        [where="<where clause>"]

where

Option Description
s3Path The S3 path where the output will be written, formatted as s3://<bucket-name>/<object/key/name>.
name A name assigned to the node. Used to name the log file, the documentation, and the UI. A default name is provided if not specified.
compress Indicate whether the CSV input is compressed or not. The default is false.
descr A quoted string describing the node. This is put in the documentation.
where A quoted string containing a WHERE clause to apply to the input. The WHERE clause needs to evaluate to a Boolean.

Here are some examples of valid S3 input node declarations:

in s3://some-bucket/some-input-file.csv -> input name=MyS3Input descr="Input from S3"
input s3://my-bucket/scenario-input.csv -> input name=ScenarioInput descr="Scenario input from S3"
in s3://myBucket/myObjectKey.csv.gz -> input name=GZippedFile descr="Compressed input from S3"
in s3://logs-bucket/error-logs compress -> input name=CompressedErrorLogs descr="Compressed logs from S3"
in s3://project-directory/folder/subfolder/file.csv -> input name=NestedPathInput descr="Input with nested path from S3"
in s3://vor-stream-bucket/vor-stream-input.csv -> input where="value<100" name=S3InputFiltered descr="Filtered input from S3"

S3 Output Node

To write output directly to S3, you can use the following syntax for an output node:

out | output <queue> -> <s3Path>
        [name=<node name>]
        [descr="<description of node>"]
        [compress=true|false]
        [mode=append|replace]
        [where="<where clause>"]
        [exec_when= comma separated list of flags]

where

Option Description
s3Path The S3 path where the output will be written, formatted as s3://<bucket-name>/<object/key/name>.
name A name assigned to the node. Used to name the log file, the documentation, and the UI. A default name is provided if not specified.
descr A quoted string describing the node. This is put in the documentation.
compress Indicate whether to compress the CSV output or not. The default is false.
mode Indicate the write mode. Accept only either replace(default) or append.
where A quoted string containing a WHERE clause to apply to the input. The WHERE clause needs to evaluate to a Boolean.
exec_when A comma separated list of flags. The flags indicate whether the node produces output.

Here are some examples of valid S3 output node declarations:

out output -> s3://my-bucket/my-output name=MyS3Output descr="Output to S3"
output output -> s3://someBucket/someObjectKey name=S3Output descr="Output to S3"
out output -> s3://output-bucket/output.csv name=CSVS3Output descr="Output CSV to S3"
out output -> s3://logs/errors.csv.gz compress name=CompressedErrorLogs descr="Output compressed error logs to S3"
out output -> s3://output-data/some/nested/path/output.csv name=NestedPathOutput descr="Writing data with nested path to S3"
out output -> s3://config-bucket/latest-config mode=replace name=ReplacedS3Output descr="Replacing data in S3"
out output -> s3://data-bucket/combined-output-data mode=append name=AppendedS3Output descr="Appending data to existing CSV in S3"
out output -> s3://data-bucket/compressed-output-data compress mode=append name=AppendedGZippedS3Output descr="Appending data to existing compressed file in S3"
out output -> s3://output-data/filtered-data where="value<100" name=S3FilteredOutput descr="Filtered output to S3"
out output -> s3://debug-bucket/debug-output exec_when=debug name=S3DebugOutput descr="Output to S3 for Debug"

Write Modes: Append/Replace

Users can control how data is written to S3 using the mode option:

  • replace(default): Overwrites the existing file (if exists).
  • append: Appends data to an existing file, creating a new one if it does not exist.

Variable substitution

Users can use variable substitution when defining a S3 node in stream file. This feature can be applied to both S3 input and output nodes.

The following example demonstrates how variable substitution works with S3 nodes:

in s3://myBucket/playpen_${var.playpen_name}/input${var.s3_input_file} -> input
node usernode(input)(output) setdyn=run_name
out output -> s3://myBucket/playpen_${var.playpen_name}/run${dyn.run_name}/myOutput.csv getdyn=run_name

This offers flexibility to users, enabling users to specify dynamic S3 paths without the need to edit stream file and recreating processes.

For more details on variable substitution, please refer to the section Dynamic Substitution.

PostgreSQL Input and Output

The following is a mapping of the VOR Stream data types to the PostgreSQL data type:

VOR Stream PostgreSQL Notes
num double precision Supports NaN, infinity, - infinity
char varchar, text Truncation will occur if the varchar length is not long enough
bool boolean n/a
array double precision array n/a
date date n/a
datetime timestamp, time UTC is assumed if no time zone specified
int bigint Suitable for sequences and keys

The mappings for MSSQL are the same except for array types, which aren't supported by MSSQL.

For output tables, the table must already exist so values may be appended to the table. For input tables, there is an optional SELECT that when executed, input values can be obtained. The user can provide any valid SELECT statement for the PostgreSQL/MSSQL database, including nested joins.

SAS dataset input

The following is a mapping of the VOR Stream data types to the SAS dataset type:

VOR Stream SAS Notes
num number Supports missing values
char character Fixed length SAS strings blank padding removed
bool number 0 = false, True otherwise
array SAS array The smaller of the two array sizes is used
date number Dates in SAS should have a format of DATE or MMDDYYY
datetime number Datetimes in SAS should have a format of DATETIME
int number Truncated to a 64 bit integer value

Using Substitution Variables for input CSV files

Input files used by a process can be modified without recreating/changing the process. This functionality can be used for input nodes that are using CSV files. Instead of writing something like this in a stream file:

in portfolio.csv -> portfolio
in subdir/rules.csv -> rules

a user can write

in var.port -> portfolio
in var.rules -> rules

The user can either define these variables in the joboptions.json file:

{
   "system": {
      "scenario":"testscen",
      "var":{"input":"/home/derdman/playpen/input/input.csv", "port":"portfolio.csv", "rules":"input/rules.csv"}
      }
   }

and/or add an upload type corresponding to the substitution name. For example, add an upload type called port and then upload a portfolio using that type and associate it with the study. Done this way, the user can use the default portfolio portfolio.csv defined in the joboptions.json file or add a portfolio file to their study.

Substitutions in the joboptions.json file can either resolve to files relative to the input directory or use absolute paths.