"""
Package to provide a middleware for catch error Exception.
"""
import functools
from asyncio import iscoroutinefunction, CancelledError
from typing import Coroutine, Union
from aiohttp import web
from aiohttp.web_exceptions import HTTPRedirection, HTTPSuccessful, HTTPClientError
# TODO Temporary for compatibility and migrate aiohttp 3.8.x to 3.9.x
try:
from aiohttp.web_middlewares import _Middleware
except ImportError:
from aiohttp.typedefs import Middleware as _Middleware
from prozorro_sale.tools.context_variables import REQUEST_ID
from prozorro_sale.tools.logger import get_custom_logger
from prozorro_sale.tools.utils import is_resp_start_send, close_not_complited_response
__all__ = ['expects', 'catch_error_middleware']
LOG = get_custom_logger(__name__)
[docs]async def aiohttp_change_response(msg, status_code, request=None):
"""Method for change response if error was occurred.
Args:
msg: error message
status_code: response status code
request (object): Request
Returns:
aiohttp.web.Response: Response object.
"""
return web.json_response({'message': msg}, status=status_code, headers={'X-Request-ID': REQUEST_ID.get('unknown')})
[docs]def expects(errors: dict = None, response_constructor: Union[Coroutine, None] = None, catch_all: bool = False):
"""
Decorator to handle unique exceptions from handlers.
Args:
errors (dict): Custom application exception dictionary
response_constructor (coroutine): Coroutine for make Response
catch_all (bool): catch all exceptions
Example:
>>> @expects({
... SomeException: 404,
... SomeAnotherException: (403, 'Forbidden. {}')
... })
... def handler(request):
... pass
"""
if not isinstance(errors, dict):
errors = dict()
for error, description in errors.items():
if isinstance(description, int):
errors[error] = (description, '{}')
if not iscoroutinefunction(response_constructor):
response_constructor = aiohttp_change_response
def wrapper(func):
@functools.wraps(func)
async def handler(request, *args, **kwargs):
try:
try:
res = await func(request, *args, **kwargs)
if res is None:
raise Exception('Missing return statement on request handler')
else:
return res
except ConnectionResetError as cex:
LOG.info(f'CBD client go away - {cex}')
raise CancelledError
except Exception as ex:
if not is_resp_start_send(request):
raise
else:
try:
await close_not_complited_response(request)
except ConnectionResetError as cex:
LOG.info(f'CBD client go away - {cex}')
else:
if getattr(ex, 'status_code', None) is not None:
LOG.info(f'Not complited response - {ex}')
else:
code, message = errors.get(type(ex), (None, None))
if code:
LOG.info(message.format(ex))
else:
LOG.exception(f'Unknown error caught in API - {ex}')
finally:
raise CancelledError
except tuple(errors.keys()) as ex:
code, message = errors.get(type(ex), (None, None))
if code:
msg = message.format(ex)
LOG.info(msg)
return await response_constructor(msg, code, request)
if not catch_all:
raise
if hasattr(ex, '__http_exception__'):
return await response_constructor(ex.text, ex.status_code, request)
LOG.exception(f'Unknown error caught in API - {ex}')
return await response_constructor('Internal server error', 500, request)
except HTTPRedirection:
raise
except (HTTPSuccessful, HTTPClientError) as ex:
if not catch_all:
raise
return await response_constructor(ex.text, ex.status_code, request)
except Exception as ex:
if not catch_all:
raise
LOG.exception(f'Unknown error caught in API - {ex}')
return await response_constructor('Internal server error', 500, request)
return handler
return wrapper
[docs]def catch_error_middleware(errors: dict = None, response_constructor: Union[Coroutine, None] = None) -> _Middleware:
"""Middleware to handle unique exceptions from handlers.
Args:
errors (:obj:`dict`, optional): Custom application exception dictionary
response_constructor (:obj:`coroutine`, optional): Coroutine for make Response
Returns:
aiohttp Middleware
Example:
>>> from prozorro_sale.tools.errors import catch_error_middleware
...
...
... ERRORS = {
... KeyError: 400,
... AttributeError: 200,
... ValueError: (403, 'Forbidden. {}'),
... Exception: (500, '{}')
... }
...
... async def response_constructor(msg, status_code, request=None):
... return web.json_response({'message': msg}, status=status_code)
...
... app = web.Application(middlewares=[catch_error_middleware(ERRORS, response_constructor),])
"""
@web.middleware
@expects(errors, response_constructor, True)
async def _catch_error_middleware(request, handler, *args, **kwargs):
return await handler(request, *args, **kwargs)
return _catch_error_middleware