Databay is a Python interface for scheduled data transfer.
It facilitates transfer of (any) data from A to B, on a scheduled interval.
In Databay, data transfer is expressed with three components:

- Inlets: for data production.
- Outlets: for data consumption.
- Links: for handling the data transit between inlets and outlets.
Scheduling is implemented using third-party libraries, exposed through the BasePlanner interface. Currently two BasePlanner implementations are available: ApsPlanner, using Advanced Python Scheduler, and SchedulePlanner, using Schedule.
```python
import datetime

from databay import Link
from databay.inlets import HttpInlet
from databay.outlets import MongoOutlet
from databay.planners import ApsPlanner

# Create an inlet, outlet and a link.
http_inlet = HttpInlet('https://some.test.url.com/')
mongo_outlet = MongoOutlet('databay', 'test_collection')
link = Link(http_inlet, mongo_outlet, datetime.timedelta(seconds=5))

# Create a planner, add the link and start scheduling.
planner = ApsPlanner(link)
planner.start()
```
Every 5 seconds this snippet will pull data from a test URL, and write it to MongoDB.
While Databay comes with a handful of built-in inlets and outlets, its strength lies in extendability. To use Databay in your project, create concrete implementations of the Inlet and Outlet classes that handle the data production and consumption functionality you require. Databay will then make sure data can repeatedly flow between the inlets and outlets you create. Extending Inlets and Outlets is easy and offers a wide range of customisation. Head over to the Extending Databay section for a detailed explanation.
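As a sketch of what extending might look like, the following pair follows the pull/push pattern described above. The class names are illustrative, and the exact signatures should be checked against the Extending Databay section:

```python
import random

from databay import Inlet, Outlet


class RandomIntInlet(Inlet):
    """Illustrative inlet producing a random integer on each transfer."""

    def pull(self, update):
        return random.randint(0, 100)


class PrintOutlet(Outlet):
    """Illustrative outlet printing the payload of each record it receives."""

    def push(self, records, update):
        for record in records:
            print(record.payload)
```

A link built from these two would print a fresh random integer on every transfer.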
Records are data objects that provide a unified interface for data handling across Databay. In addition to storing data produced by inlets, records may also carry individual metadata. This way information can be passed between inlets and outlets, facilitating a broad spectrum of custom implementations. For instance one CsvOutlet could be used for writing into two different csv files depending on which inlet the data came from.
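A hedged sketch of that CsvOutlet idea follows. It assumes metadata can be supplied to an inlet on construction and read back from each record's metadata mapping; the metadata key, class names, and file names are all illustrative:

```python
from databay import Inlet, Outlet

# Illustrative metadata key used to route records to different files.
CSV_FILE = 'CsvOutlet.CSV_FILE'


class RandomIntInlet(Inlet):
    def pull(self, update):
        return 42


class CsvOutlet(Outlet):
    def push(self, records, update):
        for record in records:
            # Pick the target file based on metadata carried by the record.
            filename = record.metadata.get(CSV_FILE, 'default.csv')
            with open(filename, 'a') as f:
                f.write(f'{record.payload}\n')


# Two inlets sharing one outlet, yet writing to different csv files.
cat_inlet = RandomIntInlet(metadata={CSV_FILE: 'cat.csv'})
dog_inlet = RandomIntInlet(metadata={CSV_FILE: 'dog.csv'})
```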
The principal functionality of Databay is to execute data transfer repeatedly on a pre-defined interval. To facilitate this, links are governed by a scheduler object implementing the BasePlanner class. Using this concrete scheduling functionality, links' transfers are executed in accordance with their individual interval settings.
To schedule a link, all you need to do is add it to a planner and call start to begin scheduling:
```python
from datetime import timedelta

from databay import Link
from databay.planners import SchedulePlanner

link = Link(some_inlet, some_outlet, timedelta(minutes=1))
planner = SchedulePlanner(link)
planner.start()
```
Databay provides two built-in BasePlanner implementations based on two popular Python scheduling libraries: ApsPlanner, based on Advanced Python Scheduler, and SchedulePlanner, based on Schedule.
While they differ in the method of scheduling, threading and exception handling, they both cover a reasonable variety of scheduling scenarios. Please refer to their appropriate documentation for more details on the difference between the two.
To begin scheduling links you need to call start on the planner you're using. Both planners implement start as a synchronous blocking function. To run start without blocking the current thread, wrap its call within a new thread or a process:
```python
from threading import Thread

th = Thread(target=planner.start)
th.start()
```
To stop scheduling links you need to call shutdown(wait:bool=True) on the planner you're using. Note that this may or may not let the currently transferring links finish, depending on the implementation of the BasePlanner that you're using. Both ApsPlanner and SchedulePlanner allow waiting for the links if shutdown is called with True as the wait parameter.
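For example, assuming a planner started as in the earlier snippets:

```python
# Blocks until currently transferring links have finished.
planner.shutdown(wait=True)

# Alternatively, shut down without waiting for in-flight transfers:
# planner.shutdown(wait=False)
```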
on_start and on_shutdown
Just before scheduling starts, Inlet.on_start and Outlet.on_start callbacks will be propagated through all inlets and outlets. Similarly, just after scheduling shuts down, Inlet.on_shutdown and Outlet.on_shutdown callbacks will be propagated through all inlets and outlets. In both cases, these callbacks will be called only once for each inlet and outlet. Override these callback methods to implement custom startup and shutdown behaviour in your inlets and outlets.
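A sketch of overriding these callbacks in a custom inlet. Here open_connection is a hypothetical helper standing in for whatever resource your inlet manages:

```python
from databay import Inlet


class DatabaseInlet(Inlet):
    """Illustrative inlet that opens a resource once per scheduling session."""

    def on_start(self):
        # Called once, just before scheduling starts.
        self._connection = open_connection()  # hypothetical helper

    def on_shutdown(self):
        # Called once, just after scheduling shuts down.
        self._connection.close()

    def pull(self, update):
        return self._connection.fetch()
```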
By default, BasePlanner will execute the Link.transfer function once on all of its links upon calling BasePlanner.start. This avoids having to wait for a link's interval to expire before its first transfer. You can disable this behaviour by passing the immediate_transfer=False parameter on construction.
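For instance, assuming the parameter is accepted on the planner's constructor as described above:

```python
# The first transfer waits for the link's interval instead of firing immediately.
planner = SchedulePlanner(link, immediate_transfer=False)
```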
If exceptions are thrown during transfer, both planners can be set to log and ignore them by passing the ignore_exceptions=True parameter on construction. This ensures transfer of the remaining links can carry on even if some links are erroneous. If exceptions aren't ignored, both ApsPlanner and SchedulePlanner will log the exception and gracefully shut down.
A Link can also be configured to catch exceptions by passing ignore_exceptions=True on construction. This way any exceptions raised by individual inlets and outlets are logged and ignored, allowing the remaining nodes to continue executing and the transfer to complete.
```python
# For planners:
planner = SchedulePlanner(ignore_exceptions=True)
planner = ApsPlanner(ignore_exceptions=True)

# For links:
link = Link(..., ignore_exceptions=True)
```
Databay loggers use the following message format:

```
%Y-%m-%d %H:%M:%S.millis|levelname| message (logger name)
```

For example:

```
2020-07-30 19:51:41.318|D| http_to_mongo.0 transfer (databay.Link)
```
By default Databay will only log messages with
WARNING priority or higher. You can manually enable more verbose logging by calling:
```python
import logging

logging.getLogger('databay').setLevel(logging.DEBUG)

# Or do it only for a particular child logger:
logging.getLogger('databay.ApsPlanner').setLevel(logging.DEBUG)
```
You can attach new handlers to any of these loggers to implement custom logging behaviour, such as a FileHandler to log into a file, or a separate StreamHandler to customise the console output format.
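For example, attaching a FileHandler to the root databay logger. The format string here is an assumption, loosely mirroring the message format shown above:

```python
import logging

databay_logger = logging.getLogger('databay')
databay_logger.setLevel(logging.DEBUG)

# Also write all databay messages to a file.
file_handler = logging.FileHandler('databay.log')
file_handler.setFormatter(
    logging.Formatter('%(asctime)s|%(levelname)s| %(message)s (%(name)s)'))
databay_logger.addHandler(file_handler)
```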
The data flow in Databay differs from the more widely adopted Observer pattern, where data production and propagation are represented by one object and consumption by another. In Databay, data production and propagation are split between the Inlet and Link objects, while consumption is handled by the Outlet objects. This results in a data flow model in which each stage - data production, transfer and consumption - is independent from the others. Inlets are only concerned with producing data, Outlets only with consuming data, and Links only with transferring data between them. Such a model is motivated by separation of concerns and facilitates custom implementations of data producers and consumers.