Source code for databay.outlets.print_outlet

from databay.outlet import Outlet
from databay.record import Record


[docs]class PrintOutlet(Outlet): """ Outlet that will print all records one by one. """ def __init__(self, only_payload: bool = False, skip_update: bool = False, *args, **kwargs): """ :param only_payload: If True, prints only the payload of records. :type only_payload: bool :param skip_update: If True, Update prefix will not be added to the print. :type skip_update: bool """ super().__init__(*args, **kwargs) self.only_payload = only_payload self.skip_update = skip_update
[docs] async def push(self, records: [Record], update): """ Prints the records. :type records: list[:any:`Record`] :param records: List of records generated by inlets. Each top-level element of this array corresponds to one inlet that successfully returned data. Note that inlets could return arrays too, making this a nested array. :type update: :any:`Update` :param update: Update object representing the particular Link update run. """ update = str(update)+' ' if not self.skip_update else '' for record in records: body = str(record.payload) if self.only_payload else str(record) print(f'{update}{body}')