Source code for prozorro_sale.tools.application

import asyncio
import signal
import sys
import traceback
from typing import (
    Awaitable,
    Optional,
    Type,
    Union,
)

# Prefer uvloop's event loop policy when uvloop is installed; otherwise fall
# back to asyncio's default policy so the module works without uvloop.
try:
    from uvloop import EventLoopPolicy
except ImportError:
    EventLoopPolicy = asyncio.DefaultEventLoopPolicy
from aiohttp import web

from prozorro_sale.tools.logger import get_custom_logger

# Module-level logger used by every wrapper class defined in this module.
LOG = get_custom_logger(__name__)


class CoroutineWrapper:
    """Schedule a coroutine as an event-loop task and manage its shutdown.

    The coroutine is turned into a task when the wrapper is constructed.
    ``shutdown`` runs the optional stop callback, schedules ``_cleanup`` and
    arranges for the task to be cancelled after the grace period.
    """

    __slots__ = ('loop', 'task', '_stop_callback', '_shutdown', '_graceful_exit_time')

    def __init__(self, coroutine, loop: asyncio.AbstractEventLoop, stop_callback=None, graceful_exit_time=False):
        """Create the task for ``coroutine`` on ``loop``.

        Args:
            coroutine: coroutine object to run; its ``__name__`` becomes the
                task name.
            loop (asyncio.AbstractEventLoop): event loop the task is created on.
            stop_callback: optional callable invoked without arguments by
                ``shutdown``.
            graceful_exit_time: delay passed through ``int()`` and used as the
                number of seconds between ``shutdown`` and task cancellation.

        Attributes:
            task (asyncio.Task): task running the wrapped coroutine
            loop (asyncio.AbstractEventLoop): event loop owning the task
        """
        self.loop = loop
        self.task = self.loop.create_task(coroutine, name=f'{coroutine.__name__}')
        self._stop_callback = stop_callback
        self._graceful_exit_time = graceful_exit_time
        self._shutdown = False
        # A stop callback without an explicit grace period still gets one
        # second (int(True) == 1) before the task is cancelled.
        if self._stop_callback is not None and not self._graceful_exit_time:
            self._graceful_exit_time = True

    def initialize(self):
        """Log the start of the wrapper and return its task."""
        LOG.info(f'Initialize {self}')
        return self.task

    @staticmethod
    def _format_exception(exception):
        """Return the formatted traceback of ``exception`` as one string.

        The arguments are passed positionally: since Python 3.10 the first
        parameter of ``traceback.format_exception`` is positional-only, so the
        former ``etype=`` keyword raises ``TypeError``.
        """
        return "".join(traceback.format_exception(
            type(exception), exception, exception.__traceback__
        ))

    def _cancel(self):
        """Cancel the task if it is still running, else log its exception."""
        try:
            if not self.task.done():
                LOG.info(f'Cancel {self}')
                self.task.cancel()
            elif exception := self.task.exception():
                trace = self._format_exception(exception)
                LOG.error(f'Exception in task {self.task.get_name()}\n'
                          f'{trace}')
        except asyncio.CancelledError:
            # task.exception() raises CancelledError for a cancelled task;
            # there is nothing to report in that case.
            ...

    async def _cleanup(self):
        """Hook scheduled by ``shutdown``; does nothing in the base class."""
        ...

    def shutdown(self):
        """Begin shutting the wrapper down; repeated calls are ignored."""
        if self._shutdown:
            return
        self._shutdown = True
        if self._stop_callback is not None:
            self._stop_callback()
        self.loop.create_task(self._cleanup())
        # Cancellation is deferred by the grace period (in whole seconds).
        self.loop.call_later(int(self._graceful_exit_time), self._cancel)
        LOG.info(f'Shutting down {self}')

    def __repr__(self):
        return f'{self.__class__.__name__} for {self.task.get_name()}'

    def __str__(self):
        return self.__repr__()
class WorkerWrapper(CoroutineWrapper):
    """Coroutine wrapper for worker applications.

    Adds no behaviour of its own; ``ApplicationWrapper.add_worker_app``
    uses it to wrap the coroutine returned by calling the worker.
    """
class AioHttpWrapper(CoroutineWrapper):
    """Serve an aiohttp application on one HOST/PORT as a wrapped task.

    The wrapped coroutine performs the usual runner/site start-up::

        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, 'localhost', 8080)
        await site.start()

    and then idles until the wrapper is shut down.
    """

    __slots__ = (
        'aiohttp_wrapper_args',
        'runner'
    )

    def __init__(self,
                 aiohttp_wrapper_args: dict,
                 loop: asyncio.AbstractEventLoop,
                 stop_callback=None,
                 graceful_exit_time=False
                 ):
        """Build the ``web.AppRunner`` and schedule the serving coroutine.

        Args:
            aiohttp_wrapper_args (dict): server settings; the keys read here
                are ``app``, ``access_log_class``, ``keepalive_timeout``,
                ``host`` and ``port``.
            loop (asyncio.AbstractEventLoop): current Event loop
            stop_callback: forwarded to ``CoroutineWrapper``
            graceful_exit_time: forwarded to ``CoroutineWrapper``

        Attributes:
            runner (web.AppRunner): aiohttp web app runner
            _shutdown (bool): application stop flag
        """
        self.aiohttp_wrapper_args = aiohttp_wrapper_args
        self.loop = loop
        self._shutdown = False

        settings = self.aiohttp_wrapper_args
        self.runner = web.AppRunner(
            app=settings["app"],
            access_log_class=settings["access_log_class"],
            access_log_format=web.AccessLogger.LOG_FORMAT,
            access_log=web.access_logger,
            keepalive_timeout=settings["keepalive_timeout"],
        )

        # The coroutine's __name__ is what the base class uses as task name.
        serving = self.__sleep_stream()
        serving.__name__ = f'Application Server {settings["host"]}:{settings["port"]}'
        super().__init__(serving, loop, stop_callback, graceful_exit_time)

    async def __sleep_stream(self):
        """Start the HTTP server, then sleep until shutdown is requested."""
        await self.runner.setup()

        settings = self.aiohttp_wrapper_args
        site_options = {
            option: settings[option]
            for option in (
                "host",
                "port",
                "shutdown_timeout",
                "ssl_context",
                "backlog",
                "reuse_address",
                "reuse_port",
            )
        }
        site = web.TCPSite(runner=self.runner, **site_options)
        await site.start()

        # Keep the task alive in one-hour naps until the stop flag is set.
        while True:
            if self._shutdown:
                break
            await asyncio.sleep(3600)

    async def _cleanup(self):
        """Release the resources held by the ``web.AppRunner``."""
        await self.runner.cleanup()
class ApplicationWrapper:
    """Run aiohttp applications and plain coroutines together on one loop.

    Registration methods:
        - add_web_app (add custom aiohttp web application)
        - add_coroutine (add task to asyncio.loop)
        - add_worker_app (add a worker whose call returns a coroutine)

    Calling the instance starts everything that was registered and blocks
    until the ``return_when`` condition is met or a stop signal arrives.
    """

    #: str: condition to exit after the first error occurs
    FIRST_EXCEPTION = asyncio.FIRST_EXCEPTION
    #: str: condition to exit after the first coroutines complete
    FIRST_COMPLETED = asyncio.FIRST_COMPLETED
    #: str: condition to exit after all coroutines complete
    ALL_COMPLETED = asyncio.ALL_COMPLETED

    __slots__ = ('loop', 'apps', 'return_when', 'runner', 'signals')

    def __init__(self,
                 loop: asyncio.AbstractEventLoop = None,
                 return_when: Optional[str] = FIRST_EXCEPTION,
                 signals: Optional[Union[list, set]] = None,
                 debug: Optional[bool] = None
                 ):
        """Configure the wrapper and, if needed, create its event loop.

        Args:
            loop (:obj:`asyncio.AbstractEventLoop`, optional): Event loop;
                when omitted a new loop is created with ``EventLoopPolicy``
                and installed as the current loop.
            return_when (:obj:`str`, optional): wrapper stop condition,
                default FIRST_EXCEPTION
            signals (:obj:`list`, optional): signals that trigger a graceful
                exit; a list or set replaces the platform default.
            debug (:obj:`bool`, optional): when not None, passed to
                ``loop.set_debug``.

        Attributes:
            runner (asyncio.Task): current wrapper task
            loop (asyncio.AbstractEventLoop): current Event loop
            return_when (str): exit condition
            apps (list): list of registered applications
            signals: signals handled by ``__call__``
        """
        self.runner = None
        self.loop = loop
        self.return_when = return_when
        self.apps = []
        if self.loop is None:
            asyncio.set_event_loop_policy(EventLoopPolicy())
            # Create the loop explicitly: relying on get_event_loop() to
            # create one implicitly is deprecated and fails on newer Pythons.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        if debug is not None:
            self.loop.set_debug(debug)
        if sys.platform == 'win32':
            self.signals = []
        else:
            self.signals = {signal.SIGTERM, signal.SIGHUP, signal.SIGINT}
        if isinstance(signals, (list, set)):
            self.signals = signals

    def add_web_app(self,
                    app: Union[web.Application, Awaitable[web.Application]],
                    *,
                    host: Optional[Union[str, web.HostSequence]] = '0.0.0.0',
                    port: Optional[int] = 80,
                    shutdown_timeout: Optional[float] = 60.0,
                    keepalive_timeout: Optional[float] = 75.0,
                    ssl_context: Optional[web.SSLContext] = None,
                    backlog: Optional[int] = 128,
                    access_log_class: Type[web.AbstractAccessLogger] = web.AccessLogger,
                    reuse_address: Optional[bool] = True,
                    reuse_port: Optional[bool] = True,
                    stop_callback: Optional[bool] = None,
                    graceful_exit_time: Optional[bool] = False
                    ) -> None:
        """Register an aiohttp web application to be served.

        ``None`` for ``port``, ``shutdown_timeout``, ``keepalive_timeout`` or
        ``backlog`` selects the default; explicit falsy values such as
        ``port=0`` are passed through unchanged.

        Args:
            app (:obj:`web.Application`): aiohttp Application
            host (:obj:`str`, optional): listen ip address, default '0.0.0.0'
            port (:obj:`int`, optional): listen tcp port, default 80
            shutdown_timeout (:obj:`float`, optional): passed to
                ``web.TCPSite``, default 60.0
            keepalive_timeout (:obj:`float`, optional): passed to
                ``web.AppRunner``, default 75.0
            ssl_context (:obj:`web.SSLContext`, optional): passed to
                ``web.TCPSite``
            backlog (:obj:`int`, optional): passed to ``web.TCPSite``,
                default 128
            access_log_class (:obj:`web.AbstractAccessLogger`, optional):
                passed to ``web.AppRunner``
            reuse_address (:obj:`bool`, optional): passed to ``web.TCPSite``
            reuse_port (:obj:`bool`, optional): passed to ``web.TCPSite``
            stop_callback (optional): callable invoked without arguments when
                the application wrapper shuts down
            graceful_exit_time (optional): seconds to wait between shutdown
                and task cancellation
        """
        aiohttp_wrapper_args = {
            "app": app,
            "host": host or '0.0.0.0',
            "port": 80 if port is None else port,
            "shutdown_timeout": 60.0 if shutdown_timeout is None else shutdown_timeout,
            "keepalive_timeout": 75.0 if keepalive_timeout is None else keepalive_timeout,
            "ssl_context": ssl_context,
            "backlog": 128 if backlog is None else backlog,
            "access_log_class": access_log_class,
            "reuse_address": reuse_address,
            "reuse_port": reuse_port
        }
        self.apps.append(
            AioHttpWrapper(aiohttp_wrapper_args, self.loop, stop_callback, graceful_exit_time)
        )

    def add_coroutine(self, coro, stop_callback=None, graceful_exit_time=False):
        """Register a coroutine to run as a task on the wrapper's loop.

        Args:
            coro: coroutine object to run
            stop_callback: optional callable invoked without arguments on
                shutdown
            graceful_exit_time: seconds to wait between shutdown and task
                cancellation
        """
        self.apps.append(
            CoroutineWrapper(coro, self.loop, stop_callback, graceful_exit_time)
        )

    def add_worker_app(self, app, stop_callback=None, graceful_exit_time=False):
        """Register a worker; ``app()`` must return the coroutine to run.

        Args:
            app: callable invoked without arguments to obtain the coroutine;
                its ``stop`` attribute, when present, is the default
                ``stop_callback``.
            stop_callback: optional callable invoked without arguments on
                shutdown
            graceful_exit_time: seconds to wait between shutdown and task
                cancellation
        """
        stop_callback = stop_callback or getattr(app, 'stop', None)
        self.apps.append(
            WorkerWrapper(app(), self.loop, stop_callback, graceful_exit_time)
        )

    def __handle_signal(self, _signal_name, callback):
        """Install ``callback`` as the handler for ``_signal_name``.

        The handler is registered on the event loop and through
        ``signal.signal``.

        Returns:
            None
        """
        try:
            self.loop.add_signal_handler(_signal_name, callback)
            # NOTE(review): this second registration means the callback can
            # also be called directly with (signum, frame) - presumably why
            # _graceful_exit accepts *args; confirm it is intentional.
            signal.signal(_signal_name, callback)
        except NotImplementedError:
            # add_signal_handler is not implemented on Windows
            pass

    def __shutdown_app(self):
        """Call ``shutdown`` on every registered application wrapper."""
        for app in self.apps:
            app.shutdown()

    def __shutdown_runner(self):
        """Cancel the runner task and every other task on the loop."""
        LOG.info('Cancel runner')
        if not self.loop.is_closed():
            if self.runner and not self.runner.done():
                self.runner.cancel()
            self.__cancel_tasks()

    def __cancel_tasks(self) -> None:
        """Cancel all tasks of the loop and schedule a wait for them."""
        LOG.info('Cancel all_tasks')
        to_cancel = asyncio.all_tasks(self.loop)
        if not to_cancel:
            return
        for task in to_cancel:
            if not task.done():
                task.cancel()
        self.loop.create_task(self.__wait(to_cancel, self.ALL_COMPLETED, canceled_pending=True))

    def _graceful_exit(self, *args, **kwargs) -> None:
        """Shut down all applications; used as the signal handler.

        When the loop is not running it is run for one second so that the
        callbacks scheduled by the shutdown get a chance to execute.
        Positional and keyword arguments are accepted and ignored.
        """
        self.__shutdown_app()
        if not self.loop.is_running():
            self.loop.run_until_complete(asyncio.sleep(1))

    def __close_loop(self):
        """Finalize async generators and the executor, then close the loop."""
        self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        self.loop.run_until_complete(self.loop.shutdown_default_executor())
        self.loop.stop()
        self.loop.close()
        asyncio.set_event_loop(None)

    async def __wait(self, tasks, return_when=None, canceled_pending=False):
        """Wait for ``tasks`` and then shut every application down.

        Args:
            tasks (list): coroutine tasks
            return_when: asyncio.FIRST_EXCEPTION, asyncio.FIRST_COMPLETED or
                asyncio.ALL_COMPLETED; defaults to ``self.return_when``
            canceled_pending (bool): If True cancel pending tasks
        """
        if return_when is None:
            return_when = self.return_when
        done, pending = await asyncio.wait(tasks, return_when=return_when)
        for app in self.apps:
            app.shutdown()
        for task in done:
            if task.cancelled():
                continue
            try:
                # Retrieve the result so the exception counts as consumed.
                task.result()
            except Exception:
                ...
        if canceled_pending:
            for task in pending:
                task.cancel()

    def __call__(self):
        """Start everything that was registered and block until it stops."""
        for sign in self.signals:
            self.__handle_signal(sign, self._graceful_exit)
        try:
            LOG.info('Start serving')
            self.runner = self.loop.create_task(
                self.__wait([app.initialize() for app in self.apps]),
                name='ApplicationWrapper run_all'
            )
            self.loop.run_until_complete(self.runner)
        except (KeyboardInterrupt, web.GracefulExit, SystemExit):
            self.__shutdown_app()
        except Exception:
            # Unexpected failures still end in a shutdown, but leave a trace
            # in the log instead of disappearing silently.
            LOG.error(f'Unexpected error while serving\n{traceback.format_exc()}')
            self.__shutdown_app()
        finally:
            self._graceful_exit()
            self.__shutdown_runner()
            self.__close_loop()
            LOG.info('Stop serving')