import functools
from typing import Union, TYPE_CHECKING
import ujson
from aiohttp import web, hdrs
from aiohttp.client import ClientSession, _RequestContextManager, TCPConnector
from prozorro_sale.tools.context_variables import REQUEST_ID
from prozorro_sale.tools.logger import get_custom_logger
if TYPE_CHECKING:
# type hinting without cyclic imports
from prozorro_sale.tools.application_worker import ApplicationWorker
# Names exported by ``from <module> import *``.
__all__ = ['async_api_request', 'setup_async_api_requests']
# Module-level logger, obtained from the project's logging helper.
LOG = get_custom_logger(__name__)
class DefaultResponse(BaseException):
    """Signal carrying a pre-defined response payload.

    Raised by ``Requests._check_resp_status`` when the response status
    matches one of the ``default_response`` rules. It derives from
    ``BaseException`` (not ``Exception``), so a generic ``except Exception``
    handler does not intercept it.

    Attributes:
        resp: payload to hand back to the caller instead of the response body.
    """

    # Marker checked by the request wrapper before it uses ``resp``.
    __default_response__ = True

    def __init__(self, resp):
        """Remember the payload that should replace the real response.

        Args:
            resp: value to return to the caller.
        """
        self.resp = resp
class Requests:
    """Thin wrapper for aiohttp.ClientSession with Requests simplicity.

    HTTP verbs are exposed as coroutine attributes (``get``, ``post``, ...)
    via ``__getattr__``. The underlying ``ClientSession`` is created lazily
    and re-created whenever it, or its connector, is found closed.
    """

    def __init__(self, *args, **kwargs):
        """Store the session arguments; no session is created yet.

        Args:
            *args: positional arguments forwarded to ``ClientSession``.
            **kwargs: keyword arguments forwarded to ``ClientSession``.
        """
        # True once ``init()`` ran, i.e. ``close()`` is expected to be called.
        self.__cleaning_installed = False
        self.__session_args = (args, kwargs)
        self.__session = None

    @property
    def session(self):
        """An instance of aiohttp.ClientSession (created on demand)."""
        return self._session()

    def _session(self):
        """Return a usable ClientSession, building session/connector if needed.

        Fills in ``json_serialize`` (ujson) and ``connector`` in the stored
        session kwargs when they are missing, or when the stored connector is
        closed, then (re)creates the session if it is closed.

        Returns:
            ClientSession: the current session.
        """
        session_kwargs = self.__session_args[1]
        if 'json_serialize' not in session_kwargs:
            session_kwargs['json_serialize'] = ujson.dumps
        if 'connector' not in session_kwargs or session_kwargs['connector'].closed:
            managed = self.__cleaning_installed
            # Without a managed lifecycle use a single, force-closed connection.
            session_kwargs['connector'] = TCPConnector(
                force_close=not managed,
                enable_cleanup_closed=not managed,
                limit=100 if managed else 1
            )
            # NOTE(review): nesting reconstructed from a flattened source -
            # confirm the warning belongs to connector creation.
            if not managed:
                LOG.warning('The automatic hook for closing the connection is not installed. '
                            'Please close the session manually')
        if self.closed or self.connector_closed:
            self.__session = ClientSession(*self.__session_args[0], **session_kwargs)
        return self.__session

    @staticmethod
    def _check_code(status_code, codes):
        """Tell whether ``status_code`` matches the ``codes`` rule.

        Args:
            status_code (int): response status code.
            codes: a single int (exact match) or a 2-tuple describing an
                inclusive range. The two bounds may be given in any order.
                ``...`` (Ellipsis) as a bound leaves that side open;
                ``(..., ...)`` matches every status code.

        Returns:
            bool: True when the status code matches the rule, False otherwise
            (including when ``codes`` has an unsupported shape).
        """
        if isinstance(codes, int):
            codes = (codes, codes)
        if not (isinstance(codes, tuple) and len(codes) == 2):
            return False
        low, high = codes
        if low is ... and high is ...:
            # Both sides open: previously raised TypeError comparing Ellipsis.
            return True
        if low is ...:
            return high >= status_code
        if high is ...:
            return low <= status_code
        low, high = sorted(codes)
        return low <= status_code <= high

    def _check_resp_status(self, status_code, error_codes, default_resp):
        """Raise according to the first rule matching ``status_code``.

        ``default_resp`` rules are checked before ``error_codes`` rules.

        Args:
            status_code (int): current response status code.
            error_codes (dict): maps an exception class, or an
                ``(exception class, message template)`` tuple, to a codes
                rule. The template is formatted with the status code.
            default_resp (dict): maps a codes rule to the payload to return.

        Raises:
            DefaultResponse: a ``default_resp`` rule matched.
            Exception: the exception configured in a matching
                ``error_codes`` rule.
        """
        if default_resp:
            for codes, resp in default_resp.items():
                if self._check_code(status_code, codes):
                    raise DefaultResponse(resp)
        if error_codes:
            for ex, codes in error_codes.items():
                if isinstance(ex, type) and issubclass(ex, BaseException):
                    ex = (ex, '')
                # Short-circuit so ``len`` is never called on a non-tuple key.
                if (
                    isinstance(ex, tuple)
                    and len(ex) == 2
                    and self._check_code(status_code, codes)
                ):
                    raise ex[0](ex[1].format(status_code))

    def __getattr__(self, attr):
        """Expose HTTP verbs (any case) as request coroutines.

        Args:
            attr (str): attribute name; when its upper-cased form is in
                ``hdrs.METH_ALL`` a request coroutine is returned.

        Raises:
            AttributeError: for any other unknown attribute.
        """
        if attr.upper() in hdrs.METH_ALL:
            @functools.wraps(self.session._request)
            async def session_request(*args, **kwargs):
                """
                Constructs and sends a request. Returns the decoded JSON body
                or the configured default response.

                Args:
                    url - request url
                    params - (optional) Dictionary or bytes to be sent in the query
                        string of the new request
                    data - (optional) Dictionary, bytes, or file-like object to
                        send in the body of the request
                    json - (optional) Any json compatible python object
                    headers - (optional) Dictionary of HTTP Headers to send with
                        the request
                    cookies - (optional) Dict object to send with the request
                    auth - (optional) aiohttp.helpers.BasicAuth named tuple
                        representing HTTP Basic Auth
                    allow_redirects - (optional) If set to False, do not follow redirects
                    version - Request HTTP version.
                    compress - Set to True if request has to be compressed
                        with deflate encoding.
                    chunked - Set to chunk size for chunked transfer encoding.
                    expect100 - Expect 100-continue response from server.
                    connector - BaseConnector sub-class instance to support
                        connection pooling.
                    read_until_eof - Read response until eof if response
                        does not have Content-Length header.
                    loop - Optional event loop.
                    timeout - Optional ClientTimeout settings structure, 5min
                        total timeout by default.
                    error_codes - Optional, {
                        SomeException: 404, SomeAnotherException: (403, 400)
                        }. ... is used to indicate a code range from the start or to the end.
                    default_response - Optional, {404: {}, (403, 400): {}}.

                Example::

                    >>> from prozorro_sale.tools.api_requests import async_api_request
                    ... data = await async_api_request.get('https://procedure.prozorro.sale/api')
                """
                error_codes = kwargs.pop('error_codes', {})
                default_resp = kwargs.pop('default_response', {})
                # Work on a copy: the X-Request-ID header must not leak into
                # the caller's mapping, and ``headers=None`` must not crash.
                headers = kwargs.pop('headers', None)
                headers = {} if headers is None else headers.copy()
                headers['X-Request-ID'] = REQUEST_ID.get('unknown')
                data = {}
                async with await _RequestContextManager(self.session._request(
                    attr.upper(),
                    *args,
                    headers=headers,
                    **kwargs
                )) as resp:
                    # NOTE(review): try/except/else shape reconstructed from a
                    # flattened source - the body is parsed only when no rule fired.
                    try:
                        self._check_resp_status(resp.status, error_codes, default_resp)
                    except DefaultResponse as ex:
                        if ex.__default_response__:
                            data = ex.resp
                    else:
                        data = await resp.json()
                return data
            return session_request
        else:
            return super().__getattribute__(attr)

    @property
    def closed(self):
        """
        Check if aiohttp.ClientSession is closed.

        Returns:
            bool: True when there is no session, the session is closed, or
            its event loop is closed.
        """
        if self.__session:
            cases = [
                self.__session.closed,
                self.__session._loop.is_closed()
            ]
            return any(cases)
        return True

    @property
    def connector_closed(self):
        """
        Check if aiohttp.ClientSession connector is closed.

        Returns:
            bool: True when there is no session, the session has no
            connector, or the connector is closed.
        """
        if self.__session:
            cases = [
                self.__session._connector is None,
                self.__session._connector and self.__session._connector.closed
            ]
            return any(cases)
        return True

    async def init(self):
        """
        Initialize aiohttp.ClientSession and mark its lifecycle as managed.

        Returns:
            None
        """
        self.__cleaning_installed = True
        self._session()

    async def close(self):
        """Close aiohttp.ClientSession and reset the managed-lifecycle flag.

        Returns:
            None
        """
        if self.__session:
            await self.__session.close()
        self.__session = None
        self.__cleaning_installed = False

    # The annotation is a string so it is never evaluated at import time
    # (``ApplicationWorker`` is only imported under TYPE_CHECKING).
    async def init_app(self, app: "Union[web.Application, 'ApplicationWorker']"):
        """
        Initialize the session and attach this wrapper as ``app.api_request``.

        Args:
            app (object): aiohttp.web.Application instance.
        """
        await self.init()
        app.api_request = self

    async def close_app(self, *args, **kwargs):
        """
        Close aiohttp.ClientSession; extra arguments are ignored.

        Returns:
            None
        """
        await self.close()

    async def __aenter__(self):
        """Initialize the session and return this wrapper."""
        await self.init()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close aiohttp.ClientSession on leaving the ``async with`` block."""
        await self.close()
#: Requests: module-level singleton instance of the Requests wrapper;
#: exported in ``__all__`` and registered by ``setup_async_api_requests``.
async_api_request = Requests()
# The annotation is a string so it is never evaluated at import time
# (``ApplicationWorker`` is only imported under TYPE_CHECKING).
def setup_async_api_requests(app: "Union[web.Application, 'ApplicationWorker']") -> None:
    """
    Append Requests wrapper to app.

    Registers ``async_api_request.init_app`` as a startup hook and
    ``async_api_request.close_app`` as a shutdown hook of ``app``.

    Args:
        app (object): aiohttp.web.Application instance.
    """
    app.on_startup.append(async_api_request.init_app)
    app.on_shutdown.append(async_api_request.close_app)