Extending Inlets

To implement custom data production, extend the Inlet class, override the Inlet.pull method and return the produced data.

Simple example

  1. Extend the Inlet class, returning produced data from the pull method:

import random

from databay import Inlet

class RandomIntInlet(Inlet):

    def pull(self, update):
        return random.randint(0, 100)

  2. Instantiate it:

random_int_inlet = RandomIntInlet()

  3. Add it to a link and start scheduling:

from datetime import timedelta

from databay import Link
from databay.planners import SchedulePlanner

link = Link(random_int_inlet,
            print_outlet,
            interval=timedelta(seconds=5),
            tags='random_ints')

planner = SchedulePlanner(link)
planner.start()

The above setup will produce a random integer every 5 seconds (see the full example).

Each pull call is provided with an Update object as a parameter. It contains the tags of the governing link (if specified) and an incremental integer index. Use str(update) to get a formatted string of that update. See Transfer Update for more.

Your inlet may skip producing data by returning an empty list.
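
For instance, a minimal sketch combining the two points above (assuming the incremental index is exposed as update.index) could print the formatted update and skip every other transfer:

import random

from databay import Inlet

class EveryOtherInlet(Inlet):

    def pull(self, update):
        # str(update) yields a formatted summary of this transfer
        print(str(update))

        # skip every other transfer by returning an empty list
        if update.index % 2:
            return []

        return random.randint(0, 100)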

Creating records

Data produced by inlets is wrapped in Record objects before being passed to outlets. If you wish to control how records are created or attach local metadata, use the Inlet.new_record method to create records within your inlet and return these instead.

class RandomIntInlet(Inlet):

    def pull(self, update):
        new_integer = random.randint(0, 100)
        record = self.new_record(payload=new_integer)
        return record

Producing multiple records

During one transfer you may produce multiple data entities within the Inlet.pull method. Returning a list indicates that multiple records are being produced at once, in which case each element of the list will be turned into a Record. Any return type other than a list (e.g. tuple, set, dict) will be treated as a single Record.

Returning a list, producing two records:

def pull(self, update):

    # produces two records
    return [random.randint(0, 50), random.randint(0, 100)]

Returning a set, producing one record:

def pull(self, update):

    # produces one record
    return {random.randint(0, 50), random.randint(0, 100)}

The same is true when explicitly creating multiple records within pull and returning them:

def pull(self, update):
    first_record = self.new_record(random.randint(0, 50))
    second_record = self.new_record(random.randint(0, 100))

    return [first_record, second_record]

If you wish for one record to contain a list of data that doesn't get broken down into multiple records, you can either create the record yourself, passing the list as the payload, or return a nested list:

def pull(self, update):
    r1 = random.randint(0, 50)
    r2 = random.randint(0, 100)

    return self.new_record(payload=[r1, r2])

# or
...

def pull(self, update):
    r1 = random.randint(0, 50)
    r2 = random.randint(0, 100)

    return [[r1, r2]]

Global metadata

An inlet can attach custom metadata to all records it produces. Metadata's intended use is to provide additional context to records when they are consumed by outlets. To do so, pass a metadata dictionary when constructing an Inlet; a copy of it will be attached to all records produced by that Inlet instance.

random_cat_inlet = RandomIntInlet(metadata={'animal': 'cat'})
# produces Record(metadata={'animal': 'cat'})

random_parrot_inlet = RandomIntInlet(metadata={'animal': 'parrot'})
# produces Record(metadata={'animal': 'parrot'})

The metadata dictionary is independent of the inlet it is given to. An inlet should not modify or read its metadata; instead, inlets should expect all setup parameters to be provided as arguments on construction.

Incorrect:

class MyInlet(Inlet):
    def __init__(self, metadata):
        self.should_do_stuff = metadata.get('should_do_stuff')

Correct:

class MyInlet(Inlet):
    def __init__(self, should_do_stuff, *args, **kwargs):
        super().__init__(*args, **kwargs)  # the metadata dict gets passed and stored here
        self.should_do_stuff = should_do_stuff
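
Constructed this way, setup parameters stay separate from metadata; for example:

my_inlet = MyInlet(should_do_stuff=True, metadata={'animal': 'cat'})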

The metadata supported by each outlet differs and depends on the particular outlet implementation. Please refer to the specific outlet's documentation for more information on the metadata it expects.

Additionally, each record is supplied with a special __inlet__ metadata entry containing a string representation of the inlet that produced it.

>>> record.metadata['__inlet__']
'RandomIntInlet(metadata={})'

Local metadata

Apart from providing an inlet with global metadata that will be the same for all records, you may also attach local, per-record metadata that can vary for each record. This can be done inside the pull method by specifying a metadata dictionary when creating a record with the Inlet.new_record method.

class RandomIntInlet(Inlet):

    def pull(self, update):
        new_integer = random.randint(0, 100)

        if new_integer > 50:
            animal = 'cat'
        else:
            animal = 'parrot'

        record = self.new_record(payload=new_integer, metadata={'animal': animal})
        return record

Note that local metadata will override global metadata if the same key is specified both globally and locally.
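
A minimal sketch illustrating this precedence (LocalOverrideInlet is a hypothetical name; non-conflicting global entries are kept):

import random

from databay import Inlet

class LocalOverrideInlet(Inlet):

    def pull(self, update):
        # the local 'animal' entry takes precedence over the global one below
        return self.new_record(payload=random.randint(0, 100),
                               metadata={'animal': 'parrot'})

inlet = LocalOverrideInlet(metadata={'animal': 'cat', 'legs': 4})
# produces Record(metadata={'animal': 'parrot', 'legs': 4})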

Start and shutdown

All inlets contain an Inlet.active flag that is set by the governing link when scheduling starts and unset when scheduling stops. You can use this flag to refine the behaviour of your inlet.
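
For instance, a minimal sketch of a long-running pull that keeps producing only while the flag is set (the loop and one-second sleep are illustrative, not required by the API):

import random
import time

from databay import Inlet

class ActiveAwareInlet(Inlet):

    def pull(self, update):
        results = []

        # keep producing until scheduling stops and active is unset
        while self.active:
            results.append(random.randint(0, 100))
            time.sleep(1)

        return results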

You can further control start-up and shutdown behaviour by overriding the Inlet.on_start and Inlet.on_shutdown methods. If one Inlet instance is governed by multiple links, these callbacks will be called only once per instance, by whichever link executes first.

class RandomIntInlet(Inlet):

    def pull(self, update):
        return random.randint(0, 100)

    def on_start(self):
        random.seed(42)
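
Correspondingly, on_shutdown can release whatever on_start acquired. A hedged sketch, where open_connection is a hypothetical helper standing in for your own setup code:

from databay import Inlet

class ConnectionInlet(Inlet):

    def on_start(self):
        # open_connection is hypothetical, shown for illustration only
        self.connection = open_connection()

    def pull(self, update):
        return self.connection.read()

    def on_shutdown(self):
        self.connection.close()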

Asynchronous inlet

You may implement asynchronous data production by defining Inlet.pull as a coroutine. The governing link will wait for all of its inlets to finish producing their data before passing the results to outlets.

import asyncio
from databay import Inlet

class AsyncInlet(Inlet):

    # Note the 'async' keyword
    async def pull(self, update):
        async_results = await some_async_code()
        return async_results

See Basic Asynchronous for a full example of implementing asynchronous code in Databay.

You can limit (throttle) how many inlets can execute simultaneously by setting the inlet_concurrency parameter when constructing a link.
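
For example, reusing the link setup from above and allowing up to 10 inlets to execute at once:

link = Link(random_int_inlet,
            print_outlet,
            interval=timedelta(seconds=5),
            tags='random_ints',
            inlet_concurrency=10)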

Test your inlet

Databay comes with a template unittest.TestCase designed to validate your implementation of the Inlet class. To use it, create a new test class extending InletTester and implement the InletTester.get_inlet method, returning an instance of your inlet.

from databay.misc import inlet_tester

class RandomIntInletTest(inlet_tester.InletTester):

    def get_inlet(self):
        return RandomIntInlet()

    ...

    # You can add further tests here

You may also return a list of inlets to run each test on various configurations of your inlet:

def get_inlet(self):
    return [
        RandomIntInlet(),
        RandomIntInlet(min=10, max=200),
    ]

Running such a concrete test will execute a variety of test cases that ensure your inlet correctly provides the expected functionality. These include:

  • Creating new records.

  • Attaching global and local metadata.

  • Calling the pull method.

Since InletTester will call pull on your inlet, you may want to mock some of your inlet’s functionality in order to separate testing of its logic from external code.
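
As a sketch, you could patch external calls with unittest.mock so the tests exercise only your inlet's own logic. Patching random.randint here is illustrative, and passing None in place of an Update object is for brevity only:

from unittest import mock

from databay.misc import inlet_tester

class RandomIntInletTest(inlet_tester.InletTester):

    def get_inlet(self):
        return RandomIntInlet()

    # illustrative test: with randint patched, pull becomes deterministic
    @mock.patch('random.randint', return_value=42)
    def test_pull_returns_patched_value(self, mock_randint):
        inlet = RandomIntInlet()
        self.assertEqual(inlet.pull(None), 42)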


Next Steps

  1. Learn about extending Outlets.

  2. See the Examples.