Source code for prozorro_sale.tools.api_requests

import functools
from typing import Union, TYPE_CHECKING

import ujson
from aiohttp import web, hdrs
from aiohttp.client import ClientSession, _RequestContextManager, TCPConnector

from prozorro_sale.tools.context_variables import REQUEST_ID
from prozorro_sale.tools.logger import get_custom_logger

if TYPE_CHECKING:
    # type hinting without cyclic imports
    from prozorro_sale.tools.application_worker import ApplicationWorker

# Names exported by ``from prozorro_sale.tools.api_requests import *``.
__all__ = ['async_api_request', 'setup_async_api_requests']

# Module-level logger obtained from the project's logger helper.
LOG = get_custom_logger(__name__)


class DefaultResponse(BaseException):
    """Carrier exception for a pre-configured fallback response.

    It derives from ``BaseException``, so handlers written as
    ``except Exception`` do not catch it.
    """

    # Marker flag; the request wrapper checks it before using ``resp``.
    __default_response__ = True

    def __init__(self, resp):
        """Keep the fallback payload.

        Args:
            resp: value handed back to the caller in place of the parsed
                response body.
        """
        self.resp = resp
class Requests:
    """Thin wrapper for aiohttp.ClientSession with Requests simplicity.

    HTTP verbs are not defined as methods; ``get``, ``post`` and the other
    names listed in ``aiohttp.hdrs.METH_ALL`` are resolved dynamically by
    :meth:`__getattr__` and return a coroutine function that performs the
    request and returns the decoded JSON body.
    """

    def __init__(self, *args, **kwargs):
        """Remember the arguments for the ``aiohttp.ClientSession`` constructor.

        No session is created here; it is built on first access of
        :attr:`session` (or by :meth:`init`).
        """
        self.__cleaning_installed = False
        self.__session_args = (args, kwargs)
        self.__session = None

    @property
    def session(self):
        """An instance of aiohttp.ClientSession."""
        return self._session()

    def _session(self):
        """Return the current ``ClientSession``, (re)creating it when needed.

        Fills in defaults for ``json_serialize`` and ``connector`` in the
        stored session keyword arguments, then builds a new session when the
        current one is missing or closed.
        """
        options = self.__session_args[1]
        if 'json_serialize' not in options:
            options['json_serialize'] = ujson.dumps
        if 'connector' not in options or options['connector'].closed:
            # Without a registered cleanup hook the connector is kept minimal:
            # one connection at a time, closed after every request.
            managed = self.__cleaning_installed
            options['connector'] = TCPConnector(
                force_close=not managed,
                enable_cleanup_closed=not managed,
                limit=100 if managed else 1,
            )
        # NOTE(review): the source layout was collapsed; this check is
        # reconstructed at method level (emitted on every access while no
        # cleanup hook is installed) - confirm against the original file.
        if not self.__cleaning_installed:
            LOG.warning('The automatic hook for closing the connection is not installed. '
                        'Please close the session manually')
        if self.closed or self.connector_closed:
            self.__session = ClientSession(*self.__session_args[0], **options)
        return self.__session

    @staticmethod
    def _check_code(status_code, codes):
        """Tell whether ``status_code`` matches the ``codes`` specification.

        Args:
            status_code (int): response status code.
            codes: either a single int (exact match) or a 2-tuple describing
                an inclusive range. The two bounds may be given in any order,
                so ``(403, 400)`` means 400..403. ``...`` (Ellipsis) in place
                of a bound leaves that side open: ``(500, ...)`` is "500 and
                above", ``(..., 299)`` is "299 and below", ``(..., ...)``
                matches every status.

        Returns:
            bool: True when the status code matches; False when it does not
            or when ``codes`` has an unsupported shape.
        """
        if isinstance(codes, int):
            codes = (codes, codes)
        if not (isinstance(codes, tuple) and len(codes) == 2):
            return False
        low, high = codes
        if low is ... and high is ...:
            # Open on both sides; comparing Ellipsis with an int would raise.
            return True
        if low is ...:
            return status_code <= high
        if high is ...:
            return low <= status_code
        low, high = sorted(codes)
        return low <= status_code <= high

    def _check_resp_status(self, status_code, error_codes, default_resp):
        """Raise according to the caller's status-code rules.

        ``default_resp`` is consulted first, then ``error_codes``. The method
        returns None when no rule matches.

        Args:
            status_code (int): current response status code.
            error_codes (dict): maps an exception class, or a tuple
                ``(exception class, message template)``, to a code
                specification understood by :meth:`_check_code`. The template
                is formatted with the status code. Keys of any other shape
                are ignored.
            default_resp (dict): maps a code specification to the value that
                should be returned instead of the response body.

        Raises:
            DefaultResponse: a ``default_resp`` rule matched.
            BaseException: the exception class configured in ``error_codes``
                for the matching rule.
        """
        if default_resp:
            for codes, resp in default_resp.items():
                if self._check_code(status_code, codes):
                    raise DefaultResponse(resp)

        if error_codes:
            for ex, codes in error_codes.items():
                if isinstance(ex, type) and issubclass(ex, BaseException):
                    ex = (ex, '')
                # Validate the key shape before calling len(): a key without
                # __len__ must be skipped, not crash the request.
                if not (isinstance(ex, tuple) and len(ex) == 2):
                    continue
                if self._check_code(status_code, codes):
                    raise ex[0](ex[1].format(status_code))

    def __getattr__(self, attr):
        """Resolve HTTP verb names into request coroutines.

        Called only when normal attribute lookup fails. Names whose
        upper-case form is in ``aiohttp.hdrs.METH_ALL`` yield a coroutine
        function; every other name falls through to the default lookup.
        """
        method = attr.upper()
        if method not in hdrs.METH_ALL:
            return super().__getattribute__(attr)

        # NOTE: functools.wraps also copies ``__doc__`` from ``_request``, so
        # the docstring below serves readers of this file rather than help().
        @functools.wraps(self.session._request)
        async def session_request(*args, **kwargs):
            """
            Constructs and sends a request. Returns response object.

            Args:
                url - request url
                params - (optional) Dictionary or bytes to be sent in the query
                    string of the new request
                data - (optional) Dictionary, bytes, or file-like object to
                    send in the body of the request
                json - (optional) Any json compatible python object
                headers - (optional) Dictionary of HTTP Headers to send with
                    the request
                cookies - (optional) Dict object to send with the request
                auth - (optional) aiohttp.helpers.BasicAuth named tuple
                    representing HTTP Basic Auth
                allow_redirects - (optional) If set to False, do not follow
                    redirects
                version - Request HTTP version.
                compress - Set to True if request has to be compressed with
                    deflate encoding.
                chunked - Set to chunk size for chunked transfer encoding.
                expect100 - Expect 100-continue response from server.
                connector - BaseConnector sub-class instance to support
                    connection pooling.
                read_until_eof - Read response until eof if response does not
                    have Content-Length header.
                loop - Optional event loop.
                timeout - Optional ClientTimeout settings structure, 5min
                    total timeout by default.
                error_codes - Optional,
                    { SomeException: 404, SomeAnotherException: (403, 400) }.
                    ... is used to indicate a code range from the start or to
                    the end.
                default_response - Optional, {404: {}, (403, 400): {}}.

            Example::

                >>> from prozorro_sale.tools.api_requests import async_api_request
                ... data = await async_api_request.get('https://procedure.prozorro.sale/api')
            """
            error_codes = kwargs.pop('error_codes', {})
            default_resp = kwargs.pop('default_response', {})
            headers = kwargs.pop('headers', None)
            # Work on a copy so the caller's mapping is never modified, and
            # accept an explicit ``headers=None``.
            headers = headers.copy() if headers else {}
            headers['X-Request-ID'] = REQUEST_ID.get('unknown')
            data = {}
            async with await _RequestContextManager(self.session._request(
                method, *args, headers=headers, **kwargs
            )) as resp:
                try:
                    self._check_resp_status(resp.status, error_codes, default_resp)
                except DefaultResponse as ex:
                    if ex.__default_response__:
                        data = ex.resp
                else:
                    # No rule matched: hand back the decoded JSON body.
                    data = await resp.json()
            return data

        return session_request

    @property
    def closed(self):
        """Check if aiohttp.ClientSession is closed.

        Returns:
            bool: True when there is no session, the session is closed, or
            the session's event loop is closed.
        """
        if self.__session:
            return any([
                self.__session.closed,
                self.__session._loop.is_closed(),
            ])
        return True

    @property
    def connector_closed(self):
        """Check if aiohttp.ClientSession connector is closed.

        Returns:
            bool: True when there is no session, the session has no
            connector, or the connector is closed.
        """
        if self.__session:
            return any([
                self.__session._connector is None,
                self.__session._connector and self.__session._connector.closed,
            ])
        return True

    async def init(self):
        """Initialize aiohttp.ClientSession.

        Marks the cleanup hook as installed before building the session.

        Returns:
            None
        """
        self.__cleaning_installed = True
        self._session()

    async def close(self):
        """Close aiohttp.ClientSession and reset the wrapper state.

        Returns:
            None
        """
        if self.__session:
            await self.__session.close()
        self.__session = None
        self.__cleaning_installed = False

    # The annotation is a string so it is not evaluated when the class is
    # defined.
    async def init_app(self, app: "Union[web.Application, 'ApplicationWorker']"):
        """Initialize the session and expose this wrapper as ``app.api_request``.

        Args:
            app (object): aiohttp.web.Application instance.
        """
        await self.init()
        app.api_request = self

    async def close_app(self, *args, **kwargs):
        """Close aiohttp.ClientSession; any arguments are ignored.

        Returns:
            None
        """
        await self.close()

    async def __aenter__(self):
        """Initialize the session and return this wrapper."""
        await self.init()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session when leaving the ``async with`` block."""
        await self.close()
#: Requests: singleton instance for Requests Wrapper
async_api_request = Requests()
def setup_async_api_requests(app: Union[web.Application, 'ApplicationWorker']) -> None:
    """Register the shared Requests wrapper with the application lifecycle.

    The wrapper's ``init_app`` is appended to ``app.on_startup`` and its
    ``close_app`` to ``app.on_shutdown``.

    Args:
        app (object): aiohttp.web.Application instance.
    """
    client = async_api_request
    app.on_startup.append(client.init_app)
    app.on_shutdown.append(client.close_app)