# Source code for prozorro_sale.tools.application_worker

import asyncio
import signal
from contextlib import asynccontextmanager
from functools import partial
from inspect import getfullargspec
from typing import Any, Dict, Iterator, Coroutine

try:
    from uvloop import EventLoopPolicy
except ImportError:
    EventLoopPolicy = asyncio.DefaultEventLoopPolicy

from aiohttp.web_app import CleanupContext
from aiohttp.web_runner import GracefulExit
from aiosignal import Signal
from prozorro_sale.tools.logger import get_custom_logger

# Names exported by a star-import of this module: only ApplicationWorker.
__all__ = ("ApplicationWorker",)
# Module-level logger; the worker uses it to report exceptions raised by tasks.
LOG = get_custom_logger(__name__)


class ApplicationWorkerTasksErrors(Exception):
    """Error raised when one or more worker tasks finished with an exception.

    ``ApplicationWorker.run`` raises it with a single argument: a dict that
    maps the failed task's name to the exception that task raised.
    """
class ApplicationWorker:
    """Runs registered coroutines as asyncio tasks with startup/shutdown/cleanup signals.

    The object also acts as a small state container: item access
    (``app['key']``) and attribute access (``app.key``) both read and write
    the same internal state dict.  Only the private attributes listed in
    ``__slots__`` are stored on the instance itself.
    """

    __slots__ = ('__handle_signals', '__state', '__frozen', '__pre_frozen', '__on_startup', '__on_shutdown',
                 '__on_cleanup', '__cleanup_ctx', '__process', '__running', '__tasks', '__loop')

    def __init__(
        self,
        handle_signals: bool = False,
        loop: asyncio.AbstractEventLoop = None,
    ) -> None:
        """Create an empty worker.

        :param handle_signals: when true, SIGINT/SIGTERM handlers that call
            :meth:`stop` are installed on the loop while the worker runs.
        :param loop: event loop to use; when omitted it is resolved lazily
            by the :attr:`loop` property.
        """
        self.__loop = loop
        self.__handle_signals = handle_signals
        self.__state: Dict[str, Any] = {}
        self.__frozen = False
        self.__pre_frozen = False
        self.__running = None
        self.__process: list = []
        self.__tasks: list = []
        self.__on_startup: Signal = Signal(self)
        self.__on_shutdown: Signal = Signal(self)
        self.__on_cleanup: Signal = Signal(self)
        self.__cleanup_ctx = CleanupContext()
        # The cleanup context hooks itself into the startup and cleanup signals.
        self.__on_startup.append(self.__cleanup_ctx._on_startup)
        self.__on_cleanup.append(self.__cleanup_ctx._on_cleanup)

    @staticmethod
    def _is_own_slot(key: str) -> bool:
        """Return True if *key* is the name-mangled form of one of this class's slots.

        The check is anchored on ``ApplicationWorker`` (not ``type(self)``)
        because private names are always mangled with the defining class
        name, also when the instance belongs to a subclass.
        """
        prefix = '_ApplicationWorker'
        return key.startswith(prefix) and key[len(prefix):] in ApplicationWorker.__slots__

    @property
    def loop(self):
        """Event loop used by the worker; resolved on first access when not given."""
        if not self.__loop:
            self.__loop = asyncio.get_event_loop()
        return self.__loop

    def __eq__(self, other: object) -> bool:
        """Workers compare equal only to themselves."""
        return self is other

    def __hash__(self) -> int:
        """Identity hash, consistent with the identity-based ``__eq__``.

        Defining ``__eq__`` alone would make instances unhashable.
        """
        return object.__hash__(self)

    def __getitem__(self, key: str) -> Any:
        """Return the state value stored under *key*."""
        return self.__state[key]

    def __setitem__(self, key: str, value: Any) -> None:
        """Store *value* in the state under *key*."""
        self.__state[key] = value

    def __delitem__(self, key: str) -> None:
        """Remove *key* from the state."""
        del self.__state[key]

    def __len__(self) -> int:
        """Return the number of state entries."""
        return len(self.__state)

    def __iter__(self) -> Iterator[str]:
        """Iterate over the state keys."""
        return iter(self.__state)

    def __setattr__(self, key: str, value: Any) -> None:
        """Set a private slot, or store any other attribute in the state dict."""
        if ApplicationWorker._is_own_slot(key):
            super().__setattr__(key, value)
        else:
            self.__state[key] = value

    def __getattr__(self, name):
        """Look up *name* in the state dict.

        Only called when normal attribute lookup fails.

        :raises AttributeError: if *name* is not in the state, or if *name*
            is one of the private slots that has not been initialised yet.
        """
        cls = self.__class__.__name__
        if ApplicationWorker._is_own_slot(name):
            # An unset slot must fail here; reading self.__state below would
            # otherwise re-enter __getattr__ for the unset state slot and
            # recurse until RecursionError.
            raise AttributeError(f"'{cls}' object has no attribute '{name}'")
        try:
            return self.__state[name]
        except KeyError:
            raise AttributeError(f"'{cls}' object has no attribute '{name}'") from None

    @property
    def pre_frozen(self) -> bool:
        """True once :meth:`pre_freeze` has been called."""
        return self.__pre_frozen

    def pre_freeze(self) -> None:
        """Freeze the cleanup context and all three signals (idempotent)."""
        if self.__pre_frozen:
            return
        self.__pre_frozen = True
        self.__cleanup_ctx.freeze()
        self.__on_startup.freeze()
        self.__on_shutdown.freeze()
        self.__on_cleanup.freeze()

    @property
    def running(self):
        """True while the worker's tasks are considered running."""
        return bool(self.__running)

    @property
    def frozen(self) -> bool:
        """True once :meth:`freeze` has been called."""
        return self.__frozen

    def freeze(self) -> None:
        """Run :meth:`pre_freeze` and mark the worker as frozen (idempotent)."""
        if self.__frozen:
            return
        self.pre_freeze()
        self.__frozen = True

    def add_process(self, process):
        """Register a coroutine function to be run as a task by :meth:`run`.

        *process* is called immediately with this worker as its only
        argument and the resulting coroutine object is stored.

        :param process: coroutine function taking exactly one positional
            argument (the worker).
        :raises TypeError: if *process* is not a coroutine function or does
            not take exactly one positional argument.
        """
        if not asyncio.iscoroutinefunction(process):
            raise TypeError('The process should be async function or async generator function')
        spec = getfullargspec(process)
        if len(spec.args) != 1:
            raise TypeError(f'{process.__name__} takes {len(spec.args)} positional arguments, '
                            f'but must take one app argument')
        coro = process(self)
        self.__process.append(coro)
        process_names = ",".join([getattr(coro, '__name__', str(coro)) for coro in self.__process])
        # NOTE: this renames the function object shared by the class, so the
        # new name is visible through every instance.
        self.run.__func__.__name__ = f'ApplicationWorker with {process_names}'

    @property
    def on_startup(self) -> "Signal":
        """Signal sent by :meth:`startup`."""
        return self.__on_startup

    @property
    def on_shutdown(self) -> "Signal":
        """Signal sent by :meth:`shutdown`."""
        return self.__on_shutdown

    @property
    def on_cleanup(self) -> "Signal":
        """Signal sent by :meth:`cleanup`."""
        return self.__on_cleanup

    @property
    def cleanup_ctx(self) -> "CleanupContext":
        """Cleanup context attached to the startup and cleanup signals."""
        return self.__cleanup_ctx

    async def startup(self) -> None:
        """Causes on_startup signal

        Should be called in the event loop along with the request handler.
        """
        await self.on_startup.send(self)

    async def shutdown(self) -> None:
        """Causes on_shutdown signal

        Should be called before cleanup()
        """
        await self.on_shutdown.send(self)

    async def cleanup(self) -> None:
        """Causes on_cleanup signal

        Should be called after shutdown()
        """
        if self.on_cleanup.frozen:
            await self.on_cleanup.send(self)
        else:
            # If an exception occurs in startup, ensure cleanup contexts are completed.
            await self.__cleanup_ctx._on_cleanup(self)

    def __repr__(self) -> str:
        return f"<ApplicationWorker 0x{id(self):x}>"

    def stop(self, *args, with_raise=False, **kwargs):
        """Mark the worker as not running and cancel all of its tasks.

        Extra positional and keyword arguments are accepted and ignored.

        :param with_raise: when true, raise ``GracefulExit`` after the tasks
            have been cancelled (used by the installed signal handlers).
        """
        self.__running = False
        for task in self.__tasks:
            task.cancel()
        if with_raise:
            raise GracefulExit(f'Stoping ApplicationWorker for {",".join([coro.__name__ for coro in self.__process])}')

    async def __make_worker(self):
        """Install signal handlers if requested, send startup, then freeze the worker."""
        if self.__handle_signals:
            try:
                self.loop.add_signal_handler(signal.SIGINT, partial(self.stop, with_raise=True))
                self.loop.add_signal_handler(signal.SIGTERM, partial(self.stop, with_raise=True))
            except NotImplementedError:  # pragma: no cover
                # add_signal_handler is not implemented on Windows
                pass
        self.on_startup.freeze()
        await self.startup()
        self.freeze()

    async def __destroy_woker(self):
        """Remove signal handlers, settle the tasks, then send shutdown and cleanup."""
        if self.__handle_signals:
            try:
                self.loop.remove_signal_handler(signal.SIGINT)
                self.loop.remove_signal_handler(signal.SIGTERM)
            except NotImplementedError:  # pragma: no cover
                # remove_signal_handler is not implemented on Windows
                pass
        for task in self.__tasks:
            if task.cancelled():
                continue
            elif task.done():
                # Retrieve the result so a task exception is logged here.
                try:
                    task.result()
                except asyncio.CancelledError:
                    ...
                except Exception as e:
                    LOG.exception(f"Exception in ApplicationWorker task {task.get_name()} - {e}", exc_info=True)
            else:
                task.cancel()
        await self.shutdown()
        await self.cleanup()
        if not self.loop.is_running():
            self.loop.run_until_complete(asyncio.sleep(1))

    @asynccontextmanager
    async def __task_runer(self):
        """Async context manager wrapping the lifetime of the worker's tasks.

        Starts the worker, creates one task per registered coroutine and
        yields an un-awaited ``asyncio.wait`` over them that returns on the
        first task exception.  On exit the worker is always torn down.
        Exit/cancellation exceptions are swallowed; other exceptions are
        re-raised (unexpected ones are logged first).
        """
        await self.__make_worker()
        try:
            self.__running = True
            self.__tasks = [self.loop.create_task(coro, name=f'{coro.__name__}') for coro in self.__process]
            yield asyncio.wait(self.__tasks, return_when=asyncio.FIRST_EXCEPTION)
        except (KeyboardInterrupt, GracefulExit, SystemExit, asyncio.CancelledError):  # pragma: no cover
            ...
        except ApplicationWorkerTasksErrors:
            raise
        except Exception as e:
            LOG.exception(f"Exception in ApplicationWorker - {e}", exc_info=True)
            raise
        finally:
            self.__running = False
            await self.__destroy_woker()

    async def run(self):
        """Run all registered coroutines until they finish or one of them fails.

        Tasks still pending when the wait returns are cancelled.

        :raises ApplicationWorkerTasksErrors: if any finished task raised;
            the argument maps task name to the raised exception.
        """
        async with self.__task_runer() as tasks:
            error_stake = {}
            done, pending = await tasks
            for task in pending:
                task.cancel()
            for task in done:
                if task.cancelled():
                    continue
                try:
                    task.result()
                except asyncio.CancelledError:
                    ...
                except Exception as e:
                    error_stake[task.get_name()] = e
                    LOG.exception(f"Exception in ApplicationWorker task {task.get_name()} - {e}", exc_info=True)
            if error_stake:
                raise ApplicationWorkerTasksErrors(error_stake)

    def __call__(self) -> "Coroutine":
        """gunicorn compatibility"""
        return self.run()