Buffers

Buffers are special built-in Processors. They allow you to temporarily accumulate records before passing them over to outlets.

Simple example

The following example uses a buffer to store records until the number of records produced exceeds 10.

  1. Define a buffer with count_threshold=10:

buffer = Buffer(count_threshold=10)
link = Link(inlet, outlet, processors=buffer)

  2. On the first transfer the inlet produces 4 records, and the buffer stores them. The outlet receives no records.

  3. On the second transfer the inlet produces 4 records, and the buffer stores them along with the first 4. The outlet still receives no records.

  4. On the third transfer the inlet produces 3 records. Having exceeded the count_threshold of 10, the buffer releases all 11 records to the outlet, which receives a list of 11 records.
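The walkthrough above can be simulated in plain Python. The SimpleBuffer class below is a hypothetical stand-in, not Databay's Buffer: it assumes records are released once the stored count reaches count_threshold, which for the 4/4/3 sequence gives the same outcome as "exceeding" it.

```python
from typing import List


class SimpleBuffer:
    """Hypothetical, simplified stand-in for a count-based buffer."""

    def __init__(self, count_threshold: int) -> None:
        self.count_threshold = count_threshold
        self.records: List[int] = []

    def __call__(self, records: List[int]) -> List[int]:
        # Store the incoming records alongside any already buffered ones.
        self.records.extend(records)
        if len(self.records) >= self.count_threshold:
            released, self.records = self.records, []  # release, then reset
            return released
        return []  # nothing is passed on to the outlet yet


buffer = SimpleBuffer(count_threshold=10)
received = [buffer(list(range(n))) for n in (4, 4, 3)]
print([len(batch) for batch in received])  # [0, 0, 11]
```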

Store or release?

When processing records (see Processors), a Buffer decides whether the records should be stored or released. It does so by passing the list of records to the Buffer's internal callable functions, called controllers.

Each controller performs a different type of check and returns True if the records should be released, or False if they should be stored.

Default controllers

Buffer comes with two default controllers:

  • count_controller - buffers records until their number reaches the count threshold defined by the Buffer.count_threshold parameter, counted since the buffer was last reset. For example:

buffer = Buffer(count_threshold=50) # release records every 50 records.
  • time_controller - buffers records until the time threshold defined by the Buffer.time_threshold parameter is reached, counted from the first time the records are stored. For example:

buffer = Buffer(time_threshold=60) # release records every 60 seconds.
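The two default controllers can be sketched as plain callables. These are hypothetical helpers illustrating the behaviour described above, not Databay's internals; the injectable clock in TimeController is an added assumption that lets the example run without waiting.

```python
import time
from typing import Callable, List, Optional


def make_count_controller(count_threshold: int) -> Callable[[List[object]], bool]:
    # Returns True (release) once the stored records reach the threshold.
    def count_controller(records: List[object]) -> bool:
        return len(records) >= count_threshold
    return count_controller


class TimeController:
    """Release once time_threshold seconds have passed since the first store."""

    def __init__(self, time_threshold: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.time_threshold = time_threshold
        self.clock = clock  # assumption: injectable clock for testing
        self.start: Optional[float] = None

    def __call__(self, records: List[object]) -> bool:
        if self.start is None:
            if not records:
                return False  # nothing stored yet, so the timer has not started
            self.start = self.clock()  # timer starts on the first store
        return self.clock() - self.start >= self.time_threshold

    def reset(self) -> None:
        self.start = None


count_controller = make_count_controller(50)
print(count_controller(list(range(49))), count_controller(list(range(50))))  # False True

now = [0.0]  # fake clock value so the example runs instantly
time_controller = TimeController(60, clock=lambda: now[0])
print(time_controller([]))          # False: timer not started
print(time_controller(['a']))       # False: timer starts at t=0
now[0] = 60.0
print(time_controller(['a', 'b']))  # True: 60 s since the first store
```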

Custom controllers

Apart from the default controllers, Buffer accepts any number of custom controllers. Each controller is called with the list of records and is expected to return True if the records should be released, or False if they should be stored. For example:

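from typing import List

from databay.record import Record # Databay's record class

def big_value_controller(records: List[Record]) -> bool:
    # release the buffered records as soon as any record carries a large value
    for record in records:
        if record.payload.value > 10000:
            return True

    return False

buffer = Buffer(custom_controllers=big_value_controller)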
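A custom controller is an ordinary function, so it can be checked in isolation by calling it directly. The sketch below uses types.SimpleNamespace stand-ins shaped like record.payload.value from the example above; real records may store their payload differently (for instance as a dict), so that attribute access is an assumption carried over from the example. The any() call is equivalent to the loop above.

```python
from types import SimpleNamespace
from typing import Any, List


def big_value_controller(records: List[Any]) -> bool:
    # Release as soon as any buffered record carries a value above 10000.
    return any(record.payload.value > 10000 for record in records)


def make_record(value: int) -> SimpleNamespace:
    # Hypothetical stand-in exposing record.payload.value.
    return SimpleNamespace(payload=SimpleNamespace(value=value))


print(big_value_controller([make_record(5), make_record(200)]))    # False
print(big_value_controller([make_record(5), make_record(20000)]))  # True
print(big_value_controller([]))                                    # False
```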

Buffer reset

Every time the records are released, the buffer will reset the counters of its default controllers and empty the list of records stored.

You can pass a callable as an optional on_reset parameter, which will be invoked every time Buffer.reset is called.
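The reset behaviour can be sketched with a hypothetical ResettableBuffer stand-in (not Databay's class). It assumes the on_reset callable takes no arguments; check the Buffer signature in your Databay version before relying on that.

```python
from typing import Callable, List, Optional


class ResettableBuffer:
    """Hypothetical stand-in showing reset and an on_reset hook."""

    def __init__(self, on_reset: Optional[Callable[[], None]] = None) -> None:
        self.records: List[object] = []
        self.time_start: Optional[float] = None  # stand-in controller counter
        self.on_reset = on_reset

    def reset(self) -> None:
        self.records = []        # empty the stored records
        self.time_start = None   # reset the default controller counters
        if self.on_reset is not None:
            self.on_reset()      # assumed to take no arguments

    def release(self) -> List[object]:
        released = self.records
        self.reset()  # every release resets the buffer
        return released


resets: List[str] = []
buffer = ResettableBuffer(on_reset=lambda: resets.append('reset'))
buffer.records.extend([1, 2, 3])
released = buffer.release()
print(released, buffer.records, resets)  # [1, 2, 3] [] ['reset']
```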

Combining controllers

You can use any combination of default and custom controllers. Buffer allows you to use two types of boolean logic when evaluating whether records should be released:

  • conjunction (AND) - releasing records only when all controllers return True.

  • disjunction (OR) - releasing records as soon as any controller returns True (default).

For example:

buffer = Buffer(count_threshold=10, time_threshold=60, conjugate_controllers=False) # OR

This buffer will release records once 10 records have been produced or 60 seconds have elapsed, whichever comes first.

buffer = Buffer(count_threshold=10, time_threshold=60, conjugate_controllers=True) # AND

This buffer will release records only once 10 records have been produced and 60 seconds have elapsed.
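The two modes map naturally onto Python's all() and any(). The should_release function below is a hypothetical sketch of the decision, not Databay's code; the two controllers are stand-ins for the count and time controllers at a moment when 10 records are stored but 60 seconds have not yet passed.

```python
from typing import Callable, List, Sequence

Controller = Callable[[List[object]], bool]


def should_release(records: List[object], controllers: Sequence[Controller],
                   conjugate: bool) -> bool:
    # AND (conjugate=True): every controller must agree to release.
    # OR (conjugate=False): a single controller is enough.
    combine = all if conjugate else any
    return combine(controller(records) for controller in controllers)


def enough_records(records: List[object]) -> bool:
    return len(records) >= 10  # stand-in count controller


def enough_time(records: List[object]) -> bool:
    return False  # stand-in time controller: 60 s have not elapsed yet


records = list(range(10))
print(should_release(records, [enough_records, enough_time], conjugate=False))  # True
print(should_release(records, [enough_records, enough_time], conjugate=True))   # False
```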

The order of execution of controllers is as follows:

  1. Custom controllers, in the order they are passed to the Buffer.

  2. Count controller.

  3. Time controller.

Buffer uses short-circuit logic to stop evaluating controllers as soon as the decision to release or store is known; therefore, not all controllers may be called each time the Buffer is executed.

Once the records are released, the buffer will reset.
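The execution order and short-circuiting can be traced with controllers that log their own calls. This sketch assumes the order listed above (custom, then count, then time) and uses Python's any()/all(), which short-circuit in the same way; whether Databay uses these built-ins internally is not stated here.

```python
from typing import Callable, List

calls: List[str] = []


def traced(name: str, result: bool) -> Callable[[List[object]], bool]:
    # Build a stand-in controller that logs its name and returns a fixed result.
    def controller(records: List[object]) -> bool:
        calls.append(name)
        return result
    return controller


# Assumed order: custom controllers first, then count, then time.
controllers = [traced('custom', True), traced('count', False), traced('time', False)]

calls.clear()
any(c([]) for c in controllers)   # OR: stops at the first True
or_calls = list(calls)

calls.clear()
all(c([]) for c in controllers)   # AND: stops at the first False
and_calls = list(calls)

print(or_calls)   # ['custom']
print(and_calls)  # ['custom', 'count']
```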

Flush

Buffer contains a boolean field called flush which, if set to True, enforces the release of records regardless of what the controllers decide. The flush only takes place the next time the buffer is called, during the upcoming transfer. Flushing also resets the buffer.
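A hypothetical FlushableBuffer stand-in (not Databay's class) shows the flush flag overriding a controller that would never release on its own. It assumes that the reset following a flush also clears the flag, so the override applies to one call only.

```python
from typing import Callable, List


class FlushableBuffer:
    """Hypothetical stand-in: the flush flag overrides the controller once."""

    def __init__(self, controller: Callable[[List[int]], bool]) -> None:
        self.controller = controller
        self.records: List[int] = []
        self.flush = False

    def __call__(self, records: List[int]) -> List[int]:
        self.records.extend(records)
        if self.flush or self.controller(self.records):
            released, self.records = self.records, []
            self.flush = False  # assumption: resetting also clears the flag
            return released
        return []


def never(records: List[int]) -> bool:
    return False  # a controller whose threshold is never reached


buffer = FlushableBuffer(never)
first = buffer([1, 2])   # stored
buffer.flush = True      # takes effect on the next call
second = buffer([3])     # released despite the controller
print(first, second, buffer.flush)  # [] [1, 2, 3] False
```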

Best practices

One-to-one relationship

Given its internal record storage, a buffer should be used either as a Link processor or as an Outlet processor, but never as both at the same time.

Similarly, a buffer should be used on only one Link or Outlet, never on several at the same time.
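To see why sharing is risky, consider a hypothetical SharedBuffer stand-in (not Databay's class) with a single internal store that two different links call in turn. Records buffered for link A end up released to link B's outlets:

```python
from typing import List


class SharedBuffer:
    """Hypothetical stand-in with a single internal record store."""

    def __init__(self, count_threshold: int) -> None:
        self.count_threshold = count_threshold
        self.records: List[str] = []

    def __call__(self, records: List[str]) -> List[str]:
        self.records.extend(records)
        if len(self.records) >= self.count_threshold:
            released, self.records = self.records, []
            return released
        return []


shared = SharedBuffer(count_threshold=4)
from_link_a = shared(['a1', 'a2', 'a3'])  # link A: records are stored
from_link_b = shared(['b1'])              # link B: triggers the release
print(from_link_a, from_link_b)  # [] ['a1', 'a2', 'a3', 'b1']
```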

Ensure records are consumed

Note that in several scenarios a buffer may never release its records, so they would never be consumed by the outlets. Consider the following examples:

  • Databay crashes before records are released.

  • Planner is stopped before records are released.

  • Thresholds are set to unreachable values.

Databay does not handle these situations automatically; however, you can preempt them and ensure that records are released by combining the buffer's flush functionality with the planner's force_transfer method.

# set up Databay outside the try block, so the finally clause can always reference these objects
buffer = Buffer(count_threshold=4000)
link = Link(inlets, outlets, interval=10, processors=buffer) # the buffer must be attached to the link
planner = SchedulePlanner(link)

try:
    planner.start()

except Exception as e:
    print('Error while running Databay: ' + str(e))

finally:
    buffer.flush = True # ensure the buffer will release data
    link.remove_inlets(list(link.inlets)) # we don't need to produce any more data; iterate over a copy
    planner.force_transfer() # run one final transfer to flush the data