Input / Output (IO)
Introduction
Input and output nodes (i.e., IO nodes) read data from or write data to storage for a process. This means IO nodes appear at the beginning, end, and sync points of a process. The supported formats are the universal comma-separated value (CSV) format, SAS datasets, PostgreSQL and MSSQL data tables, and S3 storage. Each IO node can read or write only one table at a time.
SAS datasets are only supported for reading directly with VOR. Writing to SAS datasets requires a licensed SAS product called through a SAS node. Reading a SAS dataset requires neither a SAS installation nor a license.
Output nodes send out signals when they are done writing. The name of the signal is the name of the output node; use the name= option on the node to avoid an autogenerated name. Users can use the WaitSignals() functionality to wait for an output to finish writing. Consider the following three output statements:
out mid1 -> csv_output.csv name=out1
out mid2 -> django.db_output db = pg name=pg2
out mid3 -> var.MID3 name=subs
The three signals sent in this example are "out1", "pg2", and "subs". Computational nodes can then wait on a specific set of signals if they need to run a batch process that requires a set of output files to be available before running. See the signal features in computational nodes for more details.
CSV Input and Output
The first line in a CSV file is a comma-delimited list of field names; for an example, see the Data Sources section. The CSV file is specified without the ".csv" suffix, which allows the file to be either compressed (with the ".csv.gz" extension) or uncompressed (with the ".csv" extension). The input CSV files are located in the input directory, or a subdirectory of it, in the playpen for the process.
Alternatively, a lookthroughpath can be specified in the system options, which directs the program to look first in the playpen input directory for the data. If the data is not found in the playpen, the lookthroughpath location is searched. This allows for an inheritance structure for the data.
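For example, a lookthroughpath might be configured in the system options of the joboptions.json file as follows (a sketch; the exact placement of the key within the system block is an assumption, and the path is illustrative):

{
  "system": {
    "lookthroughpath": "/shared/team/input"
  }
}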
The output CSV files are placed in a subdirectory of the playpen output directory. The output subdirectory has the same name as the executed process, or the value of the --name option. Output is not produced until the process is run with the vor run command. When using CSV files, an IO node is defined via a .strm file. See the Getting Started - Run the Process section for an example of how to create and run an IO node process using a .strm file.
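For instance, assuming the --name option is the one passed to vor run (a sketch, not verbatim tool usage):

vor run --name nightly

would place the CSV output in the output/nightly subdirectory of the playpen.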
IO nodes are created as part of a process definition in a stream file (*.strm). The syntax for an input node is:
in | input <csvfile> | <database path> -> <queue> [delim= "," | " " | ";" | "|" | "\t"] [db = pg | mssql | sas | csv ]
[name=<node name>] [descr="<description of node>"]
[where="<where clause>"]
[select= "sql select"]
where
Option | Description |
---|---|
csvfile | Can use a relative path inside the input directory, e.g., mydir/foo.csv. See the section below on substitution variables for a more dynamic way to declare input files. |
database path | Specified as [<database name>.]<schema name>.<table name>. |
delim | Optional delimiter for the input CSV file. The default is ",". |
db | The target database. The default for ".csv" files is CSV and for ".sas7bdat" files is SAS. For other two-part names, the default is PostgreSQL (pg). |
name | A name assigned to the node. Used to name the log file, the documentation, and the UI. A default name is provided if not specified. |
descr | A quoted string describing the node. This is put in the documentation. |
where | A quoted string containing a WHERE clause to apply to the input. The WHERE clause needs to evaluate to a Boolean. |
select | Only for PostgreSQL and MSSQL. This is a valid SELECT statement passed to the database to select specific records for input. |
Examples of valid input node declarations are:
in my.csv -> myQueue
in my -> myQueue descr="Don't need to specify '.csv' explicitly"
input your.csv -> yourQueue name=ReadYour descr="Read in your input CSV file."
in mySchema.myTable -> queue7
in transactions.Port -> port db=mssql select="select * where asOfDate > '10/4/2022';"
input tabs -> theirQueue delim='\t' descr="Read in a tab delimited file"
in input_csv2.csv -> input_csv2 where="lower(id) != 'daniel'"
in sastypes.sas7bdat -> types name=sasinput
The syntax for an output node is:
out | output <queue> -> <csvfile | database path>
[db = pg | csv | mssql [{ check_constraints=true|false
fire_triggers=true|false keep_nulls=true|false
kb=<kilobytes per batch> rows=<rows per batch>
tablock=true|false order=<name>[,<name>] }]]
[where="<where clause>"]
[compress=true|false]
[exec_when= comma separated list of flags]
[name=<node name>] [descr="<description of node>"]
where
Option | Description |
---|---|
compress | Indicate whether to compress the CSV output or not. The default is false. |
db | The target database. The default for "*.csv" files is CSV. For other two-part names, the default is PostgreSQL (pg). |
check_constraints | Bulk load option. Only for MSSQL. Check all constraints on the target table. |
fire_triggers | Bulk load option. Only for MSSQL. Execute insert triggers. |
keep_nulls | Bulk load option. Only for MSSQL. Retain null values instead of setting default values. |
kb | Bulk load option. Only for MSSQL. Number of kilobytes of data per batch. |
rows | Bulk load option. Only for MSSQL. Number of rows per batch. |
order | Bulk load option. Only for MSSQL. Comma-separated list describing how the data is sorted. |
tablock | Bulk load option. Only for MSSQL. Request a table-level lock for the duration of the bulk insert operation. |
name | A name assigned to the node. Used to name the log file, the documentation, and the UI. A default name is provided if not specified. |
descr | A quoted string describing the node. This is put in the documentation. |
where | A quoted string containing a WHERE clause applied to the records before they are written. The WHERE clause needs to evaluate to a Boolean. |
exec_when | A comma-separated list of flags. The flags indicate whether the node produces output. |
select | Only for PostgreSQL and MSSQL. This is a valid SELECT statement passed to the database to select specific records for input. |
The defaults for the bulk load options on MSSQL are whatever the defaults are for the MSSQL instance. WHERE clauses can use all the functions and syntax available to the Eval() function (see Evaluating Run Time Expressions in SQL).
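For instance, a where clause can combine those functions with comparisons, in the same style as the examples elsewhere in this section (the field name status is illustrative):

out mid -> filtered.csv where="lower(status) != 'closed'"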
If an output node has one or more exec_when= flags set, the output node will only write to its destination if the corresponding flag is set in the joboptions.json file, in the options tab on the run studies screen, or via the --exec-when command line option. There are currently some differences in how the option name is written in each context:
- exec_when - in a stream file
- --exec-when - on the command line
- execwhen - in joboptions.json
exec_when allows for output nodes that only operate under certain conditions, for example, debugging or testing. For nodes that output to a database, the database doesn't have to exist if execution of the output node is disabled; that is, no error will be produced by the node.
Examples of valid output node declarations are:
out answers -> ans.csv
output answers -> ans // Don't need the .csv extension
out mine -> mine.csv compress=true name=MineCompressed descr="Compressed output is smaller but slower to write"
out yours -> reporting.final db=pg
out mine -> datalake.reports.mine db=mssql {keep_nulls=true check_constraints=true}
out output_join1 -> join.csv where="upper(id) != 'CLAIRE'"
out debugQueue -> debug.csv exec_when = debug
The last output node will create debug.csv when the following is set in the system options in joboptions.json:
{
  "system": {
    "output": {
      "usedate": false,
      "execwhen": {"audit": false, "debug": true}
    }
  }
}
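The same flag could instead be set on the command line when running the process (a sketch; the exact value syntax accepted by --exec-when is an assumption):

vor run --exec-when debug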
S3 Input and Output
S3 Input Node
To read input directly from S3, you can use the following syntax for an input node:
in | input <s3Path> -> <queue>
[name=<node name>]
[compress=true|false]
[descr="<description of node>"]
[where="<where clause>"]
where
Option | Description |
---|---|
s3Path | The S3 path from which the input will be read, formatted as s3://<bucket-name>/<object/key/name>. |
name | A name assigned to the node. Used to name the log file, the documentation, and the UI. A default name is provided if not specified. |
compress | Indicate whether the CSV input is compressed or not. The default is false. |
descr | A quoted string describing the node. This is put in the documentation. |
where | A quoted string containing a WHERE clause to apply to the input. The WHERE clause needs to evaluate to a Boolean. |
Here are some examples of valid S3 input node declarations:
in s3://some-bucket/some-input-file.csv -> input name=MyS3Input descr="Input from S3"
input s3://my-bucket/scenario-input.csv -> input name=ScenarioInput descr="Scenario input from S3"
in s3://myBucket/myObjectKey.csv.gz -> input name=GZippedFile descr="Compressed input from S3"
in s3://logs-bucket/error-logs -> input compress=true name=CompressedErrorLogs descr="Compressed logs from S3"
in s3://project-directory/folder/subfolder/file.csv -> input name=NestedPathInput descr="Input with nested path from S3"
in s3://vor-stream-bucket/vor-stream-input.csv -> input where="value<100" name=S3InputFiltered descr="Filtered input from S3"
S3 Output Node
To write output directly to S3, you can use the following syntax for an output node:
out | output <queue> -> <s3Path>
[name=<node name>]
[descr="<description of node>"]
[compress=true|false]
[mode=append|replace]
[where="<where clause>"]
[exec_when= comma separated list of flags]
where
Option | Description |
---|---|
s3Path | The S3 path where the output will be written, formatted as s3://<bucket-name>/<object/key/name>. |
name | A name assigned to the node. Used to name the log file, the documentation, and the UI. A default name is provided if not specified. |
descr | A quoted string describing the node. This is put in the documentation. |
compress | Indicate whether to compress the CSV output or not. The default is false. |
mode | Indicate the write mode. Accepts either replace (default) or append. |
where | A quoted string containing a WHERE clause applied to the records before they are written. The WHERE clause needs to evaluate to a Boolean. |
exec_when | A comma-separated list of flags. The flags indicate whether the node produces output. |
Here are some examples of valid S3 output node declarations:
out output -> s3://my-bucket/my-output name=MyS3Output descr="Output to S3"
output output -> s3://someBucket/someObjectKey name=S3Output descr="Output to S3"
out output -> s3://output-bucket/output.csv name=CSVS3Output descr="Output CSV to S3"
out output -> s3://logs/errors.csv.gz compress=true name=CompressedErrorLogs descr="Output compressed error logs to S3"
out output -> s3://output-data/some/nested/path/output.csv name=NestedPathOutput descr="Writing data with nested path to S3"
out output -> s3://config-bucket/latest-config mode=replace name=ReplacedS3Output descr="Replacing data in S3"
out output -> s3://data-bucket/combined-output-data mode=append name=AppendedS3Output descr="Appending data to existing CSV in S3"
out output -> s3://data-bucket/compressed-output-data compress=true mode=append name=AppendedGZippedS3Output descr="Appending data to existing compressed file in S3"
out output -> s3://output-data/filtered-data where="value<100" name=S3FilteredOutput descr="Filtered output to S3"
out output -> s3://debug-bucket/debug-output exec_when=debug name=S3DebugOutput descr="Output to S3 for Debug"
Write Modes: Append/Replace
Users can control how data is written to S3 using the mode option:
- replace (default): Overwrites the existing file (if it exists).
- append: Appends data to an existing file, creating a new one if it does not exist.
Variable substitution
Users can use variable substitution when defining an S3 node in a stream file. This feature can be applied to both S3 input and output nodes.
The following example demonstrates how variable substitution works with S3 nodes:
in s3://myBucket/playpen_${var.playpen_name}/input${var.s3_input_file} -> input
node usernode(input)(output) setdyn=run_name
out output -> s3://myBucket/playpen_${var.playpen_name}/run${dyn.run_name}/myOutput.csv getdyn=run_name
This offers flexibility, enabling users to specify dynamic S3 paths without editing the stream file and recreating processes.
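For example, the var values referenced above might be supplied in the joboptions.json file (a sketch that mirrors the var block shown in the substitution-variables section below; the values are illustrative). The dyn.run_name value is presumably set at run time by the usernode via setdyn, so it does not appear in the options file:

{
  "system": {
    "var": {"playpen_name": "dev01", "s3_input_file": "/scenario-input.csv"}
  }
}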
For more details on variable substitution, please refer to the section Dynamic Substitution.
PostgreSQL Input and Output
The following is a mapping of the VOR Stream data types to the PostgreSQL data type:
VOR Stream | PostgreSQL | Notes |
---|---|---|
num | double precision | Supports NaN, infinity, -infinity |
char | varchar, text | Truncation will occur if the varchar length is not long enough |
bool | boolean | n/a |
array | double precision array | n/a |
date | date | n/a |
datetime | timestamp, time | UTC is assumed if no time zone specified |
int | bigint | Suitable for sequences and keys |
The mappings for MSSQL are the same except for array types, which aren't supported by MSSQL.
For output tables, the table must already exist so that values can be appended to it. For input tables, an optional SELECT statement can be provided; when executed, it supplies the input values. The user can provide any valid SELECT statement for the PostgreSQL/MSSQL database, including nested joins.
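For example, a PostgreSQL table that the out yours -> reporting.final declaration above could append to might be created as follows (a sketch; the column names are illustrative, and the types follow the mapping table above):

CREATE TABLE reporting.final (
    id       bigint,            -- VOR int
    name     varchar(64),       -- VOR char; longer values are truncated
    amount   double precision,  -- VOR num; NaN and +/- infinity are supported
    active   boolean,           -- VOR bool
    asofdate date,              -- VOR date
    loadedat timestamp          -- VOR datetime; UTC assumed if no time zone
);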
SAS dataset input
The following is a mapping of the VOR Stream data types to the SAS dataset type:
VOR Stream | SAS | Notes |
---|---|---|
num | number | Supports missing values |
char | character | Fixed-length SAS strings have their blank padding removed |
bool | number | 0 = false, true otherwise |
array | SAS array | The smaller of the two array sizes is used |
date | number | Dates in SAS should have a format of DATE or MMDDYY |
datetime | number | Datetimes in SAS should have a format of DATETIME |
int | number | Truncated to a 64-bit integer value |
Using Substitution Variables for input CSV files
Input files used by a process can be modified without recreating or changing the process. This functionality applies to input nodes that use CSV files. Instead of writing something like this in a stream file:
in portfolio.csv -> portfolio
in subdir/rules.csv -> rules
a user can write
in var.port -> portfolio
in var.rules -> rules
The user can either define these variables in the joboptions.json file:
{
  "system": {
    "scenario": "testscen",
    "var": {"input": "/home/derdman/playpen/input/input.csv", "port": "portfolio.csv", "rules": "input/rules.csv"}
  }
}
and/or add an upload type corresponding to the substitution name. For example, add an upload type called port, then upload a portfolio using that type and associate it with the study. Done this way, the user can use the default portfolio portfolio.csv defined in the joboptions.json file or add a portfolio file to their study.
Substitutions in the joboptions.json file can either resolve to files relative to the input directory or use absolute paths.