Extending Inlets
To implement custom data production, you need to extend the Inlet class, override the Inlet.pull method and return the data produced.
Simple example

```python
import random
from databay import Inlet

class RandomIntInlet(Inlet):
    def pull(self, update):
        return random.randint(0, 100)
```
Instantiate it:
```python
random_int_inlet = RandomIntInlet()
```
Add it to a link and start scheduling:
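```python
from datetime import timedelta
from databay import Link
from databay.outlets import PrintOutlet
from databay.planners import SchedulePlanner

print_outlet = PrintOutlet()  # any Outlet works; PrintOutlet prints each record
link = Link(random_int_inlet,
            print_outlet,
            interval=timedelta(seconds=5),
            tags='random_ints')
planner = SchedulePlanner(link)
planner.start()
```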
The above setup will produce a random integer every 5 seconds (see the full example).
Each pull call receives an Update object as its parameter. It contains the tags of the governing link (if specified) and an incremental integer index. Use str(update) to get a formatted string of that update. See Transfer Update for more.
Your inlet may skip producing data by returning an empty list.
Creating records
Data produced by inlets is wrapped in Record objects before being passed to outlets. If you wish to control how records are created or attach local metadata, use the Inlet.new_record method to create records within your inlet and return these instead.
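```python
import random
from databay import Inlet


class RandomIntInlet(Inlet):
    def pull(self, update):
        new_integer = random.randint(0, 100)
        # Create the Record explicitly instead of relying on automatic wrapping
        record = self.new_record(payload=new_integer)
        return record
```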
Producing multiple records
During one transfer you may produce multiple data entities within the Inlet.pull method. Returning a list indicates that multiple records are being produced at once, in which case each element of the list will be turned into a Record. Any return type other than list (e.g. tuple, set, dict) will be considered as one Record.
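To make the rule concrete, here is a hypothetical helper, split_into_payloads, that mirrors the documented behaviour in plain Python. A list is split into one payload per element, and any other value is kept whole as a single payload. This illustrates the rule only. It is not Databay's internal code, and it ignores the case where list elements are already Record objects.

```python
def split_into_payloads(result):
    """Mirror the documented rule: a list yields one payload per element,
    any other return value yields exactly one payload."""
    if isinstance(result, list):
        # Each element becomes its own record; an empty list yields none
        return list(result)
    # tuple, set, dict, scalars, ... are kept whole as a single payload
    return [result]


print(len(split_into_payloads([3, 7])))    # 2 records
print(len(split_into_payloads((3, 7))))    # 1 record
print(len(split_into_payloads([[3, 7]])))  # 1 record whose payload is [3, 7]
print(len(split_into_payloads([])))        # 0 records: nothing is produced
```

The nested-list case shows why returning [[r1, r2]] keeps the inner list together. Only the outer list is split, and it has a single element. The empty list produces no records at all.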
Returning a list, producing two records:

```python
def pull(self, update):
    # produces two records
    return [random.randint(0, 50), random.randint(0, 100)]
```
Returning a set, producing one record:

```python
def pull(self, update):
    # produces one record
    return {random.randint(0, 50), random.randint(0, 100)}
```
The same is true when explicitly creating multiple records within pull and returning them:

```python
def pull(self, update):
    first_record = self.new_record(payload=random.randint(0, 50))
    second_record = self.new_record(payload=random.randint(0, 100))
    return [first_record, second_record]
```
If you wish for one record to contain a list of data that doesn't get broken down into multiple records, you can either create the record yourself, passing the list as payload, or return a nested list:

```python
def pull(self, update):
    r1 = random.randint(0, 50)
    r2 = random.randint(0, 100)
    # one record whose payload is the list [r1, r2]
    return self.new_record(payload=[r1, r2])
```

Or, equivalently:

```python
def pull(self, update):
    r1 = random.randint(0, 50)
    r2 = random.randint(0, 100)
    # the outer list has a single element, so this is one record with payload [r1, r2]
    return [[r1, r2]]
```
Global metadata
An Inlet can attach custom metadata to all records it produces. Metadata is intended to provide additional context to records when outlets consume them. To attach metadata, pass a metadata dictionary when constructing an Inlet. A copy of that dictionary will be attached to all records produced by that Inlet instance.
```python
random_cat_inlet = RandomIntInlet(metadata={'animal': 'cat'})
# produces Record(metadata={'animal': 'cat'})
random_parrot_inlet = RandomIntInlet(metadata={'animal': 'parrot'})
# produces Record(metadata={'animal': 'parrot'})
```
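Each record receives its own copy of the global metadata. As a result, changing one record's metadata should not leak into other records or back into the dictionary passed to the inlet. The hypothetical helper attach_global_metadata below models this copy-per-record behaviour with plain dictionaries. It assumes a shallow copy, so nested values would still be shared. This is a sketch of the documented semantics, not Databay's implementation.

```python
def attach_global_metadata(payloads, metadata):
    """Pair each payload with its own shallow copy of the global metadata."""
    return [(payload, dict(metadata)) for payload in payloads]


global_metadata = {'animal': 'cat'}
records = attach_global_metadata([1, 2], global_metadata)

# Mutating one record's metadata leaves the other record and the original intact
records[0][1]['animal'] = 'dog'
print(records[1][1])    # {'animal': 'cat'}
print(global_metadata)  # {'animal': 'cat'}
```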
The metadata dictionary is independent of the inlet it is given to. An inlet should neither read nor modify its metadata. Instead, inlets should expect all setup parameters to be provided as constructor arguments.
Incorrect:

```python
class MyInlet(Inlet):
    def __init__(self, metadata):
        # Don't: reading a setup parameter out of the metadata dictionary
        self.should_do_stuff = metadata.get('should_do_stuff')
```

Correct:

```python
class MyInlet(Inlet):
    def __init__(self, should_do_stuff, *args, **kwargs):
        super().__init__(*args, **kwargs)  # metadata dict gets passed and stored here
        self.should_do_stuff = should_do_stuff
```
The metadata supported by each outlet differs and depends on the particular outlet implementation. Please refer to the specific outlet's documentation for more information on the metadata it expects.
Additionally, each record is supplied with a special __inlet__ metadata entry containing a string representation of the inlet that produced it.

```python
>>> record.metadata['__inlet__']
RandomIntInlet(metadata={})
```
Local metadata
Apart from providing an inlet with Global metadata that will be the same for all records, you may also attach local, per-record metadata that can vary for each record. To do this, pass a metadata dictionary to the Inlet.new_record method when creating a record inside pull.
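```python
import random
from databay import Inlet


class RandomIntInlet(Inlet):
    def pull(self, update):
        new_integer = random.randint(0, 100)
        # Choose per-record (local) metadata based on the produced value
        if new_integer > 50:
            animal = 'cat'
        else:
            animal = 'parrot'
        record = self.new_record(payload=new_integer, metadata={'animal': animal})
        return record
```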
Note that if the same metadata key is specified both globally and locally, the local value overrides the global one.
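This precedence rule behaves like a dictionary merge in which local entries are applied last. The hypothetical merge_metadata helper below is a minimal sketch of that rule. It does not model the special __inlet__ entry, and it is not taken from Databay's code.

```python
def merge_metadata(global_metadata, local_metadata=None):
    """Combine global and local metadata; local keys win on conflict."""
    merged = dict(global_metadata)  # copy, so the global dict is never mutated
    merged.update(local_metadata or {})
    return merged


print(merge_metadata({'animal': 'cat', 'source': 'sensor'}, {'animal': 'parrot'}))
# {'animal': 'parrot', 'source': 'sensor'}
```

Keys present only in the global dictionary (here 'source') survive the merge unchanged.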
Start and shutdown
All inlets contain an Inlet.active flag that the governing link sets when scheduling starts and unsets when scheduling stops. You can use this flag to refine the behaviour of your inlet.
You can further control starting and shutting down by overriding the Inlet.on_start and Inlet.on_shutdown methods. If one Inlet instance is governed by multiple links, these callbacks will be called only once per instance, by whichever link executes first.
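```python
import random
from databay import Inlet

class RandomIntInlet(Inlet):
    def pull(self, update):
        return random.randint(0, 100)

    def on_start(self):
        # Seed the generator once, when scheduling starts
        random.seed(42)
```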
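The once-per-instance guarantee can be pictured with the small model below. The first link to start an inlet sets its active flag and triggers on_start. Later links find the flag already set and skip the callback, and the same applies to shutdown. ModelInlet, start_from_link and stop_from_link are hypothetical names. This is a plain-Python model of the documented rule, not Databay's actual link code.

```python
class ModelInlet:
    """Hypothetical stand-in that counts lifecycle callback invocations."""

    def __init__(self):
        self.active = False
        self.start_calls = 0
        self.shutdown_calls = 0

    def on_start(self):
        self.start_calls += 1

    def on_shutdown(self):
        self.shutdown_calls += 1


def start_from_link(inlet):
    # Only the first link to start the inlet triggers on_start
    if not inlet.active:
        inlet.active = True
        inlet.on_start()


def stop_from_link(inlet):
    # Only the first link to stop the inlet triggers on_shutdown
    if inlet.active:
        inlet.active = False
        inlet.on_shutdown()


inlet = ModelInlet()
for _ in range(2):  # two links governing the same inlet
    start_from_link(inlet)
print(inlet.active, inlet.start_calls)     # True 1
for _ in range(2):
    stop_from_link(inlet)
print(inlet.active, inlet.shutdown_calls)  # False 1
```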
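Asynchronous inlet
You may implement asynchronous data production by defining Inlet.pull as a coroutine. The governing link will wait for all its inlets to finish producing their data before passing the results to outlets. In the example below, some_async_code is a placeholder for your own asynchronous logic.

```python
import asyncio
from databay import Inlet

async def some_async_code():
    # Hypothetical placeholder for real asynchronous I/O (HTTP call, DB query, ...)
    await asyncio.sleep(1)
    return [1, 2, 3]

class AsyncInlet(Inlet):
    # Note the 'async' keyword
    async def pull(self, update):
        async_results = await some_async_code()
        return async_results
```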
See Basic Asynchronous for a full example of implementing asynchronous code in Databay.
You can limit (throttle) how many inlets can execute simultaneously by setting the inlet_concurrency parameter when constructing a link.
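To picture what the link does, the sketch below uses only asyncio. Several coroutine pulls are awaited together with asyncio.gather, and an asyncio.Semaphore caps how many run at once, similar in spirit to inlet_concurrency. The names fake_pull and run_transfer are hypothetical, and this is not how Databay implements the link internally.

```python
import asyncio


async def fake_pull(name, delay, limiter, stats):
    """Hypothetical async pull; the semaphore limits simultaneous execution."""
    async with limiter:
        stats['running'] += 1
        stats['peak'] = max(stats['peak'], stats['running'])
        await asyncio.sleep(delay)  # stands in for real async I/O
        stats['running'] -= 1
        return f'{name}-data'


async def run_transfer(concurrency):
    limiter = asyncio.Semaphore(concurrency)
    stats = {'running': 0, 'peak': 0}
    pulls = [fake_pull(f'inlet{i}', 0.01, limiter, stats) for i in range(5)]
    # Wait for every pull to finish before handing results on (to outlets)
    results = await asyncio.gather(*pulls)
    return results, stats['peak']


results, peak = asyncio.run(run_transfer(concurrency=2))
print(results)  # ['inlet0-data', 'inlet1-data', 'inlet2-data', 'inlet3-data', 'inlet4-data']
print(peak)     # 2
```

asyncio.gather returns results in the order the coroutines were passed, regardless of which finishes first. The peak count never exceeds the semaphore's limit.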
Test your inlet
Databay comes with a template unittest.TestCase designed to validate your implementation of the Inlet class. To use it, create a new test class extending InletTester and implement the InletTester.get_inlet method so that it returns an instance of your inlet.
```python
from databay.misc import inlet_tester


class RandomIntInletTest(inlet_tester.InletTester):

    def get_inlet(self):
        return RandomIntInlet()

    ...
    # You can add further tests here
```
You may also return a list of inlets to run each test on various configurations of your inlet:

```python
def get_inlet(self):
    return [
        RandomIntInlet(),
        # assumes a RandomIntInlet variant whose constructor accepts min and max
        RandomIntInlet(min=10, max=200),
    ]
```
Running such a concrete test will execute a variety of test cases that ensure your inlet correctly provides the expected functionality. These include:
- Creating new records.
- Attaching global and local metadata.
- Calling the pull method.
Since InletTester will call pull on your inlet, you may want to mock some of your inlet's functionality in order to separate testing of its logic from external code.
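For example, unittest.mock.patch can replace random.randint while a test runs, so assertions no longer depend on chance. Below, the pull logic from the simple example is copied into a plain function so the sketch runs without Databay. The function name pull_random_int is hypothetical. In an InletTester subclass you could apply the same patch inside a test method or start it in setUp.

```python
import random
import unittest
from unittest import mock


def pull_random_int(update=None):
    # Same body as RandomIntInlet.pull in the simple example
    return random.randint(0, 100)


class PullLogicTest(unittest.TestCase):
    def test_pull_returns_patched_value(self):
        # Replace the external dependency only for the duration of the with block
        with mock.patch('random.randint', return_value=42) as fake_randint:
            self.assertEqual(pull_random_int(), 42)
        fake_randint.assert_called_once_with(0, 100)
```

Run it with python -m unittest. Because pull_random_int looks up random.randint at call time, the patch takes effect without modifying the function.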
Next Steps