Queues
Introduction
Queues are created automatically from the definitions in the tables.csv file. They enable inter-process communication by connecting input and output data to reader and writer nodes, linking together the different parts of a process. The diagram below is a segment of the UI and shows the structure of a process. The arrows between the sections represent queues.
Along with the queues themselves, interface code is generated for the queues and tables, and the queues are registered with the mid-tier. This interface code marshals and un-marshals the data sent through the queues, and reads and writes data in CSV files on disk. The generated code is located in the <playpen>/src/queues directory and should not be edited.
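As an illustration only, the following Python sketch shows the general idea of a queue that marshals a record into a CSV line on the writer side and un-marshals it on the reader side. The `CsvQueue` class, its `post()`/`get()`/`close()` methods, and the `DONE` sentinel are hypothetical names chosen for this example. The real generated code in <playpen>/src/queues is not shown here and will differ.

```python
import csv
import io
import queue

DONE = object()  # hypothetical end-of-data signal


class CsvQueue:
    """Hypothetical queue that carries records as CSV-encoded lines."""

    def __init__(self, fields):
        self.fields = fields  # field order, as a table definition would give it
        self._q = queue.Queue()

    def post(self, record):
        # Marshal: dict -> one CSV line in the table's field order
        buf = io.StringIO()
        csv.writer(buf).writerow([record[f] for f in self.fields])
        self._q.put(buf.getvalue())

    def close(self):
        self._q.put(DONE)

    def get(self):
        # Un-marshal: one CSV line -> dict keyed by field name
        line = self._q.get()
        if line is DONE:
            return DONE
        values = next(csv.reader(io.StringIO(line)))
        return dict(zip(self.fields, values))


# A writer node posts two records; a reader node consumes until DONE.
q = CsvQueue(["id", "class1", "value"])
q.post({"id": "a1", "class1": "x", "value": 1.5})
q.post({"id": "a2", "class1": "y", "value": 2})
q.close()

received = []
while (rec := q.get()) is not DONE:
    received.append(rec)
print(received)
```

In this sketch the values come back as strings; converting them to their field types is left out.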
Null Queue
When a process is run, it must be validated to ensure that a reader node and a
writer node exist for each queue. It may be convenient to have a node that does
not read from any queue, a node that does not write to any queue, or even both
(although this is rare). The NULL queue was introduced to indicate that the
input or the output queue of a node is intentionally missing. An example use
case is a node that generates simulation data. Such a node could be driven by a
matrix map and thus needs no other input. For a node with a NULL queue as input,
the worker() function has no observations to process, so all of the node's code
should be placed in the init() or term() function.
Similarly, a node with a NULL queue output could be used to update dynamic facts or other systems through built-in functions rather than through a queue. Writing to a NULL queue will have no effect. Reading from a NULL queue will return a done signal.
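The validation rule and the NULL queue behavior described above can be modeled in a few lines of Python. This is a sketch under stated assumptions: each node has exactly one input and one output queue, the literal name `NULL` marks a missing queue, and queues fed from or written to files (the `in`/`out` lines of a .strm file) are ignored. `NullQueue`, `DONE`, `unmatched_queues`, and `run_worker_loop` are all hypothetical names.

```python
DONE = object()  # hypothetical done signal
NULL = "NULL"    # hypothetical name marking an intentionally missing queue


class NullQueue:
    """Hypothetical stand-in for the NULL queue."""

    def post(self, record=None):
        pass  # writing to a NULL queue has no effect

    def get(self):
        return DONE  # reading from a NULL queue returns the done signal


def unmatched_queues(nodes):
    """Return queues that lack a writer or a reader; NULL is exempt.

    nodes: list of (name, input_queue, output_queue) tuples.
    """
    written = {out for _, _, out in nodes if out != NULL}
    read = {inp for _, inp, _ in nodes if inp != NULL}
    # Symmetric difference: written but never read, or read but never written
    return sorted(written ^ read)


def run_worker_loop(in_queue, worker):
    """Call worker() once per record until the done signal; return the call count."""
    calls = 0
    while (record := in_queue.get()) is not DONE:
        worker(record)
        calls += 1
    return calls


# A simulation-data generator reads from NULL; a fact updater writes to NULL.
process = [
    ("generate", NULL, "sim"),
    ("update_facts", "sim", NULL),
]
print(unmatched_queues(process))                          # []
print(unmatched_queues([("update_facts", "sim", NULL)]))  # ['sim']
print(run_worker_loop(NullQueue(), print))                # 0
```

Because `get()` on the NULL queue returns the done signal immediately, the worker loop never calls `worker()`, which is why a NULL-input node does its work in init() or term().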
Processing Topology
Queues and nodes are connected to determine the order, or topology, of the process. There are three main ways to connect them: simple, fan-in, and fan-out. In a simple connection, nodes and queues form a straight line, with each node connected to the next by a single queue. In fan-in processing, several source nodes feed the same destination, combining their data into one stream. This can be useful for appending one dataset to another. Finally, in fan-out processing, the contents of one queue are copied to multiple nodes, so that every receiving node obtains the same data. Fan-out is appropriate when several nodes each need the entire dataset. Fan-out (type fanout) is the default type for a queue.
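The three connection styles can be modeled with plain Python lists standing in for queues. This is a hedged model of the data flow only, not the framework's scheduler: it assumes records are delivered in order, and the function names are hypothetical.

```python
def simple(records):
    """Simple: one writer, one queue, one reader; records pass straight through."""
    return list(records)


def fan_in(*sources):
    """Fan-in: several source nodes feed one destination. Here the sources are
    appended one after another; in a running process the interleaving may differ."""
    merged = []
    for src in sources:
        merged.extend(src)
    return merged


def fan_out(records, n_readers):
    """Fan-out: every reader receives its own full copy of the records."""
    return [list(records) for _ in range(n_readers)]


print(simple([1, 2, 3]))            # [1, 2, 3]
print(fan_in([1, 2], [3], [4, 5]))  # [1, 2, 3, 4, 5]
print(fan_out([1, 2, 3], 2))        # [[1, 2, 3], [1, 2, 3]]
```

Each fan-out reader gets an independent copy, so one reader modifying its data does not affect the others.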
Round Robin Processing
Round Robin is a less common but useful alternative to fan-out processing. In fan-out processing, every receiving node gets all of the data from the input node. In Round Robin processing, each record is instead sent to only one of the receiving nodes, spreading the work across them so that no single node is overloaded.
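A minimal sketch of Round Robin distribution, assuming a strict rotation through the receiving nodes; the framework may use a different assignment strategy (for example, giving each record to whichever node is ready), and `round_robin` is a hypothetical helper.

```python
from itertools import cycle


def round_robin(records, readers):
    """Deal records to readers in strict rotation; each record goes to exactly one reader."""
    assigned = {r: [] for r in readers}
    for record, reader in zip(records, cycle(readers)):
        assigned[reader].append(record)
    return assigned


print(round_robin(range(6), ["robin1", "robin2", "robin3", "robin4"]))
# {'robin1': [0, 4], 'robin2': [1, 5], 'robin3': [2], 'robin4': [3]}
```

Compared with fan-out, the total number of records delivered equals the number of input records rather than that number multiplied by the number of readers.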
Here is an example of a .strm file that uses Round Robin processing:
name robin
in input.csv -> input
node robin(input)(carbon) lang=python
node robin1(carbon)(output)
node robin2(carbon)(output) lang=python
node robin3(carbon)(output)
node robin4(carbon)(output)
sql select id, class1, class2, lgd from output into official group by id, class1, class2;
predict=predict
out official -> robin.csv
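To see the topology this .strm file describes, the following hypothetical parser reads only its `node NAME(INPUT)(OUTPUT)` lines and records which nodes write and read each queue. It assumes exactly one queue name inside each pair of parentheses, as in the example, and ignores the `name`, `in`, `sql`, and `out` lines (so the sql step, which reads output and writes official, is not captured). The pattern and the `queue_graph` function are illustrative, not part of the tool.

```python
import re

# The robin .strm file shown above.
STRM = """\
name robin
in input.csv -> input
node robin(input)(carbon) lang=python
node robin1(carbon)(output)
node robin2(carbon)(output) lang=python
node robin3(carbon)(output)
node robin4(carbon)(output)
sql select id, class1, class2, lgd from output into official group by id, class1, class2;
predict=predict
out official -> robin.csv
"""

# Hypothetical pattern for "node NAME(INPUT)(OUTPUT) [options]" lines;
# it accepts exactly one queue name in each pair of parentheses.
NODE_RE = re.compile(r"^node\s+(\w+)\((\w*)\)\((\w*)\)")


def queue_graph(text):
    """Map each queue to the nodes that write it and the nodes that read it."""
    writers, readers = {}, {}
    for line in text.splitlines():
        match = NODE_RE.match(line.strip())
        if match is None:
            continue  # name, in, sql, out and other lines are ignored here
        name, in_queue, out_queue = match.groups()
        if in_queue:
            readers.setdefault(in_queue, []).append(name)
        if out_queue:
            writers.setdefault(out_queue, []).append(name)
    return writers, readers


writers, readers = queue_graph(STRM)
print(writers["carbon"], readers["carbon"])  # ['robin'] ['robin1', 'robin2', 'robin3', 'robin4']
print(writers["output"])                     # ['robin1', 'robin2', 'robin3', 'robin4']
```

The result shows both patterns in one file: the carbon queue has one writer and four readers (distributed Round Robin, since carbon is nofanout), while the output queue has four writers (fan-in).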
The dictionary.csv and tables.csv files used by the .strm file above contain the following lines. Note that Round Robin processing is selected by giving the intermediate table (carbon) the type nofanout in tables.csv.
dictionary.csv:
name,type,descr,arraylen
id,char
class1,char
class2,char
value,num
x,num
lgd,num
tables.csv:
name,type,descr,inherit
input,,Input table
id
class1
class2
value
output,,Output table,input
x
lgd
report,,output table,input
class1 keep
class2 keep
lgd
official,,report input,report
id
carbon,nofanout,carbon table,,input
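The dictionary.csv listing assigns each field a type. A hedged sketch of how those types could be applied when un-marshaling a CSV row, assuming `char` means text and `num` means a number (parsed here as a float). The `CONVERTERS` mapping and the `load_dictionary` and `convert_row` helpers are hypothetical.

```python
import csv
import io

# The dictionary.csv lines shown above.
DICTIONARY_CSV = """\
name,type,descr,arraylen
id,char
class1,char
class2,char
value,num
x,num
lgd,num
"""

# Assumed mapping from dictionary types to Python converters.
CONVERTERS = {"char": str, "num": float}


def load_dictionary(text):
    """Return {field name: converter} from dictionary.csv text."""
    return {row["name"]: CONVERTERS[row["type"]]
            for row in csv.DictReader(io.StringIO(text))}


def convert_row(row, types):
    """Convert the string values of one CSV row using the field types."""
    return {field: types[field](value) for field, value in row.items()}


types = load_dictionary(DICTIONARY_CSV)
print(convert_row({"id": "a1", "class1": "x", "value": "2.5"}, types))
# {'id': 'a1', 'class1': 'x', 'value': 2.5}
```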
Adjustments can be made to the user-modifiable *U.go file of each node. For
example, in Go, the following addition could be made to the worker() function in
the robin1U.go file. It sets the Id field of each output observation to the name
of the node that processed it, so the output file shows which node handled each
observation.
// This function is called for each observation on the input queue.
// input is a struct whose fields are named after the fields of the expected
// table, with the first letter capitalized and the remaining letters lowercase.
func (u *User) worker(input *Carbon.Carbon) {
	// You don't have to post to any of the output queues
	u.Output.Id = "robin1"
	Output.Post(u.Output)
}
The same can be done in a Python user-modifiable file, for example for robin2, which is declared with lang=python in the .strm file above:
def worker(self, threadNum, input):
    self.Output.Id = "robin2"
    # You don't have to post to any or all of the output queues
    self.Output.Post()
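To illustrate the effect of this worker() change, the following sketch models it end to end: records are dealt to robin1 through robin4 in strict rotation, each node's worker copies the record and overwrites Id with its own node name, and the output rows are then counted per Id. The rotation order, the record copying, and the `make_worker` helper are assumptions for illustration; it also assumes every reader node is given the same change with its own name.

```python
from collections import Counter
from itertools import cycle

NODES = ["robin1", "robin2", "robin3", "robin4"]


def make_worker(node_name, output):
    """Hypothetical worker: copy the record and set Id to the node's name,
    mirroring the Output.Id assignment in the examples above."""
    def worker(record):
        out = dict(record)
        out["Id"] = node_name
        output.append(out)
    return worker


output = []
workers = {name: make_worker(name, output) for name in NODES}

records = [{"Id": f"obs{i}", "Class1": "a"} for i in range(10)]
for record, name in zip(records, cycle(NODES)):
    workers[name](record)

counts = dict(Counter(row["Id"] for row in output))
print(counts)  # {'robin1': 3, 'robin2': 3, 'robin3': 2, 'robin4': 2}
```

Grouping the output by Id, as the sql line in the .strm file does, then reveals how the observations were spread across the nodes.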
Fanout Processing
The configuration for fan-out processing is almost identical to Round Robin. The only change is the type of the intermediate table (carbon in this case), which is set to fanout instead of nofanout. Notice the difference on the last line of the tables.csv file:
name,type,descr,inherit
input,,Input table
id
class1
class2
value
output,,Output table,input
x
lgd
report,,output table,input
class1 keep
class2 keep
lgd
official,,report input,report
id
carbon,fanout,carbon table,,input
Alternatively, the type can be left blank, which implies fanout.
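The rule for choosing between the two modes can be summarized in a short Python sketch: nofanout selects Round Robin, while fanout or a blank type selects fan-out. The `distribution_mode` and `distribute` helpers are hypothetical, rejecting an unrecognized type is an assumption of this sketch, and the strict rotation used for Round Robin may differ from the framework's actual assignment.

```python
from itertools import cycle


def distribution_mode(table_type):
    """Map a tables.csv type to a distribution mode.
    'nofanout' -> round robin; 'fanout' or blank -> fan-out (the default)."""
    t = table_type.strip()
    if t == "nofanout":
        return "round_robin"
    if t in ("", "fanout"):
        return "fanout"
    # Assumption: this sketch rejects any other type value.
    raise ValueError(f"unrecognized queue type: {table_type!r}")


def distribute(records, readers, table_type):
    """Return {reader: records received} under the table's distribution mode."""
    records = list(records)
    received = {r: [] for r in readers}
    if distribution_mode(table_type) == "fanout":
        for r in readers:
            received[r].extend(records)  # every reader gets every record
    else:
        for rec, r in zip(records, cycle(readers)):
            received[r].append(rec)      # each record goes to one reader
    return received


readers = ["robin1", "robin2"]
print(distribute([1, 2, 3], readers, "fanout"))    # {'robin1': [1, 2, 3], 'robin2': [1, 2, 3]}
print(distribute([1, 2, 3], readers, ""))          # same as fanout
print(distribute([1, 2, 3], readers, "nofanout"))  # {'robin1': [1, 3], 'robin2': [2]}
```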


