Processors

Processors form a middleware pipeline that alters the records transferred from inlets to outlets. The two most common uses are:

  • Filtering - removing some or all records before feeding them to outlets.

  • Transforming - altering the records before feeding them to outlets.

Simple example

from typing import List

from databay import Link
from databay.record import Record

# Example filtering
def only_large_numbers(records: List[Record]):
    result = []
    for record in records:
        if record.payload >= 100:
            result.append(record)
    return result

# Example transforming
def round_to_integers(records: List[Record]):
    for record in records:
        record.payload = round(record.payload)
    return records

# Pass the processors to a link
link = Link(..., processors=[only_large_numbers, round_to_integers])

The processor pipeline used in the above example will turn the following list:

[99.999, 200, 333.333]

into:

[200, 333]

Note that 99.999 was filtered out because of the order of the processors. If we were to swap them, the rounding would occur before filtering, allowing all three records through the filter.
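For comparison, the same link with the processors swapped:

link = Link(..., processors=[round_to_integers, only_large_numbers])

would turn the same list into:

[100, 200, 333]

since round(99.999) yields 100, which then passes the >= 100 filter.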

Processors explained

A processor is a callable that accepts a list of records and returns a (potentially altered) list of records.

Processors are called in the order in which they are passed, after all inlets have finished producing their data. The result of each processor is passed to the next one; the output of the final processor then continues through the transfer as normal.
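Conceptually, the pipeline amounts to feeding the records through each processor in turn. The sketch below illustrates this; run_pipeline is a hypothetical helper written for illustration, not part of Databay's API:

def run_pipeline(processors, records):
    # Pass the output of each processor as the input to the next.
    for processor in processors:
        records = processor(records)
    return records

# Equivalent to: round_to_integers(only_large_numbers(records))
processed = run_pipeline([only_large_numbers, round_to_integers], records)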

Best practices

Responsibility

Databay makes no further assumptions about processors - you can implement whatever processors suit your needs. This also means Databay will not ensure that records aren't corrupted by processors, so you need to be conscious of what each processor does to the data.

If you wish to verify the integrity of your records after processing, attach an additional processor at the end of your pipeline that validates the correctness of the processed records before sending them off to the outlets.
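For example, a minimal validating processor for the pipeline above could look like this; validate_integers is a hypothetical example, not part of Databay:

def validate_integers(records: List[Record]):
    # Hypothetical check: fail fast if any processed payload
    # is not an integer, before it reaches the outlets.
    for record in records:
        if not isinstance(record.payload, int):
            raise ValueError(f'Unexpected payload: {record.payload!r}')
    return records

link = Link(..., processors=[only_large_numbers, round_to_integers, validate_integers])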