Examples¶
You can find all examples in the GitHub repository in the Examples folder.
Simple usage¶
This is a simple example of how data can be produced, transferred and consumed in Databay. It uses the built-in HttpInlet to produce data from a test URL and MongoOutlet to consume it using MongoDB.
Create an inlet for data production:
http_inlet = HttpInlet('https://jsonplaceholder.typicode.com/todos/1')
Create an outlet for data consumption:
mongo_outlet = MongoOutlet(database_name='databay',
                           collection='test_collection')
Add the two to a link that will handle data transfer between them:
link = Link(http_inlet, mongo_outlet,
datetime.timedelta(seconds=5), tags='http_to_mongo')
Create a planner, add the link and start scheduling:
planner = ApsPlanner(link)
planner.start()
(Optional) In this example the databay logger is configured to display all messages. See Logging for more information.
logging.getLogger('databay').setLevel(logging.DEBUG)
Output:
>>> 2020-07-30 19:51:36.313|I| Added link: Link(tags:['http_to_mongo'], inlets:[HttpInlet(metadata:{})], outlets:[MongoOutlet()], interval:0:00:05) (databay.BasePlanner)
>>> 2020-07-30 19:51:36.314|I| Starting ApsPlanner(threads:30) (databay.BasePlanner)
>>> 2020-07-30 19:51:41.318|D| http_to_mongo.0 transfer (databay.Link)
>>> 2020-07-30 19:51:41.318|I| http_to_mongo.0 pulling https://jsonplaceholder.typicode.com/todos/1 (databay.HttpInlet)
>>> 2020-07-30 19:51:42.182|I| http_to_mongo.0 received https://jsonplaceholder.typicode.com/todos/1 (databay.HttpInlet)
>>> 2020-07-30 19:51:42.188|I| http_to_mongo.0 insert [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False}] (databay.MongoOutlet)
>>> 2020-07-30 19:51:42.191|I| http_to_mongo.0 written [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False, '_id': ObjectId('5f22c25ea7aca516ec3fcf38')}] (databay.MongoOutlet)
>>> 2020-07-30 19:51:42.191|D| http_to_mongo.0 done (databay.Link)
>>> 2020-07-30 19:51:46.318|D| http_to_mongo.1 transfer (databay.Link)
>>> 2020-07-30 19:51:46.318|I| http_to_mongo.1 pulling https://jsonplaceholder.typicode.com/todos/1 (databay.HttpInlet)
>>> 2020-07-30 19:51:46.358|I| http_to_mongo.1 received https://jsonplaceholder.typicode.com/todos/1 (databay.HttpInlet)
>>> 2020-07-30 19:51:46.360|I| http_to_mongo.1 insert [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False}] (databay.MongoOutlet)
>>> 2020-07-30 19:51:46.361|I| http_to_mongo.1 written [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False, '_id': ObjectId('5f22c262a7aca516ec3fcf39')}] (databay.MongoOutlet)
>>> 2020-07-30 19:51:46.362|D| http_to_mongo.1 done (databay.Link)
...
The above log can be read as follows:
At first the planner adds the link provided and starts scheduling:
Added link: Link(tags:['http_to_mongo'], inlets:[HttpInlet(metadata:{})], outlets:[MongoOutlet()], interval:0:00:05)
Starting ApsPlanner(threads:30)
Once scheduling starts, the link logs the beginning and end of each transfer:
http_to_mongo.0 transfer
Note the http_to_mongo.0 prefix in the message. It is the string representation of the Update object that represents each individual transfer executed by that particular link. http_to_mongo is the tag of the link, while 0 is the incremental index of the transfer.
Then HttpInlet logs its data production:
http_to_mongo.0 pulling https://jsonplaceholder.typicode.com/todos/1
http_to_mongo.0 received https://jsonplaceholder.typicode.com/todos/1
Followed by MongoOutlet logging its data consumption:
http_to_mongo.0 insert [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False}]
http_to_mongo.0 written [{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False, '_id': ObjectId('5f22c25ea7aca516ec3fcf38')}]
Finally, link reports completing its first transfer:
http_to_mongo.0 done
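The tag.index labelling described above can be sketched in isolation. The following is a rough illustration (not Databay's actual Update implementation) of how each transfer's label combines the link's tag with an incrementing index:

```python
import itertools

def update_names(tag: str):
    # yields 'tag.0', 'tag.1', ... mirroring how each transfer of a link
    # is labelled with the link's tag and an incrementing transfer index
    for i in itertools.count():
        yield f'{tag}.{i}'

names = update_names('http_to_mongo')
# next(names) -> 'http_to_mongo.0', then 'http_to_mongo.1', ...
```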
Full example:
import datetime
import logging
from databay import Link
from databay.inlets import HttpInlet
from databay.outlets import MongoOutlet
from databay.planners import ApsPlanner
logging.getLogger('databay').setLevel(logging.DEBUG)
# Create an inlet, outlet and a link.
http_inlet = HttpInlet('https://jsonplaceholder.typicode.com/todos/1')
mongo_outlet = MongoOutlet(database_name='databay',
collection='test_collection')
link = Link(http_inlet, mongo_outlet,
datetime.timedelta(seconds=5), tags='http_to_mongo')
# Create a planner, add the link and start scheduling.
planner = ApsPlanner(link)
planner.start()
Basic Inlet¶
In this example we create a simple implementation of Inlet, producing a random integer on a 5-second interval.
class RandomIntInlet(Inlet):
def pull(self, update):
return random.randint(0, 100)
Instantiate it:
random_int_inlet = RandomIntInlet()
Add it to a link:
link = Link(random_int_inlet,
print_outlet,
interval=timedelta(seconds=5),
tags='random_ints')
Add to a planner and schedule.
planner = SchedulePlanner(link)
planner.start()
Output:
>>> random_ints.0 50
>>> random_ints.1 61
>>> random_ints.2 5
>>> ...
On each transfer RandomIntInlet
produces a random integer.
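A pull method may also return several values at once; in Databay a list returned from pull is turned into one record per element. A minimal sketch of such an inlet (the base class is omitted here to keep the snippet self-contained; real code would subclass databay.Inlet as above):

```python
import random

class RandomIntListInlet:
    """Sketch of an inlet whose pull returns several values at once.
    Returning a list from pull produces one record per element."""

    def __init__(self, count: int = 3):
        self.count = count

    def pull(self, update):
        # produce `count` random integers in a single transfer
        return [random.randint(0, 100) for _ in range(self.count)]
```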
Full example:
from databay import Link
from databay.outlets import PrintOutlet
from databay.planners import SchedulePlanner
from datetime import timedelta
from databay import Inlet
import random
class RandomIntInlet(Inlet):
def pull(self, update):
return random.randint(0, 100)
random_int_inlet = RandomIntInlet()
print_outlet = PrintOutlet(only_payload=True)
link = Link(random_int_inlet,
print_outlet,
interval=timedelta(seconds=5),
tags='random_ints')
planner = SchedulePlanner(link)
planner.start()
Basic Outlet¶
In this example we create a simple implementation of Outlet, printing the incoming records one by one.
class PrintOutlet(Outlet):
def push(self, records: [Record], update):
for record in records:
print(update, record.payload)
Instantiate it:
print_outlet = PrintOutlet()
Add it to a link:
link = Link(random_int_inlet,
print_outlet,
interval=timedelta(seconds=2),
tags='print_outlet')
Add to a planner and schedule.
planner = SchedulePlanner(link)
planner.start()
Output:
>>> print_outlet.0 10
>>> print_outlet.1 34
>>> print_outlet.2 18
>>> ...
On each transfer PrintOutlet prints the payload of each record generated by RandomIntInlet.
Full example:
from datetime import timedelta
from databay import Link
from databay.inlets import RandomIntInlet
from databay.planners import SchedulePlanner
from databay.record import Record
from databay.outlet import Outlet
class PrintOutlet(Outlet):
def push(self, records: [Record], update):
for record in records:
print(update, record.payload)
random_int_inlet = RandomIntInlet()
print_outlet = PrintOutlet()
link = Link(random_int_inlet,
print_outlet,
interval=timedelta(seconds=2),
tags='print_outlet')
planner = SchedulePlanner(link)
planner.start()
Intermediate Inlet¶
This example demonstrates an inlet that produces weather forecasts using OpenWeatherMap. It showcases what a realistic implementation of Inlet may look like.
Create the WeatherInlet class implementing Inlet. We expect api_key and city_name to be provided when constructing this inlet.
from databay.inlet import Inlet
import urllib.request
class WeatherInlet(Inlet):
def __init__(self, api_key: str, city_name: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.api_key = api_key
self.city_name = city_name
Implement the pull method, starting by creating the OpenWeatherMap URL using the api_key and city_name provided.
def pull(self, update) -> List[Record]:
url = f'https://api.openweathermap.org/data/2.5/weather?' \
f'q={self.city_name}&' \
f'appid={self.api_key}'
Make a request to OpenWeatherMap using urllib.request.
contents = urllib.request.urlopen(url).read().decode('utf8')
Parse the response and return produced data.
formatted = json.loads(contents)
return formatted['weather'][0]['description']
Instantiate WeatherInlet.
api_key = os.environ.get('OPEN_WEATHER_MAP_API_KEY')
weather_inlet = WeatherInlet(api_key, 'Bangkok')
Create a link, add it to a planner and schedule.
link = Link(weather_inlet, PrintOutlet(only_payload=True),
interval=timedelta(seconds=2), tags='bangkok_weather')
planner = ApsPlanner(link)
planner.start()
Output:
>>> bangkok_weather.0 light rain
>>> bangkok_weather.1 light rain
>>> bangkok_weather.2 light rain
>>> ...
On each transfer WeatherInlet makes a request to the OpenWeatherMap API and returns a description of the weather in the selected city.
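A production inlet would also guard against network failures and malformed responses. Below is a hedged sketch of the parsing step factored into a standalone helper (the helper name is illustrative, not part of the example):

```python
import json

def parse_weather_description(contents: str) -> str:
    # extract the first weather description from an OpenWeatherMap
    # current-weather JSON response; raises ValueError on malformed input
    try:
        data = json.loads(contents)
        return data['weather'][0]['description']
    except (json.JSONDecodeError, KeyError, IndexError) as e:
        raise ValueError(f'unexpected OpenWeatherMap payload: {e}')

# In WeatherInlet.pull the request itself could be guarded similarly:
#     try:
#         contents = urllib.request.urlopen(url).read().decode('utf8')
#     except urllib.error.URLError:
#         return []  # skip this transfer on network failure
```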
Full example:
import json
import os
from datetime import timedelta
from typing import List
from databay import Record, Link
from databay.outlets import PrintOutlet
from databay.planners import ApsPlanner
from databay.inlet import Inlet
import urllib.request
class WeatherInlet(Inlet):
def __init__(self, api_key: str, city_name: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.api_key = api_key
self.city_name = city_name
def pull(self, update) -> List[Record]:
url = f'https://api.openweathermap.org/data/2.5/weather?' \
f'q={self.city_name}&' \
f'appid={self.api_key}'
contents = urllib.request.urlopen(url).read().decode('utf8')
formatted = json.loads(contents)
return formatted['weather'][0]['description']
api_key = os.environ.get('OPEN_WEATHER_MAP_API_KEY')
weather_inlet = WeatherInlet(api_key, 'Bangkok')
link = Link(weather_inlet, PrintOutlet(only_payload=True),
interval=timedelta(seconds=2), tags='bangkok_weather')
planner = ApsPlanner(link)
planner.start()
Intermediate Outlet¶
This example demonstrates an outlet that writes the incoming records into a file. It showcases what a realistic implementation of Outlet
may look like.
Create the FileOutlet class implementing Outlet. This outlet will accept two metadata keys:
FileOutlet.FILEPATH - specifying the file that the record should be written into.
FileOutlet.FILE_MODE - specifying the write mode using Python's default IO.
class FileOutlet(Outlet):
FILEPATH = 'FileOutlet.FILEPATH'
"""Filepath of the file to write to."""
FILE_MODE = 'FileOutlet.FILE_MODE'
"""Write mode to use when writing into the csv file."""
We give an option to specify default_filepath and default_file_mode when constructing this outlet.
def __init__(self,
default_filepath: str = 'outputs/default_output.txt',
default_file_mode: str = 'a'):
super().__init__()
self.default_filepath = default_filepath
self.default_file_mode = default_file_mode
Implement the push method, looping over all records and reading their metadata.
def push(self, records: [Record], update):
for record in records:
filepath = record.metadata.get(
self.FILEPATH, self.default_filepath)
file_mode = record.metadata.get(
self.FILE_MODE, self.default_file_mode)
Write the record according to the filepath and file_mode found.
with open(filepath, file_mode) as f:
f.write(str(record.payload)+'\n')
Instantiate FileOutlet and RandomIntInlet, providing the inlet with a metadata dictionary.
Create a link, add it to a planner and schedule.
link = Link(random_int_inlet,
file_outlet,
interval=timedelta(seconds=2),
tags='file_outlet')
planner = ApsPlanner(link)
planner.start()
This creates an outputs/random_ints.txt file:
1
76
52
76
64
89
71
12
70
74
...
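Because push resolves FileOutlet.FILEPATH per record, records carrying different metadata can be routed to different files. The dispatch logic alone can be sketched without the Databay base classes:

```python
def route_payloads(payloads_with_meta, default_filepath):
    # group payloads by the filepath found in each record's metadata,
    # mirroring how FileOutlet.push resolves FileOutlet.FILEPATH
    routed = {}
    for payload, metadata in payloads_with_meta:
        filepath = metadata.get('FileOutlet.FILEPATH', default_filepath)
        routed.setdefault(filepath, []).append(payload)
    return routed

records = [
    (1, {'FileOutlet.FILEPATH': 'outputs/random_ints.txt'}),
    (2, {}),
]
route_payloads(records, 'outputs/default_output.txt')
# {'outputs/random_ints.txt': [1], 'outputs/default_output.txt': [2]}
```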
Full example:
from datetime import timedelta
from databay import Link
from databay.inlets import RandomIntInlet
from databay.planners import ApsPlanner
from databay.record import Record
from databay.outlet import Outlet
class FileOutlet(Outlet):
FILEPATH = 'FileOutlet.FILEPATH'
"""Filepath of the file to write to."""
FILE_MODE = 'FileOutlet.FILE_MODE'
"""Write mode to use when writing into the csv file."""
def __init__(self,
default_filepath: str = 'outputs/default_output.txt',
default_file_mode: str = 'a'):
super().__init__()
self.default_filepath = default_filepath
self.default_file_mode = default_file_mode
def push(self, records: [Record], update):
for record in records:
filepath = record.metadata.get(
self.FILEPATH, self.default_filepath)
file_mode = record.metadata.get(
self.FILE_MODE, self.default_file_mode)
with open(filepath, file_mode) as f:
f.write(str(record.payload)+'\n')
metadata = {
FileOutlet.FILEPATH: 'outputs/random_ints.txt',
FileOutlet.FILE_MODE: 'a'
}
random_int_inlet = RandomIntInlet(metadata=metadata)
file_outlet = FileOutlet()
link = Link(random_int_inlet,
file_outlet,
interval=timedelta(seconds=2),
tags='file_outlet')
planner = ApsPlanner(link)
planner.start()
Basic metadata¶
This example demonstrates basic usage of Global metadata, used by a PrintOutlet created in the Basic Outlet example.
Create the ConditionalPrintOutlet class implementing Outlet. This outlet will accept one metadata key:
ConditionalPrintOutlet.SHOULD_PRINT - whether the record should be printed.
class ConditionalPrintOutlet(Outlet):
SHOULD_PRINT = 'ConditionalPrintOutlet.SHOULD_PRINT'
"""Whether records should be printed or skipped."""
Implement the push method, looping over all records and printing them if ConditionalPrintOutlet.SHOULD_PRINT is set:
def push(self, records: [Record], update):
for record in records:
if record.metadata.get(self.SHOULD_PRINT):
print(update, record)
Instantiate two inlets: one whose records are always printed, and one whose records never are:
random_int_inlet_on = RandomIntInlet(
metadata={ConditionalPrintOutlet.SHOULD_PRINT: True})
random_int_inlet_off = RandomIntInlet(
metadata={ConditionalPrintOutlet.SHOULD_PRINT: False})
Instantiate ConditionalPrintOutlet and add all nodes to a link:
print_outlet = ConditionalPrintOutlet()
link = Link([random_int_inlet_on, random_int_inlet_off],
print_outlet,
interval=timedelta(seconds=0.5),
tags='should_print_metadata')
Add to a planner and schedule.
planner = SchedulePlanner(link, refresh_interval=0.5)
planner.start()
Output:
>>> should_print_metadata.0 Record(payload=44, metadata={'PrintOutlet.SHOULD_PRINT': True, '__inlet__': "RandomIntInlet(metadata:{'PrintOutlet.SHOULD_PRINT': True})"})
>>> should_print_metadata.1 Record(payload=14, metadata={'PrintOutlet.SHOULD_PRINT': True, '__inlet__': "RandomIntInlet(metadata:{'PrintOutlet.SHOULD_PRINT': True})"})
>>> should_print_metadata.2 Record(payload=54, metadata={'PrintOutlet.SHOULD_PRINT': True, '__inlet__': "RandomIntInlet(metadata:{'PrintOutlet.SHOULD_PRINT': True})"})
>>> ...
On each transfer ConditionalPrintOutlet prints only the records coming from random_int_inlet_on, which was constructed with global metadata that allows printing.
Full example:
from datetime import timedelta
from databay import Link
from databay.inlets import RandomIntInlet
from databay.outlet import Outlet
from databay.planners import SchedulePlanner
from databay.record import Record
class ConditionalPrintOutlet(Outlet):
SHOULD_PRINT = 'ConditionalPrintOutlet.SHOULD_PRINT'
"""Whether records should be printed or skipped."""
def push(self, records: [Record], update):
for record in records:
if record.metadata.get(self.SHOULD_PRINT):
print(update, record)
random_int_inlet_on = RandomIntInlet(
metadata={ConditionalPrintOutlet.SHOULD_PRINT: True})
random_int_inlet_off = RandomIntInlet(
metadata={ConditionalPrintOutlet.SHOULD_PRINT: False})
print_outlet = ConditionalPrintOutlet()
link = Link([random_int_inlet_on, random_int_inlet_off],
print_outlet,
interval=timedelta(seconds=0.5),
tags='should_print_metadata')
planner = SchedulePlanner(link, refresh_interval=0.5)
planner.start()
Basic asynchronous¶
This tutorial showcases a simple usage of asynchronous inlets and outlets.
Create an asynchronous inlet.
class RandomIntInlet(Inlet):
async def pull(self, update):
# simulate a long-taking operation
await asyncio.sleep(0.5)
# execute
r = random.randint(0, 100)
_LOGGER.debug(f'{update} produced:{r}')
return r
Create an asynchronous outlet. Note that one asynchronous wait will be simulated for each record consumed.
class PrintOutlet(Outlet):
async def push(self, records: [Record], update):
_LOGGER.debug(f'{update} push starts')
# create an asynchronous task for each record
tasks = [self.print_task(record, update) for record in records]
# await all print tasks
await asyncio.gather(*tasks)
async def print_task(self, record, update):
# simulate a long-taking operation
await asyncio.sleep(0.5)
# execute
_LOGGER.debug(f'{update} consumed:{record.payload}')
Instantiate three asynchronous inlets and one asynchronous outlet.
random_int_inletA = RandomIntInlet()
random_int_inletB = RandomIntInlet()
random_int_inletC = RandomIntInlet()
print_outlet = PrintOutlet()
link = Link([random_int_inletA, random_int_inletB, random_int_inletC],
print_outlet,
interval=timedelta(seconds=2),
tags='async')
Add to a planner and schedule.
planner = SchedulePlanner(link)
planner.start()
Output:
>>> 2020-08-04 22:40:41.242|D| async.0 transfer
>>> 2020-08-04 22:40:41.754|D| async.0 produced:20
>>> 2020-08-04 22:40:41.754|D| async.0 produced:55
>>> 2020-08-04 22:40:41.754|D| async.0 produced:22
>>> 2020-08-04 22:40:41.755|D| async.0 push starts
>>> 2020-08-04 22:40:42.267|D| async.0 consumed:20
>>> 2020-08-04 22:40:42.267|D| async.0 consumed:55
>>> 2020-08-04 22:40:42.267|D| async.0 consumed:22
>>> 2020-08-04 22:40:42.267|D| async.0 done
>>> 2020-08-04 22:40:43.263|D| async.1 transfer
>>> 2020-08-04 22:40:43.776|D| async.1 produced:10
>>> 2020-08-04 22:40:43.776|D| async.1 produced:4
>>> 2020-08-04 22:40:43.776|D| async.1 produced:90
>>> 2020-08-04 22:40:43.777|D| async.1 push starts
>>> 2020-08-04 22:40:44.292|D| async.1 consumed:10
>>> 2020-08-04 22:40:44.292|D| async.1 consumed:4
>>> 2020-08-04 22:40:44.292|D| async.1 consumed:90
>>> 2020-08-04 22:40:44.292|D| async.1 done
On each transfer, two asynchronous operations take place:
First, all inlets are simultaneously awaiting before producing their data.
Once all data from inlets is gathered, the second stage commences where the outlet simultaneously awaits for each record before printing it out.
This simulates a delay happening either in the inlets or the outlets. Note how one transfer takes approximately one second to complete, despite executing six operations each requiring 0.5 seconds of sleep. If this were to execute synchronously, the entire transfer would take around 3 seconds to complete.
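The timing claim can be verified independently of Databay. A minimal sketch timing six concurrent sleeps with asyncio.gather (using 0.1-second sleeps to keep it quick):

```python
import asyncio
import time

async def slow_op(i):
    # simulate a long-taking operation, as in the inlets and outlets above
    await asyncio.sleep(0.1)
    return i

async def main():
    start = time.monotonic()
    # run six 0.1s operations concurrently
    results = await asyncio.gather(*(slow_op(i) for i in range(6)))
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
# the six sleeps overlap, so elapsed is ~0.1s rather than ~0.6s
```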
Full example:
import asyncio
import logging
from databay import Link, Outlet, Record
from databay.planners import SchedulePlanner
from datetime import timedelta
from databay import Inlet
import random
_LOGGER = logging.getLogger('databay.basic_asynchronous')
logging.getLogger('databay').setLevel(logging.DEBUG)
class RandomIntInlet(Inlet):
async def pull(self, update):
# simulate a long-taking operation
await asyncio.sleep(0.5)
# execute
r = random.randint(0, 100)
_LOGGER.debug(f'{update} produced:{r}')
return r
class PrintOutlet(Outlet):
async def push(self, records: [Record], update):
_LOGGER.debug(f'{update} push starts')
# create an asynchronous task for each record
tasks = [self.print_task(record, update) for record in records]
# await all print tasks
await asyncio.gather(*tasks)
async def print_task(self, record, update):
# simulate a long-taking operation
await asyncio.sleep(0.5)
# execute
_LOGGER.debug(f'{update} consumed:{record.payload}')
random_int_inletA = RandomIntInlet()
random_int_inletB = RandomIntInlet()
random_int_inletC = RandomIntInlet()
print_outlet = PrintOutlet()
link = Link([random_int_inletA, random_int_inletB, random_int_inletC],
print_outlet,
interval=timedelta(seconds=2),
tags='async')
planner = SchedulePlanner(link)
planner.start()
Elasticsearch Outlet¶
In this example we create an implementation of Outlet
that indexes records as documents to a running Elasticsearch instance.
Note: this example assumes that Elasticsearch is correctly configured and that the index you are indexing documents to exists with the appropriate mappings. For more details see the official Elasticsearch Python client documentation.
Extend the Outlet with new parameters required when constructing:
es_client - an instance of the Elasticsearch Python client.
index_name - the name of a pre-existing index in the running cluster.
class ElasticsearchIndexerOutlet(Outlet):
" An example outlet for indexing text documents from any `Inlet`."
def __init__(self,
es_client: elasticsearch.Elasticsearch,
index_name: str,
overwrite_documents: bool = True):
super().__init__()
self.es_client = es_client
self.index_name = index_name
# if true existing documents will be overwritten
# otherwise we will skip indexing and log that document id exists in index.
self.overwrite_documents = overwrite_documents
if not self.es_client.indices.exists(self.index_name):
raise RuntimeError(f"Index '{self.index_name}' does not exist ")
In this implementation of the push method there are a few custom behaviors specified. As we iterate over every incoming record:
We use the dict keys from the current record's payload as our unique document IDs.
The flag self.overwrite_documents determines whether we will check if an ID already exists.
If self.overwrite_documents is True we simply index the document under _id without doing any check.
Otherwise we use the client to check whether _id already exists in the index. If it does, we skip the record and log that it already exists; otherwise it is indexed as normal.
def push(self, records: List[Record], update):
for record in records:
payload = record.payload
# using dict keys from payload as unique id for index
for k in payload.keys():
_id = k
text = payload[k]
body = {"my_document": text}
if self.overwrite_documents:
self.es_client.index(
self.index_name, body, id=_id)
_LOGGER.info(f"Indexed document with id {_id}")
else:
if self.es_client.exists(self.index_name, _id):
# log that already exists
_LOGGER.info(
f"Document already exists for id {_id}. Skipping.")
else:
self.es_client.index(
self.index_name, body, id=_id)
_LOGGER.info(f"Indexed document with id {_id}")
This simple Inlet takes a list of strings as its main parameter. In its pull method it randomly selects one and returns it along with an incrementing ID as a dict. We'll use this to pass documents to our Elasticsearch outlet.
class DummyTextInlet(Inlet):
"A simple `Inlet` that randomly pulls a string from a list of strings."
def __init__(self, text: list, *args, **kwargs):
super().__init__(*args, **kwargs)
self.text = text
self._id = 0
def pull(self, update):
text_selection = random.choice(self.text)
self._id += 1
time.sleep(1)
return {self._id: text_selection}
Instantiate our simple Inlet as well as an instance of ElasticsearchIndexerOutlet with the default parameter for overwrite_documents. We use the official Elasticsearch Python client for es_client.
This example assumes my-test-index already exists in our Elasticsearch cluster.
es_client = elasticsearch.Elasticsearch(timeout=30)
text_inlet = DummyTextInlet(TEXT.split("."))
elasticsearch_outlet = ElasticsearchIndexerOutlet(
es_client, "my-test-index")
Tie it all together using a Link and a planner. The link is set up to index a new document every 2 seconds.
link = Link(text_inlet,
elasticsearch_outlet,
interval=2,
tags='elasticsearch_outlet')
planner = ApsPlanner(link)
planner.start()
Output:
From the logs we can see that the records are being written into our Elasticsearch index.
>>> Indexed document with id 1
>>> Indexed document with id 2
>>> Indexed document with id 3
>>> Indexed document with id 4
>>> Indexed document with id 5
>>> Indexed document with id 6
>>> Indexed document with id 7
>>> Indexed document with id 8
Output (if overwrite_documents
is set to False
):
From the logs we can see that the record IDs so far have already been written into our Elasticsearch index.
>>> Document already exists for id 1. Skipping.
>>> Document already exists for id 2. Skipping.
>>> Document already exists for id 3. Skipping.
>>> Document already exists for id 4. Skipping.
>>> Document already exists for id 5. Skipping.
>>> Document already exists for id 6. Skipping.
>>> Document already exists for id 7. Skipping.
>>> Document already exists for id 8. Skipping.
Full example:
import logging
import random
import time
from typing import List
import elasticsearch
from databay import Inlet, Link, Outlet
from databay.planners import ApsPlanner
from databay.record import Record
TEXT = """
Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Phasellus ex erat, viverra tincidunt tempus eget, hendrerit sed ligula.
Quisque mollis nibh in imperdiet porttitor. Nulla bibendum lacus et est lobortis porta.
Nulla sed ligula at odio volutpat consectetur. Sed quis augue ac magna porta imperdiet interdum eu velit.
Integer pretium ultrices urna, id viverra mauris ultrices ut. Etiam aliquet tellus porta nisl eleifend, non hendrerit nisl sodales.
Aliquam eget porttitor enim.
"""
_LOGGER = logging.getLogger('databay.elasticsearch_outlet')
class ElasticsearchIndexerOutlet(Outlet):
" An example outlet for indexing text documents from any `Inlet`."
def __init__(self,
es_client: elasticsearch.Elasticsearch,
index_name: str,
overwrite_documents: bool = True):
super().__init__()
self.es_client = es_client
self.index_name = index_name
# if true existing documents will be overwritten
# otherwise we will skip indexing and log that document id exists in index.
self.overwrite_documents = overwrite_documents
if not self.es_client.indices.exists(self.index_name):
raise RuntimeError(f"Index '{self.index_name}' does not exist ")
def push(self, records: List[Record], update):
for record in records:
payload = record.payload
# using dict keys from payload as unique id for index
for k in payload.keys():
_id = k
text = payload[k]
body = {"my_document": text}
if self.overwrite_documents:
self.es_client.index(
self.index_name, body, id=_id)
_LOGGER.info(f"Indexed document with id {_id}")
else:
if self.es_client.exists(self.index_name, _id):
# log that already exists
_LOGGER.info(
f"Document already exists for id {_id}. Skipping.")
else:
self.es_client.index(
self.index_name, body, id=_id)
_LOGGER.info(f"Indexed document with id {_id}")
class DummyTextInlet(Inlet):
"A simple `Inlet` that randomly pulls a string from a list of strings."
def __init__(self, text: list, *args, **kwargs):
super().__init__(*args, **kwargs)
self.text = text
self._id = 0
def pull(self, update):
text_selection = random.choice(self.text)
self._id += 1
time.sleep(1)
return {self._id: text_selection}
_LOGGER.setLevel(logging.INFO)
es_client = elasticsearch.Elasticsearch(timeout=30)
text_inlet = DummyTextInlet(TEXT.split("."))
elasticsearch_outlet = ElasticsearchIndexerOutlet(
es_client, "my-test-index")
link = Link(text_inlet,
elasticsearch_outlet,
interval=2,
tags='elasticsearch_outlet')
planner = ApsPlanner(link)
planner.start()
Twitter Inlet¶
In this example we create an implementation of an Inlet
that connects to the Twitter API and either listens for new tweets from a specific user or to the home timeline of an authenticated account.
Note: this example assumes that the Tweepy client is correctly configured and that the Twitter account is registered to use the API. For more details see the Tweepy documentation.
Extend the Inlet by passing in an instance of the Tweepy client api. Depending on the use case, users can also pass in user if they want to run the inlet on a specific username.
class TwitterInlet(Inlet):
"""
An implementation of an `Inlet` that uses the Tweepy (https://www.tweepy.org/)
Twitter client to pull tweets from either a specific users' timeline or the
home timeline belonging to an authenticated `tweepy.API` instance.
"""
def __init__(self, api: tweepy.API, user: str = None, most_recent_id=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.api = api
self.user = user
        # this will ensure we only ever pull tweets that haven't been handled
self.most_recent_id = most_recent_id
        # sets a flag indicating whether we are pulling from a single user
        # or from the home timeline.
if self.user is None:
self.is_user_timeline = False
else:
self.is_user_timeline = True
For the pull method we perform a number of configuration-specific checks:
If the flag self.is_user_timeline is True, we'll be using the user_timeline method of the Tweepy API. This pulls tweets from a specific user's timeline rather than the registered account's home timeline.
Additionally, both conditional branches check for self.most_recent_id; if a recent ID exists it is passed as an additional parameter to Tweepy, ensuring that only new tweets since the last pull are fetched.
self.most_recent_id is assigned by taking the ID from the first tweet in the results list.
def pull(self, update):
if self.is_user_timeline:
if self.most_recent_id is not None:
public_tweets = self.api.user_timeline(
self.user, since_id=self.most_recent_id)
else:
public_tweets = self.api.user_timeline(
self.user)
else:
if self.most_recent_id is not None:
public_tweets = self.api.home_timeline(
since_id=self.most_recent_id)
else:
public_tweets = self.api.home_timeline()
if len(public_tweets) > 0:
# 0th tweet is most recent
self.most_recent_id = public_tweets[0].id
tweets = []
for tweet in public_tweets:
tweets.append({"user": tweet.user.screen_name, "text": tweet.text})
return tweets
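The most_recent_id bookkeeping can be illustrated separately from the Twitter client. A standalone sketch (names and dict shape are illustrative) of how keeping the newest ID prevents re-processing tweets across pulls:

```python
def take_new_tweets(tweets, most_recent_id):
    # tweets arrive newest-first, like the Tweepy timeline results above;
    # keep only those newer than most_recent_id, then advance the marker
    new = [t for t in tweets
           if most_recent_id is None or t['id'] > most_recent_id]
    if new:
        most_recent_id = new[0]['id']  # 0th tweet is most recent
    return new, most_recent_id

batch1 = [{'id': 3}, {'id': 2}, {'id': 1}]
new, marker = take_new_tweets(batch1, None)     # all three are new, marker -> 3
batch2 = [{'id': 5}, {'id': 4}, {'id': 3}]
new2, marker = take_new_tweets(batch2, marker)  # only ids 5 and 4 are new
```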
To authenticate Tweepy correctly the appropriate keys and secrets must be passed to the API.
auth = tweepy.OAuthHandler(
consumer_key, consumer_secret) # user defined values
auth.set_access_token(access_token, access_token_secret) # user defined values
# extra params here protect against twitter rate limiting
# set link intervals with this in mind
# for more on twitter rate limiting see https://developer.twitter.com/en/docs/rate-limits
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
The TwitterInlet can then be instantiated as seen below. We are using the PrintOutlet to print the results of each successful pull.
Note: be mindful of the interval you pass to Link, as the Twitter API has strict rate limiting policies.
# create a TwitterInlet pointed at a specific account name
twitter_user_inlet = TwitterInlet(api, "@BarackObama")
link = Link(twitter_user_inlet, PrintOutlet(only_payload=True),
interval=30, tags='twitter_timeline')
planner = SchedulePlanner(link)
planner.start()
Output:
>>> {'user': 'BarackObama', 'text': 'Georgia’s runoff election will determine whether the American people have a Senate that’s actually fighting for the… https://t.co/igUiRzxNxe'}
>>> {'user': 'BarackObama', 'text': 'Here’s a great way to call voters in Georgia and help them get ready to vote. A couple hours this weekend could hel… https://t.co/x6Nc8w7F38'}
>>> {'user': 'BarackObama', 'text': "Happy Hanukkah to all those celebrating around the world. This year has tested us all, but it's also clarified what… https://t.co/k2lzUQ9LNm"}
>>> {'user': 'BarackObama', 'text': 'In A Promised Land, I talk about the decisions I had to make during the first few years of my presidency. Here are… https://t.co/KbE2FDStYr'}
>>> {'user': 'BarackObama', 'text': "With COVID-19 cases reaching an all-time high this week, we've got to continue to do our part to protect one anothe… https://t.co/Gj0mEFfuLY"}
>>> {'user': 'BarackObama', 'text': 'To all of you in Georgia, today is the last day to register to vote in the upcoming runoff election. Take a few min… https://t.co/Jif3Gd7NpQ'}
Full example:
import os
import tweepy
from databay import Inlet, Link
from databay.outlets import PrintOutlet
from databay.planners import SchedulePlanner
class TwitterInlet(Inlet):
"""
An implementation of an `Inlet` that uses the Tweepy (https://www.tweepy.org/)
Twitter client to pull tweets from either a specific users' timeline or the
home timeline belonging to an authenticated `tweepy.API` instance.
"""
def __init__(self, api: tweepy.API, user: str = None, most_recent_id=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.api = api
self.user = user
        # this will ensure we only ever pull tweets that haven't been handled
self.most_recent_id = most_recent_id
        # sets a flag indicating whether we are pulling from a single user
        # or from the home timeline.
if self.user is None:
self.is_user_timeline = False
else:
self.is_user_timeline = True
def pull(self, update):
if self.is_user_timeline:
if self.most_recent_id is not None:
public_tweets = self.api.user_timeline(
self.user, since_id=self.most_recent_id)
else:
public_tweets = self.api.user_timeline(
self.user)
else:
if self.most_recent_id is not None:
public_tweets = self.api.home_timeline(
since_id=self.most_recent_id)
else:
public_tweets = self.api.home_timeline()
if len(public_tweets) > 0:
# 0th tweet is most recent
self.most_recent_id = public_tweets[0].id
tweets = []
for tweet in public_tweets:
tweets.append({"user": tweet.user.screen_name, "text": tweet.text})
return tweets
# gets twitter api secrets and keys from environment vars
consumer_key = os.getenv("twitter_key")
consumer_secret = os.getenv("twitter_secret")
access_token = os.getenv("twitter_access_token")
access_token_secret = os.getenv("twitter_access_token_secret")
auth = tweepy.OAuthHandler(
consumer_key, consumer_secret) # user defined values
auth.set_access_token(access_token, access_token_secret) # user defined values
# extra params here protect against twitter rate limiting
# set link intervals with this in mind
# for more on twitter rate limiting see https://developer.twitter.com/en/docs/rate-limits
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
# create a TwitterInlet pointed at a specific account name
twitter_user_inlet = TwitterInlet(api, "@BarackObama")
link = Link(twitter_user_inlet, PrintOutlet(only_payload=True),
interval=30, tags='twitter_timeline')
planner = SchedulePlanner(link)
planner.start()