Source code for databay.outlets.csv_outlet

import csv
import logging
import os

from databay.outlet import Outlet, MetadataKey
from databay.record import Record

from pathlib import Path

_LOGGER = logging.getLogger('databay.CsvOutlet')


[docs]class CsvOutlet(Outlet): """ Outlet that writes records to a csv file. """
[docs] CSV_FILE: MetadataKey = 'CsvOutlet.CSV_FILE'
"""Filepath of the csv file to write records to."""
[docs] FILE_MODE: MetadataKey = 'CsvOutlet.FILE_MODE'
"""Write mode to use when writing into the csv file.""" def __init__(self, default_filepath: str, default_file_mode: str = 'a', *args, **kwargs): """ :param default_filepath: Filepath of the default csv file to write records to. :type default_filepath: str :param default_file_mode: Default write mode to use when writing into the csv file. :type default_file_mode: str """ super().__init__(*args, **kwargs) self.default_filepath = default_filepath self.default_file_mode = default_file_mode
[docs] def push(self, records: [Record], update): """ Writes records to a csv file. :type records: list[:any:`Record`] :param records: List of records generated by inlets. Each top-level element of this array corresponds to one inlet that successfully returned data. Note that inlets could return arrays too, making this a nested array. :type update: :any:`Update` :param update: Update object representing the particular Link transfer. """ for record in records: filepath = record.metadata.get( self.CSV_FILE, self.default_filepath) file_mode = record.metadata.get( self.FILE_MODE, self.default_file_mode) _LOGGER.info( f'{update} writing into: {filepath}, file mode: {file_mode}, record: {record}') # todo: add more write options Path(filepath).parent.mkdir(parents=True, exist_ok=True) with open(filepath, file_mode, newline="") as f: # todo: add more DictWriter options writer = csv.DictWriter(f, record.payload.keys()) if not os.path.exists(filepath) or os.path.getsize(filepath) == 0 or 'w' in file_mode: writer.writeheader() writer.writerow(record.payload)