
Grouping Keys

Introduction

Parallel processing introduces a challenge when the order of observations matters. An important example is scenario analysis: a single instrument is pulled from the input queue and evaluated for a set of scenarios, and each scenario spans multiple time points. It is natural to break the computation into stages and separate the models (probability of default, prepayment model, ...), which change, from the code that uses those models (cashflow generation), which is static. These two stages should be placed in separate nodes connected by a queue. If parallel processing is used to speed up the model computation, the order of the model node's output is not guaranteed, so cashflow computation becomes challenging at best. Ideally, the cashflow node should receive a single instrument evaluated for a single scenario, with all the time points in the scenario in order. Grouping output by a key allows for the following:

  1. Groups of observations stay together for the next node to process.
  2. Group Keys bind the observations in a group together.
  3. An indicator function, FirstGroupKey(), informs the receiving node that a new group has started.
  4. An indicator function, LastGroupKey(), informs the receiving node that the current observation is the last in the group.
  5. A rewind function, RewindGroup(), allows the node to rewind to the first observation in the group and pass through the data again.
  6. Observations within a group can be sorted.

Using Grouping Keys

The grouping mechanism leverages buffering that is present in VOR Stream. Messages that are passed on a queue are not single observations but 50 (by default) observations grouped together. All 50 of those observations come from a single thread in the producing node. This buffering is a grouping with arbitrary boundaries. Since all these observations come as a memory blob, passing through the observations multiple times is efficient and doesn't interfere with the streaming constructs.

Queues can optionally be assigned a Group Key in the tables.csv file using the GroupKey field. Group Keys are not inherited, and there can be at most two Group Keys per table. One or two Group Keys are specified as a space-separated list. A Group Key is the name of a field in the referenced table.

Here is an example of a tables.csv file that uses the Group Keys value and date:

name,type,descr,inherit,groupkey
groupkey,,Input table,,value date
id 
class
value
date
groupkeyout,,Output table,groupkey

The size of a group is limited to 128 megabytes, which is the total size of the buffer, so a queue with a small number of fields can hold more observations in a group. The ideal group size is much smaller than this limit: groups should be kept between 10 and 1,000 observations.
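For example, assuming an observation with four 8-byte numeric fields (roughly 32 bytes per observation), the 128 MB buffer could in principle hold about four million observations, but groups should still be kept in the recommended 10 to 1,000 observation range.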

Group key functions:

| Function | Description | Golang | Python |
| --- | --- | --- | --- |
| FirstGroupKey | A boolean function that returns true if the current observation is the first in the group and false otherwise. | u.FirstGroupKey(input, "key") | input.FirstGroupKey("key") |
| LastGroupKey | A boolean function that returns true if the current observation is the last in the group and false otherwise. | u.LastGroupKey(input, "key") | input.LastGroupKey("key") |
| RewindGroup | Rewinds the group so the next observation sent to the worker is the first in the group. | u.RewindGroup(input, "key") | input.RewindGroup("key") |
| PassGroup | Returns the number of times a rewind of the group has occurred. | u.PassGroup(input, "key") | input.PassGroup("key") |
| SortGroup | Sorts the observations in a group by a variable. | u.SortGroup(input, "Value", "key") | input.SortGroup("date", "key") |

All of these functions take the name of the Group Key as an optional argument. If no key argument is specified, then the first Group Key is used.
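
For example, with the Group Keys value and date from the tables.csv example above, a worker can address either key explicitly. The following is only a sketch of the calls inside a Golang worker (such as the one below), assuming the signatures listed in the table:

 // no key named: the first Group Key ("value") is used
 if u.FirstGroupKey(input) {
  // start-of-group logic for the value key
 }

 // a key named explicitly; this form is needed to refer to the second Group Key ("date")
 if u.LastGroupKey(input, "date") {
  // end-of-group logic for the date key
 }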

Golang & Python Examples

Here is example Golang code that performs two different sorts on a group of data, rewinding once so the data is passed through exactly twice. Note that this uses a tables.csv file similar to the example in the previous section, in which the input table is called groupkey and the output table is called groupkeyout.

package main

import (
 "log"

 "playpen.com/src/queues/Groupkey"
 "playpen.com/src/queues/Groupkeyout"

 "frg.com/streamr/frgutil"
)

// User struct - The user may add to this structure
// There is a copy of this structure for each thread
type User struct {
 hh          *frgutil.Handle                              // required - don't remove
 threadNum   int                                          // required - don't remove
 options     map[string]map[string]map[string]interface{} // required - don't remove
 mem         *frgutil.FuncMem                             // required - don't remove
 Groupkeyout *Groupkeyout.Groupkeyout
}

// this function is called once for each thread
// when the node starts
func (u *User) _init() {

}

// This function is called for each observation on the input queue
// input is a struct with fields the names of the fields in the expected table.
// Capital first letter, lowercase the remaining
func (u *User) worker(input *Groupkey.Groupkey) {

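 // at the start of each group, pick the sort order for this pass:
 // the first pass sorts by date, the second pass (after rewind) sorts by Value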
 if u.FirstGroupKey(input) {
  if u.PassGroup(input) < 1 {
   u.SortGroup(input, "date")
  } else {
   u.SortGroup(input, "Value")
  }
 }

 log.Println(input.Date, input.Instid, input.Value)

 Groupkeyout.Post(u.Groupkeyout)
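 // after the last observation in the group, rewind once so the
 // group is passed through a second time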
 if u.LastGroupKey(input) {
  log.Println("Last observation in the group")
  if u.PassGroup(input) < 1 {
   u.RewindGroup(input)
  }
 }

}

// this function is called once for each thread
// when the node ends
func (u *User) term() {

}

Here is a similar example in Python. Note that Python doesn't require the input queue as an argument for the grouping functions.

import sys

sys.path.insert(1, '/home/derdman/playpen/src/python/queues')
sys.path.insert(1, '/home/derdman/playpen/src/python')

import math
from frgutil import frgutil
from queues import Groupkey
from queues import Groupkeyout
import logging


class groupkeypy:
    options = dict()

    def __init__(self, handle, Groupkeyout):
        self.Groupkeyout = Groupkeyout

        self.hh = handle
        if 'processoptions' in handle.options['JobOptions']:
            self.options = handle.options['JobOptions']['processoptions']
        else:
            self.options = None
        # perform one time initializations if necessary

    # worker is called for each observation sent on the input queue
    # input is a named tuple
    def worker(self, threadNum, input):

        outputGroupkeyout = dict()

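        # a new group has started: sort the group by date before processing it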
        if input.FirstGroupKey():
            logging.info("First obs in group")
            input.SortGroup("date")

        logging.info(
            str(input.Date) + " " + input.Instid + " " + str(input.Value))

        outputGroupkeyout['Instid'] = input.Instid
        outputGroupkeyout['Date'] = input.Date
        outputGroupkeyout['Value'] = input.Value

        # You don't have to post to any or all of the output queues
        self.Groupkeyout.Post(Groupkeyout.Groupkeyout(**outputGroupkeyout))

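        # after the last observation, rewind once so the group is passed through a second time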
        if input.LastGroupKey():
            logging.info("Last observation in the group")
            if input.PassGroup() < 1:
                input.RewindGroup()

    # called when done processing input
    def term(self):
        # Not required to do anything here
        return