# Source code for databay.inlets.http_inlet

"""
.. warning::
    :any:`HttpInlet` requires `AIOHTTP <https://docs.aiohttp.org/en/stable/>`_ to function. Please install required dependencies using:

    .. code-block:: bash

        pip install "databay[HttpInlet]"
"""

import json
import logging
from json import JSONDecodeError
from typing import List, Union, Optional

import aiohttp
import ssl

from aiohttp.typedefs import LooseHeaders

from databay.inlet import Inlet
from databay import Record

# Module-level logger for this inlet; HttpInlet.pull() reports request/response
# progress through it at INFO level.
_LOGGER: logging.Logger = logging.getLogger('databay.HttpInlet')


class HttpInlet(Inlet):
    """
    Inlet for pulling data from a specified URL using `aiohttp <aiohttp.ClientSession.get_>`__.

    .. _aiohttp.ClientSession.get: https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientSession.get
    """

    def __init__(self, url: str, json: bool = True, cacert: Optional[str] = None,
                 params: Optional[dict] = None, headers: Optional[LooseHeaders] = None,
                 *args, **kwargs):
        """
        :type url: str
        :param url: URL that should be queried for data.

        :type json: bool
        :param json: Whether response should be parsed as JSON. |default| :code:`True`

        :type cacert: str
        :param cacert: Path to cacert TLS certificate bundle. It is loaded in
            addition to the default CA certificates, and certificate plus
            hostname verification are enforced. |default| :code:`None`

        :type params: dict
        :param params: Parameters for the request. |default| :code:`None`

        :type headers: LooseHeaders
        :param headers: Headers for the request. |default| :code:`None`
        """
        # NOTE(review): presumably initialised before super().__init__() so the
        # attributes exist if the base initialiser touches them — TODO confirm.
        self.tcp_connector = None
        self.context = None
        super().__init__(*args, **kwargs)
        self.url = url
        self.json = json
        self.cacert = cacert
        self.params = params
        self.headers = headers

        # cacert=None and cacert=False both mean "no custom SSL context".
        if self.cacert is not None and self.cacert != False:  # noqa: E712
            # create_default_context() already trusts the system CAs; the
            # bundle below is added on top of them rather than replacing them.
            context = ssl.create_default_context()
            context.verify_mode = ssl.CERT_REQUIRED
            context.check_hostname = True
            context.load_verify_locations(self.cacert)
            self.context = context

    async def pull(self, update) -> Union[List[Record], str]:
        """
        Asynchronously pulls data from the specified URL using aiohttp.ClientSession.get_

        :type update: :any:`Update`
        :param update: Update object representing the particular Link transfer.

        :return: An empty list when the response body is empty. Otherwise the
            body parsed with :func:`json.loads` (when ``json`` is truthy) or
            decoded as UTF-8 text.
        :rtype: :any:`Record` or list[:any:`Record`]

        :raises ValueError: if JSON parsing fails at the very first character
            (e.g. an HTML error page was returned).
        :raises json.JSONDecodeError: for any other JSON parsing failure.
        :raises UnicodeDecodeError: if the body is not valid UTF-8.
        """
        connector = None
        if self.context is not None:
            # A fresh connector per pull: the session below owns it and closes
            # it on exit, so it cannot be reused across pulls.
            connector = aiohttp.TCPConnector(ssl=self.context)
        # Mirrored onto the instance for backward compatibility.
        self.tcp_connector = connector

        _LOGGER.info(f'{update} pulling {self.url} params={self.params}')
        async with aiohttp.ClientSession(connector=connector, headers=self.headers) as session:
            async with session.get(self.url, params=self.params) as response:
                payload = await response.read()

        # The body is fully read, so parsing happens after the connection is released.
        _LOGGER.info(f'{update} received {self.url} params={self.params}')

        if payload == b'':
            _LOGGER.info(f'{update} no results {self.url} params={self.params}')
            return []

        if not self.json:
            return payload.decode("utf-8")

        try:
            return json.loads(payload)
        except JSONDecodeError as e:
            # A failure at char 0 means the body isn't JSON at all; surface the
            # raw payload to make such responses easier to diagnose.
            if 'Expecting value: line 1 column 1 (char 0)' in str(e):
                raise ValueError(
                    f'Response does not contain valid JSON:\n\n{payload}') from e
            raise

    def __repr__(self):
        """Return a summary listing url, plus params, cacert and metadata when set."""
        s = f'{self.__class__.__name__}(url={self.url}'
        if self.params:
            s += f', params={self.params}'
        if self.cacert:
            s += f', cacert={self.cacert}'
        # `metadata` is read from the instance; presumably provided by the base Inlet.
        if self.metadata:
            s += ', metadata:%s' % self.metadata
        s += ')'
        return s