Extending Outlets

To implement custom data consumption, extend the Outlet class and override the Outlet.push method.

Simple example

  1. Extend the Outlet class, printing the incoming data in the push method:

class PrintOutlet(Outlet):

    def push(self, records: [Record], update):
        for record in records:
            print(update, record.payload)

  2. Instantiate it:

print_outlet = PrintOutlet()

  3. Add it to a link and schedule:

link = Link(random_int_inlet,
            print_outlet,
            interval=timedelta(seconds=2),
            tags='print_outlet')

planner = SchedulePlanner(link)
planner.start()

The above setup prints all records transferred by that link (see full example).

Each push call is provided with an Update object as one of its parameters. It contains the tags of the governing link (if specified) and an incremental integer index. Use str(update) to get a formatted string of that update. See Transfer Update for more.
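
As a rough illustration of how an outlet might use the update, the sketch below labels each record with the link's tags and the update index. It uses small stand-in classes instead of importing Databay so that it runs on its own. The attribute names tags and index, the list-of-strings tag type and the label format are assumptions based on the description above, not a confirmed API.

```python
from dataclasses import dataclass, field
from typing import Any, List


# Stand-ins for illustration only; these are not Databay's real classes.
@dataclass
class Update:
    tags: List[str]  # assumed attribute name and type
    index: int       # assumed attribute name


@dataclass
class Record:
    payload: Any
    metadata: dict = field(default_factory=dict)


class LabelledPrintOutlet:
    """In real code this class would extend databay.Outlet."""

    def push(self, records, update):
        # Build a label such as 'print_outlet#0' from the update.
        label = f"{','.join(update.tags)}#{update.index}"
        lines = [f"{label}: {record.payload}" for record in records]
        for line in lines:
            print(line)
        return lines  # returned only to make the sketch easy to check


outlet = LabelledPrintOutlet()
lines = outlet.push([Record(1), Record(2)],
                    Update(tags=['print_outlet'], index=0))
```

In a real outlet, str(update) is likely the simpler choice, since it already produces a formatted string.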

Consuming Records

Outlets are provided with a list of all records produced by all inlets of the governing link. Each Record contains two fields:

  1. Record.payload - data stored in the record.

  2. Record.metadata - metadata attached to the record.

from databay import Outlet

class ConditionalPrintOutlet(Outlet):

    def push(self, records, update):
        for record in records:
            if record.metadata.get('should_print', False):
                print(record.payload)

By default, outlets receive a copy of the records in order to prevent accidental data corruption. You can disable this mechanism by passing copy_records=False when constructing a link, in which case the same list is provided to all outlets. If you do, ensure that your Outlet.push method does not modify the records or their underlying data.
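
To see why sharing a single list matters, the sketch below uses plain dictionaries in place of Record objects and a deliberately careless push function. It is a pure-Python illustration of the risk. Using copy.deepcopy to stand for the default copying is an assumption, not a statement about how Databay copies records internally.

```python
import copy

# Plain dicts standing in for Record objects (illustration only).
records = [{'payload': [1, 2, 3]}]


def careless_push(records):
    # Mutates the payload in place, which is unsafe when records are shared.
    records[0]['payload'].append(99)


# With a copy, the mutation stays local to the consumer.
careless_push(copy.deepcopy(records))
after_copy = list(records[0]['payload'])

# Without a copy, the mutation is visible to every other consumer of the list.
careless_push(records)
after_shared = list(records[0]['payload'])
```

After the first call the original payload is unchanged. After the second call it contains the extra value that any other outlet sharing the list would also see.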

Metadata

Your outlet can be built to behave differently depending on the metadata carried by the records. Metadata is attached to each record when inlets produce data. Learn more about the difference between Global metadata and Local metadata.

When creating an outlet, it is up to you to ensure the expected metadata and its effects are clearly documented. To prevent name clashes between the metadata of different outlets, it is recommended to include the outlet name in the metadata keys expected by your outlet.

Incorrect:

CSV_FILE = 'CSV_FILE'

Correct:

CSV_FILE = 'CsvOutlet.CSV_FILE'

class CsvOutlet(Outlet):

    # Name of csv file to write records to.
    CSV_FILE = 'CsvOutlet.CSV_FILE'

    def push(self, records: [Record], update):
        for record in records:
            if self.CSV_FILE in record.metadata:
                csv_file = record.metadata[self.CSV_FILE] + '.csv'

                ...
                # write to csv_file specified

...

random_int_inletA = RandomIntInlet(metadata={CsvOutlet.CSV_FILE: 'cat'})
random_int_inletB = RandomIntInlet(metadata={CsvOutlet.CSV_FILE: 'dog'})

For clarity and readability, Databay provides the MetadataKey type for specifying metadata key class attributes.

from databay import Outlet
from databay.outlet import MetadataKey

class CsvOutlet(Outlet):
    CSV_FILE: MetadataKey = 'CsvOutlet.CSV_FILE'
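
The name clash that prefixed keys avoid can be shown with plain dictionaries. In the sketch below, JsonOutlet and the key names are hypothetical and exist only for illustration.

```python
# Two unrelated outlets both expect a key called 'FILE'.
metadata_clash = {}
metadata_clash['FILE'] = 'cat'     # intended for a CSV outlet
metadata_clash['FILE'] = 'backup'  # intended for a JSON outlet; overwrites the first

# Prefixing each key with its outlet name keeps both values.
metadata_ok = {
    'CsvOutlet.FILE': 'cat',
    'JsonOutlet.FILE': 'backup',
}
```

With unprefixed keys only the last value survives, so the CSV outlet would silently read a file name meant for another outlet.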

Start and shutdown

All outlets contain an Outlet.active flag that is set by the governing link when scheduling starts and unset when scheduling stops. You can use this flag to refine the behaviour of your outlet.

You can further control the startup and shutdown behaviour by overriding the Outlet.on_start and Outlet.on_shutdown methods. If one Outlet instance is governed by multiple links, these callbacks will be called only once per instance by whichever link executes first.

class PrintOutlet(Outlet):

    def push(self, records, update):
        print(f'{self.prefix} - {records}')

    def on_start(self):
        self.prefix = 'foo'
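
A common use of these callbacks is to acquire a resource on start and release it on shutdown. The sketch below is a minimal illustration of that pattern using a stand-in class and an in-memory buffer. It calls the callbacks by hand, whereas in Databay the governing link would call them.

```python
import io


class BufferOutlet:
    """Stand-in; in real code this class would extend databay.Outlet."""

    def on_start(self):
        # Acquire the resource once, when scheduling starts.
        self.buffer = io.StringIO()

    def push(self, records, update):
        for record in records:
            self.buffer.write(f'{record}\n')

    def on_shutdown(self):
        # Keep the collected text, then release the resource.
        self.result = self.buffer.getvalue()
        self.buffer.close()


outlet = BufferOutlet()
outlet.on_start()               # normally called by the governing link
outlet.push(['a', 'b'], None)   # plain strings stand in for records
outlet.on_shutdown()            # normally called by the governing link
```

The same structure applies to files, database connections or network sessions.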

Asynchronous outlet

You may implement asynchronous data consumption by defining Outlet.push as a coroutine.

import asyncio
from databay import Outlet

class AsyncOutlet(Outlet):

    # Note the 'async' keyword
    async def push(self, records, update):
        async_results = await some_async_code(records)
        await asyncio.sleep(1)

See Basic Asynchronous for a full example of implementing asynchronous code in Databay.
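
The sketch below shows, in plain asyncio, how coroutine push methods can wait on I/O concurrently. The outlet is a stand-in class, and awaiting both pushes with asyncio.gather is an assumption made for illustration; it does not describe how Databay drives coroutines internally.

```python
import asyncio


class AsyncCollectOutlet:
    """Stand-in; in real code this class would extend databay.Outlet."""

    def __init__(self):
        self.received = []

    async def push(self, records, update):
        await asyncio.sleep(0.01)  # placeholder for real asynchronous I/O
        self.received.extend(records)


async def main():
    a, b = AsyncCollectOutlet(), AsyncCollectOutlet()
    # Both pushes wait concurrently rather than one after the other.
    await asyncio.gather(a.push([1, 2], None), b.push([3], None))
    return a.received, b.received


received_a, received_b = asyncio.run(main())
```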


Next Steps

  1. Learn about extending Inlets.

  2. See the Examples.