Extending Outlets
To implement custom data consumption, you need to extend the Outlet class and override the Outlet.push method.
Simple example
class PrintOutlet(Outlet):
    def push(self, records: [Record], update):
        for record in records:
            print(update, record.payload)
Instantiate it:
print_outlet = PrintOutlet()
Add it to a link and schedule it:
link = Link(random_int_inlet,
            print_outlet,
            interval=timedelta(seconds=2),
            tags='print_outlet')
planner = SchedulePlanner(link)
planner.start()
The above setup will print all records transferred by that link (see full example).
Each push call receives an Update object as one of its parameters. It contains the tags of the governing link (if specified) and an incremental integer index. Use str(update) to get a formatted string of that update. See Transfer Update for more.
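Because push is an ordinary method, you can exercise an outlet without a link or planner by calling it directly. The sketch below does this with minimal stand-ins. FakeRecord and FakeUpdate are hypothetical: they only mimic the fields described above and are not Databay's classes. The string format given to FakeUpdate is likewise an assumption, and Databay's own str(update) output may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class FakeRecord:
    """Hypothetical stand-in for databay's Record (payload + metadata only)."""
    payload: Any
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class FakeUpdate:
    """Hypothetical stand-in for databay's Update (tags + incremental index)."""
    tags: Optional[List[str]]
    index: int

    def __str__(self) -> str:
        # Assumed format for illustration only; Databay's own format may differ.
        tags = '.'.join(self.tags) if self.tags else ''
        return f'{tags}.{self.index}' if tags else str(self.index)


class CollectingOutlet:
    """Same shape as PrintOutlet, but collects lines instead of printing.

    In a real project this would subclass databay's Outlet.
    """

    def __init__(self):
        self.lines: List[str] = []

    def push(self, records: List[FakeRecord], update: FakeUpdate) -> None:
        for record in records:
            self.lines.append(f'{update} {record.payload}')


outlet = CollectingOutlet()
outlet.push([FakeRecord(7), FakeRecord(42)],
            FakeUpdate(tags=['print_outlet'], index=0))
print(outlet.lines)  # ['print_outlet.0 7', 'print_outlet.0 42']
```

Collecting output in a list rather than printing it makes the outlet straightforward to assert on in unit tests.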
Consuming Records
Outlets are provided with a list of all records produced by all inlets of the governing link. Each Record contains two fields:
- Record.payload: data stored in the record.
- Record.metadata: metadata attached to the record.
from databay import Outlet

class ConditionalPrintOutlet(Outlet):
    def push(self, records, update):
        for record in records:
            if record.metadata.get('should_print', False):
                print(record.payload)
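You can check the same filtering without printing anything. This sketch replaces print with a list so the effect of the should_print metadata flag is visible. Record here is a hypothetical namedtuple stand-in with only the two fields listed above, so the block runs without Databay installed.

```python
from collections import namedtuple

# Hypothetical stand-in for databay's Record: just payload and metadata.
Record = namedtuple('Record', ['payload', 'metadata'])


class ConditionalCollectOutlet:
    """Mirrors ConditionalPrintOutlet but stores payloads instead of printing."""

    def __init__(self):
        self.printed = []

    def push(self, records, update):
        for record in records:
            # Records without the flag default to False and are skipped.
            if record.metadata.get('should_print', False):
                self.printed.append(record.payload)


records = [
    Record(1, {'should_print': True}),
    Record(2, {}),
    Record(3, {'should_print': False}),
]
outlet = ConditionalCollectOutlet()
outlet.push(records, update=None)
print(outlet.printed)  # [1]
```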
By default, each outlet receives a copy of the records to prevent accidental data corruption. You can disable this mechanism by passing copy_records=False when constructing a link. In that case the same list is provided to all outlets, so make sure your Outlet.push method does not modify the records or their underlying data.
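The reason for the default copy is easiest to see in plain Python. The sketch below models the idea and is not Databay's implementation. One outlet mutates the payloads it receives. With a per-outlet copy, the next outlet still sees the original data; with a shared list, the change leaks. The deliver function and both outlet functions are hypothetical. Using copy.deepcopy (rather than a shallow copy) is also an assumption, since the text above does not say how deep Databay's copy goes.

```python
import copy


def mutating_push(records):
    # A careless outlet that changes payloads in place.
    for record in records:
        record['payload'] *= 10


def reading_push(records, seen):
    # A well-behaved outlet that only reads.
    seen.extend(record['payload'] for record in records)


def deliver(records, copy_records=True):
    """Hypothetical dispatcher: hands the records to two outlets in order."""
    seen = []
    first = copy.deepcopy(records) if copy_records else records
    mutating_push(first)
    second = copy.deepcopy(records) if copy_records else records
    reading_push(second, seen)
    return seen


original = [{'payload': 1}, {'payload': 2}]
print(deliver(original, copy_records=True))   # [1, 2]
print(deliver(original, copy_records=False))  # [10, 20]
```

After the second call, original itself has been modified. This is the kind of shared-state bug the default copy guards against.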
Metadata
Your outlet can be built to behave differently depending on the metadata carried by the records. Metadata is attached to each record when inlets produce data. Learn more about the difference between Global metadata and Local metadata.
When creating an outlet, it is up to you to ensure that the expected metadata and its effects are clearly documented. To prevent name clashes between the metadata of different outlets, include the outlet's name in the metadata keys your outlet expects.
Incorrect:
CSV_FILE = 'CSV_FILE'
Correct:
CSV_FILE = 'CsvOutlet.CSV_FILE'
class CsvOutlet(Outlet):
    # Name of csv file to write records to.
    CSV_FILE = 'CsvOutlet.CSV_FILE'

    def push(self, records: [Record], update):
        for record in records:
            if self.CSV_FILE in record.metadata:
                csv_file = record.metadata[self.CSV_FILE] + '.csv'
                ...
                # write to csv_file specified
                ...
random_int_inletA = RandomIntInlet(metadata={CsvOutlet.CSV_FILE: 'cat'})
random_int_inletB = RandomIntInlet(metadata={CsvOutlet.CSV_FILE: 'dog'})
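One possible way to fill in the elided write step is to group records by the file name stored under CsvOutlet.CSV_FILE and write each group with the standard csv module. The sketch below is an assumption, not Databay's own CSV outlet. It writes to in-memory io.StringIO buffers so it runs anywhere, it uses a hypothetical namedtuple Record, and it does not subclass Outlet, so it runs without Databay installed.

```python
import csv
import io
from collections import defaultdict, namedtuple

# Hypothetical stand-in for databay's Record.
Record = namedtuple('Record', ['payload', 'metadata'])


class CsvOutlet:  # in a real project: class CsvOutlet(Outlet)
    # Namespaced metadata key, so other outlets' keys cannot clash with it.
    CSV_FILE = 'CsvOutlet.CSV_FILE'

    def __init__(self):
        # file name -> in-memory buffer standing in for a file on disk
        self.files = defaultdict(io.StringIO)

    def push(self, records, update):
        # Group payloads by target file so each buffer is written in one pass.
        groups = defaultdict(list)
        for record in records:
            if self.CSV_FILE in record.metadata:
                csv_file = record.metadata[self.CSV_FILE] + '.csv'
                groups[csv_file].append(record.payload)
        for csv_file, payloads in groups.items():
            writer = csv.writer(self.files[csv_file])
            writer.writerows([payload] for payload in payloads)


outlet = CsvOutlet()
outlet.push([
    Record(5, {CsvOutlet.CSV_FILE: 'cat'}),
    Record(8, {CsvOutlet.CSV_FILE: 'dog'}),
    Record(9, {CsvOutlet.CSV_FILE: 'cat'}),
    Record(1, {'CSV_FILE': 'ignored'}),  # un-namespaced key: not picked up
], update=None)
print(outlet.files['cat.csv'].getvalue())
```

The last record shows why the namespaced key matters: a bare 'CSV_FILE' key, perhaps intended for some other outlet, is ignored instead of being silently written to a file.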
For clarity and readability, Databay provides the MetadataKey type for specifying metadata key class attributes.
from databay.outlet import MetadataKey

class CsvOutlet(Outlet):
    CSV_FILE: MetadataKey = 'CsvOutlet.CSV_FILE'
Start and shutdown
All outlets contain an Outlet.active flag that the governing link sets when scheduling starts and unsets when scheduling stops. You can use this flag to refine the behaviour of your outlet.
You can further control start-up and shutdown behaviour by overriding the Outlet.on_start and Outlet.on_shutdown methods. If one Outlet instance is governed by multiple links, these callbacks are called only once per instance, by whichever link executes first.
class PrintOutlet(Outlet):
    def push(self, records, update):
        print(f'{self.prefix} - {records}')

    def on_start(self):
        self.prefix = 'foo'
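To make this lifecycle concrete, the sketch below models it with a hypothetical LifecycleOutlet base class. Its start() and shutdown() methods are assumptions that stand in for what the governing link does. They flip active and guard the callbacks so that each runs once per instance, even if several links start the same outlet. Databay's own Outlet already handles this, so in practice you would only override on_start, on_shutdown and push.

```python
class LifecycleOutlet:
    """Hypothetical stand-in for databay's Outlet lifecycle handling."""

    def __init__(self):
        self.active = False

    def start(self):
        # Only the first caller (e.g. the first link to start) triggers on_start.
        if self.active:
            return
        self.active = True
        self.on_start()

    def shutdown(self):
        # Only the first shutdown after a start triggers on_shutdown.
        if not self.active:
            return
        self.active = False
        self.on_shutdown()

    def on_start(self):
        pass

    def on_shutdown(self):
        pass


class BufferOutlet(LifecycleOutlet):
    """Acquires a resource on start and releases it on shutdown."""

    def __init__(self):
        super().__init__()
        self.buffer = None
        self.start_calls = 0

    def on_start(self):
        self.start_calls += 1
        self.buffer = []          # e.g. open a file or connection here

    def on_shutdown(self):
        self.buffer = None        # e.g. close the file or connection here

    def push(self, records, update):
        if not self.active:       # ignore data outside of scheduling
            return
        self.buffer.extend(records)


outlet = BufferOutlet()
outlet.start()
outlet.start()                    # a second link starting: no second on_start
outlet.push([1, 2], update=None)
print(outlet.start_calls, outlet.buffer)  # 1 [1, 2]
outlet.shutdown()
print(outlet.active, outlet.buffer)       # False None
```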
Asynchronous outlet
You may implement asynchronous data consumption by defining Outlet.push as a coroutine.
import asyncio
from databay import Outlet

class AsyncOutlet(Outlet):
    # Note the 'async' keyword
    async def push(self, records, update):
        # some_async_code is a placeholder for your own coroutine
        async_results = await some_async_code(records)
        await asyncio.sleep(1)
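Coroutine outlets pay off because awaiting several push coroutines together lets their waits overlap. The sketch below uses asyncio.gather to drive two hypothetical outlets. It illustrates the concurrency idea only and does not describe how Databay schedules outlets internally. A short asyncio.sleep stands in for some_async_code.

```python
import asyncio


class SlowAsyncOutlet:  # in a real project: class SlowAsyncOutlet(Outlet)
    def __init__(self, name):
        self.name = name
        self.received = []

    async def push(self, records, update):
        # Stand-in for real I/O such as an HTTP request or a database write.
        await asyncio.sleep(0.1)
        self.received.extend(records)
        return self.name


async def push_all(outlets, records, update=None):
    # Await every outlet's push concurrently, so the two sleeps overlap.
    return await asyncio.gather(*(o.push(records, update) for o in outlets))


outlets = [SlowAsyncOutlet('a'), SlowAsyncOutlet('b')]
results = asyncio.run(push_all(outlets, [1, 2, 3]))
print(results)  # ['a', 'b']
```

Because the waits overlap, the total run time is roughly one sleep rather than two. asyncio.gather returns results in the order the coroutines were passed, regardless of which finishes first.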
See Basic Asynchronous for a full example of implementing asynchronous code in Databay.
Next Steps