import csv
import logging
import os
from databay.outlet import Outlet, MetadataKey
from databay.record import Record
from pathlib import Path
# Module-level logger used by CsvOutlet; the dotted name makes it a child of the 'databay' logger.
_LOGGER = logging.getLogger('databay.CsvOutlet')
class CsvOutlet(Outlet):
    """
    Outlet that writes records to a csv file.

    Each record is written as one csv row whose columns are the keys of the
    record's payload. The target file and the file mode default to the values
    given to the constructor and can be overridden per record through the
    :any:`CSV_FILE` and :any:`FILE_MODE` metadata keys.
    """

    CSV_FILE: MetadataKey = 'CsvOutlet.CSV_FILE'
    """Filepath of the csv file to write records to."""

    FILE_MODE: MetadataKey = 'CsvOutlet.FILE_MODE'
    """Write mode to use when writing into the csv file."""

    def __init__(self, default_filepath: str, default_file_mode: str = 'a', *args, **kwargs):
        """
        Remaining positional and keyword arguments are forwarded unchanged to
        the parent ``Outlet`` constructor.

        :param default_filepath: Filepath of the default csv file to write records to.
        :type default_filepath: str
        :param default_file_mode: Default write mode to use when writing into the csv file.
        :type default_file_mode: str
        """
        super().__init__(*args, **kwargs)
        self.default_filepath = default_filepath
        self.default_file_mode = default_file_mode

    def push(self, records: [Record], update):
        """
        Writes records to a csv file.

        For every record the destination path and file mode are read from the
        record's metadata (falling back to the outlet defaults), missing parent
        directories are created, and the payload is appended as a single row.
        A header row is emitted first whenever the file is empty at the moment
        it is opened, or the file mode contains ``'w'``.

        :type records: list[:any:`Record`]
        :param records: List of records generated by inlets. Each top-level element of this array corresponds to one inlet that successfully returned data. Note that inlets could return arrays too, making this a nested array.
        :type update: :any:`Update`
        :param update: Update object representing the particular Link transfer.
        """
        # NOTE(review): every element is used directly as a Record here
        # (``.metadata`` / ``.payload``); nested lists are not unpacked - confirm
        # against the caller whether nesting can actually reach this method.
        for record in records:
            filepath = record.metadata.get(
                self.CSV_FILE, self.default_filepath)
            file_mode = record.metadata.get(
                self.FILE_MODE, self.default_file_mode)

            _LOGGER.info(
                f'{update} writing into: {filepath}, file mode: {file_mode}, record: {record}')

            # todo: add more write options
            Path(filepath).parent.mkdir(parents=True, exist_ok=True)

            # NOTE(review): the file is reopened for each record, so with a
            # truncating mode such as 'w' only the last record targeting a
            # given file survives one push - confirm this is intended.
            # newline="" lets the csv module control line endings itself.
            with open(filepath, file_mode, newline="") as f:
                # todo: add more DictWriter options
                # Materialise the keys so the writer holds a real sequence
                # rather than a live view of the payload.
                writer = csv.DictWriter(f, list(record.payload.keys()))

                # Once open() has succeeded the file always exists, so a path
                # existence test here can never fire. Ask the open handle for
                # its size instead: this covers freshly created files and
                # cannot race with the path being renamed or removed.
                file_is_empty = os.fstat(f.fileno()).st_size == 0
                if file_is_empty or 'w' in file_mode:
                    writer.writeheader()

                writer.writerow(record.payload)