Examples

You can find all examples in the GitHub repository in the Examples folder.

Simple usage

This is a simple example of how data can be produced, transferred and consumed in Databay. It uses the built-in HttpInlet to produce data from a test URL and the built-in MongoOutlet to consume it by writing into MongoDB.

  1. Create an inlet for data production:

http_inlet = HttpInlet('https://jsonplaceholder.typicode.com/todos/1')
  2. Create an outlet for data consumption:

mongo_outlet = MongoOutlet(database_name='databay',
                           collection='test_collection')
  3. Add the two to a link that will handle data transfer between them:

link = Link(http_inlet, mongo_outlet,
            datetime.timedelta(seconds=5), tags='http_to_mongo')
  4. Create a planner, add the link and start scheduling:

planner = ApsPlanner(link)
planner.start()
  5. (Optional) In this example the Databay logger is configured to display all messages. See Logging for more information.

logging.getLogger('databay').setLevel(logging.DEBUG)
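
Databay applies its own default handler and format (visible in the output below), so the line above is usually all that is needed. If you prefer to route Databay logs through your own handler, a minimal sketch using Python's standard logging module could look like this (the handler and format shown are illustrative, not part of Databay):

import logging

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    '%(asctime)s|%(levelname).1s| %(message)s (%(name)s)'))

databay_logger = logging.getLogger('databay')
databay_logger.setLevel(logging.DEBUG)
databay_logger.addHandler(handler)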

Output:

>>> 2020-07-30 19:51:36.313|I| Added link: Link(tags:['http_to_mongo'], inlets:[HttpInlet(metadata:{})], outlets:[MongoOutlet()], interval:0:00:05) (databay.BasePlanner)
>>> 2020-07-30 19:51:36.314|I| Starting ApsPlanner(threads:30) (databay.BasePlanner)

>>> 2020-07-30 19:51:41.318|D| http_to_mongo.0 transfer (databay.Link)
>>> 2020-07-30 19:51:41.318|I| http_to_mongo.0 pulling https://jsonplaceholder.typicode.com/todos/1 (databay.HttpInlet)
>>> 2020-07-30 19:51:42.182|I| http_to_mongo.0 received https://jsonplaceholder.typicode.com/todos/1 (databay.HttpInlet)
>>> 2020-07-30 19:51:42.188|I| http_to_mongo.0 insert [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False}] (databay.MongoOutlet)
>>> 2020-07-30 19:51:42.191|I| http_to_mongo.0 written [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False, '_id': ObjectId('5f22c25ea7aca516ec3fcf38')}] (databay.MongoOutlet)
>>> 2020-07-30 19:51:42.191|D| http_to_mongo.0 done (databay.Link)

>>> 2020-07-30 19:51:46.318|D| http_to_mongo.1 transfer (databay.Link)
>>> 2020-07-30 19:51:46.318|I| http_to_mongo.1 pulling https://jsonplaceholder.typicode.com/todos/1 (databay.HttpInlet)
>>> 2020-07-30 19:51:46.358|I| http_to_mongo.1 received https://jsonplaceholder.typicode.com/todos/1 (databay.HttpInlet)
>>> 2020-07-30 19:51:46.360|I| http_to_mongo.1 insert [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False}] (databay.MongoOutlet)
>>> 2020-07-30 19:51:46.361|I| http_to_mongo.1 written [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False, '_id': ObjectId('5f22c262a7aca516ec3fcf39')}] (databay.MongoOutlet)
>>> 2020-07-30 19:51:46.362|D| http_to_mongo.1 done (databay.Link)
...

The above log can be read as follows:

  • First, the planner adds the provided link and starts scheduling:

    Added link: Link(tags:['http_to_mongo'], inlets:[HttpInlet(metadata:{})], outlets:[MongoOutlet()], interval:0:00:05)
    Starting ApsPlanner(threads:30)
    
  • Once scheduling starts, the link will log the beginning and end of each transfer:

    http_to_mongo.0 transfer
    

    Note the http_to_mongo.0 prefix in the message. It is the string representation of the Update object that represents each individual transfer executed by that particular link. http_to_mongo is the tag of the link, while 0 represents the incremental index of the transfer.

  • Then HttpInlet logs its data production:

    http_to_mongo.0 pulling https://jsonplaceholder.typicode.com/todos/1
    http_to_mongo.0 received https://jsonplaceholder.typicode.com/todos/1
    
  • Followed by MongoOutlet logging its data consumption:

    http_to_mongo.0 insert [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False}]
    http_to_mongo.0 written [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False, '_id': ObjectId('5f22c25ea7aca516ec3fcf38')}]
    
  • Finally, the link reports completing its first transfer:

    http_to_mongo.0 done
    

Full example:

import datetime
import logging

from databay import Link
from databay.inlets import HttpInlet
from databay.outlets import MongoOutlet
from databay.planners import ApsPlanner

logging.getLogger('databay').setLevel(logging.DEBUG)

# Create an inlet, outlet and a link.
http_inlet = HttpInlet('https://jsonplaceholder.typicode.com/todos/1')
mongo_outlet = MongoOutlet(database_name='databay',
                           collection='test_collection')
link = Link(http_inlet, mongo_outlet,
            datetime.timedelta(seconds=5), tags='http_to_mongo')

# Create a planner, add the link and start scheduling.
planner = ApsPlanner(link)
planner.start()

Basic Inlet

In this example we create a simple implementation of Inlet, producing a random integer on a 5-second interval.

  1. Extend the Inlet class, returning produced data from the pull method:

class RandomIntInlet(Inlet):

    def pull(self, update):
        return random.randint(0, 100)
  2. Instantiate it:

random_int_inlet = RandomIntInlet()
  3. Add it to a link:

link = Link(random_int_inlet,
            print_outlet,
            interval=timedelta(seconds=5),
            tags='random_ints')
  4. Add to a planner and schedule:

planner = SchedulePlanner(link)
planner.start()

Output:

>>> random_ints.0 50
>>> random_ints.1 61
>>> random_ints.2 5
>>> ...

On each transfer RandomIntInlet produces a random integer.
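
The pull method is not limited to one value per transfer. Below is a hypothetical batch variation (not part of the repository examples); it assumes Databay wraps each element of a returned list in its own Record:

import random

from databay import Inlet


class RandomIntBatchInlet(Inlet):

    def pull(self, update):
        # Return a list - assuming each element becomes its own record.
        return [random.randint(0, 100) for _ in range(3)]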

Full example:

from databay import Link
from databay.outlets import PrintOutlet
from databay.planners import SchedulePlanner
from datetime import timedelta
from databay import Inlet
import random


class RandomIntInlet(Inlet):

    def pull(self, update):
        return random.randint(0, 100)


random_int_inlet = RandomIntInlet()

print_outlet = PrintOutlet(only_payload=True)

link = Link(random_int_inlet,
            print_outlet,
            interval=timedelta(seconds=5),
            tags='random_ints')

planner = SchedulePlanner(link)
planner.start()

Basic Outlet

In this example we create a simple implementation of Outlet, printing the incoming records one by one.

  1. Extend the Outlet class, printing the incoming data in the push method:

class PrintOutlet(Outlet):

    def push(self, records: [Record], update):
        for record in records:
            print(update, record.payload)
  2. Instantiate it:

print_outlet = PrintOutlet()
  3. Add it to a link:

link = Link(random_int_inlet,
            print_outlet,
            interval=timedelta(seconds=2),
            tags='print_outlet')
  4. Add to a planner and schedule:

planner = SchedulePlanner(link)
planner.start()

Output:

>>> print_outlet.0 10
>>> print_outlet.1 34
>>> print_outlet.2 18
>>> ...

On each transfer PrintOutlet prints the payload of records generated by RandomIntInlet.
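
Note that Databay also ships a ready-made PrintOutlet (used in the Basic Inlet example above); for plain payload printing it can be used directly instead of the custom class:

from databay.outlets import PrintOutlet

# Built-in outlet that prints each record; only_payload=True prints
# just the payloads instead of the full records.
print_outlet = PrintOutlet(only_payload=True)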

Full example:

from datetime import timedelta

from databay import Link
from databay.inlets import RandomIntInlet
from databay.planners import SchedulePlanner
from databay.record import Record
from databay.outlet import Outlet


class PrintOutlet(Outlet):

    def push(self, records: [Record], update):
        for record in records:
            print(update, record.payload)


random_int_inlet = RandomIntInlet()
print_outlet = PrintOutlet()

link = Link(random_int_inlet,
            print_outlet,
            interval=timedelta(seconds=2),
            tags='print_outlet')

planner = SchedulePlanner(link)
planner.start()

Intermediate Inlet

This example demonstrates an inlet that produces weather forecasts using OpenWeatherMap. It showcases what a realistic implementation of Inlet may look like.

  1. Create the WeatherInlet class implementing Inlet. We expect api_key and city_name to be provided when constructing this inlet.

from databay.inlet import Inlet
import urllib.request


class WeatherInlet(Inlet):
    def __init__(self, api_key: str, city_name: str, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.api_key = api_key
        self.city_name = city_name
  2. Implement the pull method, starting by creating the OpenWeatherMap URL using the api_key and city_name provided.

    def pull(self, update) -> List[Record]:
        url = f'https://api.openweathermap.org/data/2.5/weather?' \
              f'q={self.city_name}&' \
              f'appid={self.api_key}'
  3. Make a request to OpenWeatherMap using urllib.request.

        contents = urllib.request.urlopen(url).read().decode('utf8')
  4. Parse the response and return the produced data.

        formatted = json.loads(contents)
        return formatted['weather'][0]['description']
  5. Instantiate WeatherInlet.

api_key = os.environ.get('OPEN_WEATHER_MAP_API_KEY')
weather_inlet = WeatherInlet(api_key, 'Bangkok')
  6. Create a link, add it to a planner and schedule.

link = Link(weather_inlet, PrintOutlet(only_payload=True),
            interval=timedelta(seconds=2), tags='bangkok_weather')

planner = ApsPlanner(link)
planner.start()

Output:

>>> bangkok_weather.0 light rain
>>> bangkok_weather.1 light rain
>>> bangkok_weather.2 light rain
>>> ...

On each transfer WeatherInlet makes a request to the OpenWeatherMap API and returns a description of the weather in the selected city.
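
In practice the HTTP call can fail (invalid API key, network error) and the exception would propagate out of pull. Below is a minimal sketch of a more defensive variation, assuming that producing no records on failure is acceptable; SafeWeatherInlet and its logger name are illustrative, not part of the original example:

import json
import logging
import urllib.error
import urllib.request
from typing import List

from databay import Record

_LOGGER = logging.getLogger('databay.weather_inlet')


class SafeWeatherInlet(WeatherInlet):

    def pull(self, update) -> List[Record]:
        url = f'https://api.openweathermap.org/data/2.5/weather?' \
              f'q={self.city_name}&' \
              f'appid={self.api_key}'
        try:
            contents = urllib.request.urlopen(url).read().decode('utf8')
        except urllib.error.URLError as exception:
            # Skip this transfer instead of raising - no records are produced.
            _LOGGER.warning(f'{update} request failed: {exception}')
            return []
        return json.loads(contents)['weather'][0]['description']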

Full example:

import json
import os
from datetime import timedelta
from typing import List

from databay import Record, Link
from databay.outlets import PrintOutlet
from databay.planners import ApsPlanner

from databay.inlet import Inlet
import urllib.request


class WeatherInlet(Inlet):
    def __init__(self, api_key: str, city_name: str, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.api_key = api_key
        self.city_name = city_name

    def pull(self, update) -> List[Record]:
        url = f'https://api.openweathermap.org/data/2.5/weather?' \
              f'q={self.city_name}&' \
              f'appid={self.api_key}'

        contents = urllib.request.urlopen(url).read().decode('utf8')

        formatted = json.loads(contents)
        return formatted['weather'][0]['description']


api_key = os.environ.get('OPEN_WEATHER_MAP_API_KEY')
weather_inlet = WeatherInlet(api_key, 'Bangkok')

link = Link(weather_inlet, PrintOutlet(only_payload=True),
            interval=timedelta(seconds=2), tags='bangkok_weather')

planner = ApsPlanner(link)
planner.start()

Intermediate Outlet

This example demonstrates an outlet that writes the incoming records into a file. It showcases what a realistic implementation of Outlet may look like.

  1. Create the FileOutlet class implementing Outlet. This outlet will accept two metadata keys:

    • FileOutlet.FILEPATH - specifying the file that the record should be written into.

    • FileOutlet.FILE_MODE - specifying the write mode using Python’s default IO.

class FileOutlet(Outlet):

    FILEPATH = 'FileOutlet.FILEPATH'
    """Filepath of the file to write to."""

    FILE_MODE = 'FileOutlet.FILE_MODE'
    """Write mode to use when writing into the csv file."""
  2. We give the option to specify default_filepath and default_file_mode when constructing this outlet.

    def __init__(self,
                 default_filepath: str = 'outputs/default_output.txt',
                 default_file_mode: str = 'a'):

        super().__init__()

        self.default_filepath = default_filepath
        self.default_file_mode = default_file_mode
  3. Implement the push method, looping over all records and reading their metadata.

    def push(self, records: [Record], update):
        for record in records:
            filepath = record.metadata.get(
                self.FILEPATH, self.default_filepath)
            file_mode = record.metadata.get(
                self.FILE_MODE, self.default_file_mode)
  4. Write the record according to the filepath and file_mode found.

            with open(filepath, file_mode) as f:
                f.write(str(record.payload)+'\n')
  5. Instantiate FileOutlet and RandomIntInlet, providing the inlet with a metadata dictionary:

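metadata = {
    FileOutlet.FILEPATH: 'outputs/random_ints.txt',
    FileOutlet.FILE_MODE: 'a'
}
random_int_inlet = RandomIntInlet(metadata=metadata)
file_outlet = FileOutlet()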
  6. Create a link, add it to a planner and schedule.

link = Link(random_int_inlet,
            file_outlet,
            interval=timedelta(seconds=2),
            tags='file_outlet')

planner = ApsPlanner(link)
planner.start()

This creates an outputs/random_ints.txt file:

1
76
52
76
64
89
71
12
70
74
...

Full example:

from datetime import timedelta

from databay import Link
from databay.inlets import RandomIntInlet
from databay.planners import ApsPlanner
from databay.record import Record
from databay.outlet import Outlet


class FileOutlet(Outlet):

    FILEPATH = 'FileOutlet.FILEPATH'
    """Filepath of the file to write to."""

    FILE_MODE = 'FileOutlet.FILE_MODE'
    """Write mode to use when writing into the csv file."""

    def __init__(self,
                 default_filepath: str = 'outputs/default_output.txt',
                 default_file_mode: str = 'a'):

        super().__init__()

        self.default_filepath = default_filepath
        self.default_file_mode = default_file_mode

    def push(self, records: [Record], update):
        for record in records:
            filepath = record.metadata.get(
                self.FILEPATH, self.default_filepath)
            file_mode = record.metadata.get(
                self.FILE_MODE, self.default_file_mode)

            with open(filepath, file_mode) as f:
                f.write(str(record.payload)+'\n')


metadata = {
    FileOutlet.FILEPATH: 'outputs/random_ints.txt',
    FileOutlet.FILE_MODE: 'a'
}
random_int_inlet = RandomIntInlet(metadata=metadata)
file_outlet = FileOutlet()

link = Link(random_int_inlet,
            file_outlet,
            interval=timedelta(seconds=2),
            tags='file_outlet')

planner = ApsPlanner(link)
planner.start()

Basic metadata

This example demonstrates basic usage of Global metadata, using a variation of the PrintOutlet created in the Basic Outlet example.

  1. Create the ConditionalPrintOutlet class implementing Outlet. This outlet will accept one metadata key:

    • ConditionalPrintOutlet.SHOULD_PRINT - whether the record should be printed.

class ConditionalPrintOutlet(Outlet):

    SHOULD_PRINT = 'ConditionalPrintOutlet.SHOULD_PRINT'
    """Whether records should be printed or skipped."""
  2. Implement the push method, looping over all records and printing each one if ConditionalPrintOutlet.SHOULD_PRINT is set:

    def push(self, records: [Record], update):
        for record in records:
            if record.metadata.get(self.SHOULD_PRINT):
                print(update, record)
  3. Instantiate two inlets, one that always prints and one that never prints:

random_int_inlet_on = RandomIntInlet(
    metadata={ConditionalPrintOutlet.SHOULD_PRINT: True})
random_int_inlet_off = RandomIntInlet(
    metadata={ConditionalPrintOutlet.SHOULD_PRINT: False})
  4. Instantiate ConditionalPrintOutlet and add all nodes to a link:

print_outlet = ConditionalPrintOutlet()

link = Link([random_int_inlet_on, random_int_inlet_off],
            print_outlet,
            interval=timedelta(seconds=0.5),
            tags='should_print_metadata')
  5. Add to a planner and schedule:

planner = SchedulePlanner(link, refresh_interval=0.5)
planner.start()

Output:

>>> should_print_metadata.0 Record(payload=44, metadata={'ConditionalPrintOutlet.SHOULD_PRINT': True, '__inlet__': "RandomIntInlet(metadata:{'ConditionalPrintOutlet.SHOULD_PRINT': True})"})
>>> should_print_metadata.1 Record(payload=14, metadata={'ConditionalPrintOutlet.SHOULD_PRINT': True, '__inlet__': "RandomIntInlet(metadata:{'ConditionalPrintOutlet.SHOULD_PRINT': True})"})
>>> should_print_metadata.2 Record(payload=54, metadata={'ConditionalPrintOutlet.SHOULD_PRINT': True, '__inlet__': "RandomIntInlet(metadata:{'ConditionalPrintOutlet.SHOULD_PRINT': True})"})
>>> ...

On each transfer ConditionalPrintOutlet prints only the records coming from random_int_inlet_on, which was constructed with global metadata that allows printing.

Full example:

from datetime import timedelta

from databay import Link
from databay.inlets import RandomIntInlet
from databay.outlet import Outlet
from databay.planners import SchedulePlanner
from databay.record import Record


class ConditionalPrintOutlet(Outlet):

    SHOULD_PRINT = 'ConditionalPrintOutlet.SHOULD_PRINT'
    """Whether records should be printed or skipped."""

    def push(self, records: [Record], update):
        for record in records:
            if record.metadata.get(self.SHOULD_PRINT):
                print(update, record)


random_int_inlet_on = RandomIntInlet(
    metadata={ConditionalPrintOutlet.SHOULD_PRINT: True})
random_int_inlet_off = RandomIntInlet(
    metadata={ConditionalPrintOutlet.SHOULD_PRINT: False})

print_outlet = ConditionalPrintOutlet()

link = Link([random_int_inlet_on, random_int_inlet_off],
            print_outlet,
            interval=timedelta(seconds=0.5),
            tags='should_print_metadata')

planner = SchedulePlanner(link, refresh_interval=0.5)
planner.start()

Basic asynchronous

This tutorial showcases a simple usage of asynchronous inlets and outlets.

  1. Create an asynchronous inlet.

class RandomIntInlet(Inlet):

    async def pull(self, update):

        # simulate a long-taking operation
        await asyncio.sleep(0.5)

        # execute
        r = random.randint(0, 100)

        _LOGGER.debug(f'{update} produced:{r}')
        return r
  2. Create an asynchronous outlet. Note that one asynchronous wait will be simulated for each record consumed.

class PrintOutlet(Outlet):

    async def push(self, records: [Record], update):
        _LOGGER.debug(f'{update} push starts')

        # create an asynchronous task for each record
        tasks = [self.print_task(record, update) for record in records]

        # await all print tasks
        await asyncio.gather(*tasks)

    async def print_task(self, record, update):

        # simulate a long-taking operation
        await asyncio.sleep(0.5)

        # execute
        _LOGGER.debug(f'{update} consumed:{record.payload}')
  3. Instantiate three asynchronous inlets and one asynchronous outlet.

random_int_inletA = RandomIntInlet()
random_int_inletB = RandomIntInlet()
random_int_inletC = RandomIntInlet()
print_outlet = PrintOutlet()

link = Link([random_int_inletA, random_int_inletB, random_int_inletC],
            print_outlet,
            interval=timedelta(seconds=2),
            tags='async')
  4. Add to a planner and schedule.

planner = SchedulePlanner(link)
planner.start()

Output:

>>> 2020-08-04 22:40:41.242|D| async.0 transfer
>>> 2020-08-04 22:40:41.754|D| async.0 produced:20
>>> 2020-08-04 22:40:41.754|D| async.0 produced:55
>>> 2020-08-04 22:40:41.754|D| async.0 produced:22
>>> 2020-08-04 22:40:41.755|D| async.0 push starts
>>> 2020-08-04 22:40:42.267|D| async.0 consumed:20
>>> 2020-08-04 22:40:42.267|D| async.0 consumed:55
>>> 2020-08-04 22:40:42.267|D| async.0 consumed:22
>>> 2020-08-04 22:40:42.267|D| async.0 done

>>> 2020-08-04 22:40:43.263|D| async.1 transfer
>>> 2020-08-04 22:40:43.776|D| async.1 produced:10
>>> 2020-08-04 22:40:43.776|D| async.1 produced:4
>>> 2020-08-04 22:40:43.776|D| async.1 produced:90
>>> 2020-08-04 22:40:43.777|D| async.1 push starts
>>> 2020-08-04 22:40:44.292|D| async.1 consumed:10
>>> 2020-08-04 22:40:44.292|D| async.1 consumed:4
>>> 2020-08-04 22:40:44.292|D| async.1 consumed:90
>>> 2020-08-04 22:40:44.292|D| async.1 done

On each transfer, two asynchronous operations take place:

  • First, all inlets simultaneously await before producing their data.

  • Once all data from the inlets is gathered, the second stage commences, where the outlet simultaneously awaits each record before printing it out.

This simulates a delay happening either in the inlets or the outlets. Note how one transfer takes approximately a second to complete, despite executing six operations each requiring 0.5 seconds of sleep. If this were to execute synchronously, the entire transfer would take around 3 seconds to complete.
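
The timing can be verified in isolation with plain asyncio, independent of Databay - three concurrent 0.5-second sleeps complete in roughly 0.5 seconds rather than 1.5:

import asyncio
import time


async def sleep_task():
    # each task simulates a 0.5-second operation
    await asyncio.sleep(0.5)


async def main():
    start = time.monotonic()
    # three concurrent sleeps take about as long as one
    await asyncio.gather(sleep_task(), sleep_task(), sleep_task())
    print(f'elapsed: {time.monotonic() - start:.2f}s')  # ~0.50s

asyncio.run(main())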

Full example:

import asyncio
import logging

from databay import Link, Outlet, Record
from databay.planners import SchedulePlanner
from datetime import timedelta
from databay import Inlet
import random

_LOGGER = logging.getLogger('databay.basic_asynchronous')
logging.getLogger('databay').setLevel(logging.DEBUG)


class RandomIntInlet(Inlet):

    async def pull(self, update):

        # simulate a long-taking operation
        await asyncio.sleep(0.5)

        # execute
        r = random.randint(0, 100)

        _LOGGER.debug(f'{update} produced:{r}')
        return r


class PrintOutlet(Outlet):

    async def push(self, records: [Record], update):
        _LOGGER.debug(f'{update} push starts')

        # create an asynchronous task for each record
        tasks = [self.print_task(record, update) for record in records]

        # await all print tasks
        await asyncio.gather(*tasks)

    async def print_task(self, record, update):

        # simulate a long-taking operation
        await asyncio.sleep(0.5)

        # execute
        _LOGGER.debug(f'{update} consumed:{record.payload}')


random_int_inletA = RandomIntInlet()
random_int_inletB = RandomIntInlet()
random_int_inletC = RandomIntInlet()
print_outlet = PrintOutlet()

link = Link([random_int_inletA, random_int_inletB, random_int_inletC],
            print_outlet,
            interval=timedelta(seconds=2),
            tags='async')

planner = SchedulePlanner(link)
planner.start()

Elasticsearch Outlet

In this example we create an implementation of Outlet that indexes records as documents into a running Elasticsearch instance.

Note: this example assumes that Elasticsearch is correctly configured and that the index you are indexing documents into exists with the appropriate mappings. For more details see the official Elasticsearch Python client documentation.
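
If the index does not exist yet, it can be created up front. A minimal sketch, assuming the Elasticsearch 7.x Python client used below; the mapping is illustrative and only covers the my_document field written by this outlet:

import elasticsearch

es_client = elasticsearch.Elasticsearch()

# create the index with a simple text mapping if it is missing
if not es_client.indices.exists('my-test-index'):
    es_client.indices.create(
        'my-test-index',
        body={'mappings': {'properties': {'my_document': {'type': 'text'}}}})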

  1. Extend the Outlet class with the new parameters required when constructing it: es_client - an instance of the Elasticsearch Python client - and index_name - the name of a pre-existing index in the running cluster.

class ElasticsearchIndexerOutlet(Outlet):
    " An example outlet for indexing text documents from any `Inlet`."

    def __init__(self,
                 es_client: elasticsearch.Elasticsearch,
                 index_name: str,
                 overwrite_documents: bool = True):
        super().__init__()
        self.es_client = es_client
        self.index_name = index_name

        # If True, existing documents will be overwritten. Otherwise indexing
        # is skipped and we log that the document ID already exists in the index.
        self.overwrite_documents = overwrite_documents

        if not self.es_client.indices.exists(self.index_name):
            raise RuntimeError(f"Index '{self.index_name}' does not exist ")
  2. In this implementation of the push method there are a few custom behaviors specified. As we iterate over every incoming record:

    • We use the dict keys from the current record’s payload as our unique document ID.

    • The flag self.overwrite_documents determines whether we will check if an id already exists.

    • If self.overwrite_documents is True we simply index the document and _id without doing any check.

    • Otherwise we use the client to check if _id exists in the index. If it does we skip and log that it already exists. Otherwise it is indexed as normal.

    def push(self, records: List[Record], update):
        for record in records:

            payload = record.payload

            # using dict keys from payload as unique id for index
            for k in payload.keys():
                _id = k
                text = payload[k]
                body = {"my_document": text}
                if self.overwrite_documents:
                    self.es_client.index(
                        self.index_name, body, id=_id)
                    _LOGGER.info(f"Indexed document with id {_id}")

                else:
                    if self.es_client.exists(self.index_name, _id):
                        # log that already exists
                        _LOGGER.info(
                            f"Document already exists for id {_id}. Skipping.")
                    else:
                        self.es_client.index(
                            self.index_name, body, id=_id)
                        _LOGGER.info(f"Indexed document with id {_id}")


  3. Create a simple Inlet that takes a list of strings as its main parameter. In its pull method it randomly selects one and returns a dict mapping an incrementing ID to the selected string. We'll use this to pass documents to our Elasticsearch outlet.

class DummyTextInlet(Inlet):
    "A simple `Inlet` that randomly pulls a string from a list of strings."

    def __init__(self, text: list, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.text = text
        self._id = 0

    def pull(self, update):
        text_selection = random.choice(self.text)
        self._id += 1
        time.sleep(1)
        return {self._id: text_selection}
  4. Instantiate our simple inlet as well as an instance of ElasticsearchIndexerOutlet with the default value for overwrite_documents.

    • We use the official Elasticsearch Python client for es_client.

    • This example assumes my-test-index already exists in our Elasticsearch cluster.

es_client = elasticsearch.Elasticsearch(timeout=30)

text_inlet = DummyTextInlet(TEXT.split("."))
elasticsearch_outlet = ElasticsearchIndexerOutlet(
    es_client, "my-test-index")
  5. Tie it all together using a Link and a planner.

    • The link is set up to index a new document every 2 seconds.

link = Link(text_inlet,
            elasticsearch_outlet,
            interval=2,
            tags='elasticsearch_outlet')

planner = ApsPlanner(link)
planner.start()

Output:

  • From the logs we can see that the records are being written into our Elasticsearch index.

>>> Indexed document with id 1
>>> Indexed document with id 2
>>> Indexed document with id 3
>>> Indexed document with id 4
>>> Indexed document with id 5
>>> Indexed document with id 6
>>> Indexed document with id 7
>>> Indexed document with id 8

Output (if overwrite_documents is set to False):

  • From the logs we can see that the record IDs so far have already been written into our Elasticsearch index.

>>> Document already exists for id 1. Skipping.
>>> Document already exists for id 2. Skipping.
>>> Document already exists for id 3. Skipping.
>>> Document already exists for id 4. Skipping.
>>> Document already exists for id 5. Skipping.
>>> Document already exists for id 6. Skipping.
>>> Document already exists for id 7. Skipping.
>>> Document already exists for id 8. Skipping.

Full example:

import logging
import random
import time
from typing import List

import elasticsearch
from databay import Inlet, Link, Outlet
from databay.planners import ApsPlanner
from databay.record import Record

TEXT = """
Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Phasellus ex erat, viverra tincidunt tempus eget, hendrerit sed ligula. 
Quisque mollis nibh in imperdiet porttitor. Nulla bibendum lacus et est lobortis porta.
Nulla sed ligula at odio volutpat consectetur. Sed quis augue ac magna porta imperdiet interdum eu velit. 
Integer pretium ultrices urna, id viverra mauris ultrices ut. Etiam aliquet tellus porta nisl eleifend, non hendrerit nisl sodales. 
Aliquam eget porttitor enim. 
"""

_LOGGER = logging.getLogger('databay.elasticsearch_outlet')


class ElasticsearchIndexerOutlet(Outlet):
    " An example outlet for indexing text documents from any `Inlet`."

    def __init__(self,
                 es_client: elasticsearch.Elasticsearch,
                 index_name: str,
                 overwrite_documents: bool = True):
        super().__init__()
        self.es_client = es_client
        self.index_name = index_name

        # If True, existing documents will be overwritten. Otherwise indexing
        # is skipped and we log that the document ID already exists in the index.
        self.overwrite_documents = overwrite_documents

        if not self.es_client.indices.exists(self.index_name):
            raise RuntimeError(f"Index '{self.index_name}' does not exist ")

    def push(self, records: List[Record], update):
        for record in records:

            payload = record.payload

            # using dict keys from payload as unique id for index
            for k in payload.keys():
                _id = k
                text = payload[k]
                body = {"my_document": text}
                if self.overwrite_documents:
                    self.es_client.index(
                        self.index_name, body, id=_id)
                    _LOGGER.info(f"Indexed document with id {_id}")

                else:
                    if self.es_client.exists(self.index_name, _id):
                        # log that already exists
                        _LOGGER.info(
                            f"Document already exists for id {_id}. Skipping.")
                    else:
                        self.es_client.index(
                            self.index_name, body, id=_id)
                        _LOGGER.info(f"Indexed document with id {_id}")


class DummyTextInlet(Inlet):
    "A simple `Inlet` that randomly pulls a string from a list of strings."

    def __init__(self, text: list, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.text = text
        self._id = 0

    def pull(self, update):
        text_selection = random.choice(self.text)
        self._id += 1
        time.sleep(1)
        return {self._id: text_selection}


_LOGGER.setLevel(logging.INFO)

es_client = elasticsearch.Elasticsearch(timeout=30)

text_inlet = DummyTextInlet(TEXT.split("."))
elasticsearch_outlet = ElasticsearchIndexerOutlet(
    es_client, "my-test-index")

link = Link(text_inlet,
            elasticsearch_outlet,
            interval=2,
            tags='elasticsearch_outlet')

planner = ApsPlanner(link)
planner.start()

Twitter Inlet

In this example we create an implementation of an Inlet that connects to the Twitter API and either listens for new tweets from a specific user or monitors the home timeline of an authenticated account.

Note: this example assumes that the Tweepy client is correctly configured and that the Twitter account is registered to use the API. For more details see the Tweepy documentation.

  1. Extend the Inlet class, passing in an instance of the Tweepy client as api. Depending on the use case, users can also pass in user if they want to run the inlet on a specific username.

class TwitterInlet(Inlet):
    """
    An implementation of an `Inlet` that uses the Tweepy (https://www.tweepy.org/)
    Twitter client to pull tweets from either a specific user's timeline or the
    home timeline belonging to an authenticated `tweepy.API` instance.
    """

    def __init__(self, api: tweepy.API, user: str = None, most_recent_id=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.api = api
        self.user = user

        # this will ensure we only ever pull tweets that haven't been handled
        self.most_recent_id = most_recent_id

        # sets flag indicating whether we are pulling from a single user's timeline
        # or from the home timeline.
        if self.user is None:
            self.is_user_timeline = False
        else:
            self.is_user_timeline = True
  2. For the pull method we perform a number of configuration-specific checks:

    • If the flag self.is_user_timeline is True we'll be using the user_timeline method of the Tweepy API. This pulls tweets from a specific user's timeline rather than the registered account's home timeline.

    • Additionally, both conditional branches check for self.most_recent_id; if a recent ID exists, it is passed as an additional parameter to Tweepy. This ensures that only tweets newer than the last pull are fetched.

    • self.most_recent_id is assigned by taking the ID from the first tweet in the results list.

    def pull(self, update):
        if self.is_user_timeline:
            if self.most_recent_id is not None:
                public_tweets = self.api.user_timeline(
                    self.user, since_id=self.most_recent_id)
            else:
                public_tweets = self.api.user_timeline(
                    self.user)
        else:
            if self.most_recent_id is not None:
                public_tweets = self.api.home_timeline(
                    since_id=self.most_recent_id)
            else:
                public_tweets = self.api.home_timeline()

        if len(public_tweets) > 0:
            # 0th tweet is most recent
            self.most_recent_id = public_tweets[0].id

        tweets = []
        for tweet in public_tweets:
            tweets.append({"user": tweet.user.screen_name, "text": tweet.text})
        return tweets
  3. To authenticate Tweepy correctly, the appropriate keys and secrets must be passed to the API.

auth = tweepy.OAuthHandler(
    consumer_key, consumer_secret)  # user defined values
auth.set_access_token(access_token, access_token_secret)  # user defined values

# extra params here protect against twitter rate limiting
# set link intervals with this in mind
# for more on twitter rate limiting see https://developer.twitter.com/en/docs/rate-limits
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
  4. The TwitterInlet can then be instantiated as seen below. We use the PrintOutlet to print the results of each successful pull.

    • Note: Be mindful of the interval you pass to Link as the Twitter API has strict rate limiting policies.

# create a TwitterInlet pointed at a specific account name
twitter_user_inlet = TwitterInlet(api, "@BarackObama")

link = Link(twitter_user_inlet, PrintOutlet(only_payload=True),
            interval=30, tags='twitter_timeline')

planner = SchedulePlanner(link)
planner.start()

Output:

>>> {'user': 'BarackObama', 'text': 'Georgia’s runoff election will determine whether the American people have a Senate that’s actually fighting for the… https://t.co/igUiRzxNxe'}
>>> {'user': 'BarackObama', 'text': 'Here’s a great way to call voters in Georgia and help them get ready to vote. A couple hours this weekend could hel… https://t.co/x6Nc8w7F38'}
>>> {'user': 'BarackObama', 'text': "Happy Hanukkah to all those celebrating around the world. This year has tested us all, but it's also clarified what… https://t.co/k2lzUQ9LNm"}
>>> {'user': 'BarackObama', 'text': 'In A Promised Land, I talk about the decisions I had to make during the first few years of my presidency. Here are… https://t.co/KbE2FDStYr'}
>>> {'user': 'BarackObama', 'text': "With COVID-19 cases reaching an all-time high this week, we've got to continue to do our part to protect one anothe… https://t.co/Gj0mEFfuLY"}
>>> {'user': 'BarackObama', 'text': 'To all of you in Georgia, today is the last day to register to vote in the upcoming runoff election. Take a few min… https://t.co/Jif3Gd7NpQ'}

Full example:

import os

import tweepy
from databay import Inlet, Link
from databay.outlets import PrintOutlet
from databay.planners import SchedulePlanner


class TwitterInlet(Inlet):
    """
    An implementation of an `Inlet` that uses the Tweepy (https://www.tweepy.org/)
    Twitter client to pull tweets from either a specific user's timeline or the
    home timeline belonging to an authenticated `tweepy.API` instance.
    """

    def __init__(self, api: tweepy.API, user: str = None, most_recent_id=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.api = api
        self.user = user

        # this will ensure we only ever pull tweets that haven't been handled
        self.most_recent_id = most_recent_id

        # sets flag indicating whether we are pulling from a single user's timeline
        # or from the home timeline.
        if self.user is None:
            self.is_user_timeline = False
        else:
            self.is_user_timeline = True

    def pull(self, update):
        if self.is_user_timeline:
            if self.most_recent_id is not None:
                public_tweets = self.api.user_timeline(
                    self.user, since_id=self.most_recent_id)
            else:
                public_tweets = self.api.user_timeline(
                    self.user)
        else:
            if self.most_recent_id is not None:
                public_tweets = self.api.home_timeline(
                    since_id=self.most_recent_id)
            else:
                public_tweets = self.api.home_timeline()

        if len(public_tweets) > 0:
            # 0th tweet is most recent
            self.most_recent_id = public_tweets[0].id

        tweets = []
        for tweet in public_tweets:
            tweets.append({"user": tweet.user.screen_name, "text": tweet.text})
        return tweets


# gets twitter api secrets and keys from environment vars
consumer_key = os.getenv("twitter_key")
consumer_secret = os.getenv("twitter_secret")
access_token = os.getenv("twitter_access_token")
access_token_secret = os.getenv("twitter_access_token_secret")


auth = tweepy.OAuthHandler(
    consumer_key, consumer_secret)  # user defined values
auth.set_access_token(access_token, access_token_secret)  # user defined values

# extra params here protect against twitter rate limiting
# set link intervals with this in mind
# for more on twitter rate limiting see https://developer.twitter.com/en/docs/rate-limits
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)


# create a TwitterInlet pointed at a specific account name
twitter_user_inlet = TwitterInlet(api, "@BarackObama")

link = Link(twitter_user_inlet, PrintOutlet(only_payload=True),
            interval=30, tags='twitter_timeline')

planner = SchedulePlanner(link)
planner.start()