Source code for databay.support.buffers

import logging
import time
from typing import List, Union

from databay import Record

_LOGGER = logging.getLogger('databay.Buffer')

[docs]class Buffer(): """ Buffers are special built-in :any:`processors`. They allow you to temporarily accumulate records before passing them over to outlets. When processing records (see :any:`processors`) a Buffer will figure out whether records should be stored or released. This is done by passing the list of records to Buffer's internal :any:`callable` functions called controllers. Each controller performs different types of checks, returning :code:`True` or :code:`False` depending on whether records should be released or stored respectively. """ def __init__(self, count_threshold : int = None, time_threshold: float = None, custom_controllers: Union[callable, List[callable]] = None, on_reset: callable = None, conjugate_controllers: bool = False ): """ :type count_threshold: int :param count_threshold: The number of records stored that when reached will complete the count controller. When set to :code:`None` it will disable the count controller. |default| :code:`None` :type time_threshold: float :param time_threshold: The number of seconds elapsed since the previous release that when reached will complete the time controller. When set to :code:`None` it will disable the time controller. |default| :code:`None` :type custom_controllers: :any:`callable` or list[:any:`callable`] :param custom_controllers: List of custom :any:`callable` controllers. |default| :code:`None` :type on_reset: :any:`callable` :param on_reset: Callback invoked when :any:`reset <Buffer.reset>` is called. |default| :code:`None` :type conjugate_controllers: bool :param conjugate_controllers: Whether to release the records when any controller returns :code:`True` or to wait for all of them to complete before releasing records. |default| :code:`False` """ self.count_threshold = count_threshold self.time_threshold = time_threshold self.custom_controllers = custom_controllers self.conjugate_controllers = conjugate_controllers if self.custom_controllers is None: self.custom_controllers = [] elif not isinstance(self.custom_controllers, list): self.custom_controllers : List[callable] = [self.custom_controllers] self.controllers = [] self.records = [] self.time_start = None self.flush = False self.on_reset = on_reset def _count_controller(self, records:List[Record]) -> bool: if len(records) > self.count_threshold: return True else: return False def _time_controller(self, records:List[Record]) -> bool: if self.time_start is None: self.time_start = time.time() if time.time() > (self.time_start + self.time_threshold): return True else: return False
[docs] def get_controllers(self): """ Return the list of currently active controllers. :returns: list of controllers :rtype: list[:any:`callable`] """ controllers = [] controllers = controllers + self.custom_controllers if self.count_threshold is not None: controllers.append(self._count_controller) if self.time_threshold is not None: controllers.append(self._time_controller) return controllers
def _execute(self, records:List[Record]) -> List[Record]: self.records += records rv = [] if self.flush: rv = self.records else: if self.conjugate_controllers: controllers_passing = True for controller in self.get_controllers(): try: controllers_passing &= controller(self.records) except Exception as e: _LOGGER.exception(f'Exception while running controller {controller} with records {records}. Content: {str(e)}') continue if not controllers_passing: break # one controller returned False, skip the rest if controllers_passing: rv = self.records else: for controller in self.get_controllers(): try: decision = controller(self.records) except Exception as e: _LOGGER.exception(f'Exception while running controller {controller} with records {records}. Content: {str(e)}') continue if decision: rv = self.records break # one controller returned True, skip the rest if rv != []: self.reset() return rv def __call__(self, records:List[Record]): return self._execute(records)
[docs] def reset(self): """ Resets this buffer, resetting the controllers' counters and emptying the list of records stored. """ self.records = [] self.time_start = None self.flush = False if self.on_reset is not None: self.on_reset()