import asyncio
import signal
from contextlib import asynccontextmanager
from functools import partial
from inspect import getfullargspec
from typing import Any, Dict, Iterator, Coroutine
# uvloop is an optional accelerator: use its event-loop policy when the
# package is installed, otherwise fall back to the stock asyncio policy.
try:
    from uvloop import EventLoopPolicy
except ImportError:
    EventLoopPolicy = asyncio.DefaultEventLoopPolicy
from aiohttp.web_app import CleanupContext
from aiohttp.web_runner import GracefulExit
from aiosignal import Signal
from prozorro_sale.tools.logger import get_custom_logger
# Names exported by ``from <module> import *``.
__all__ = ("ApplicationWorker",)
# Module-level logger, created by get_custom_logger for this module's name.
LOG = get_custom_logger(__name__)
class ApplicationWorkerTasksErrors(Exception):
    """Raised by ``ApplicationWorker.run`` when one or more tasks failed.

    ``run`` passes a single positional argument: a dict mapping each failed
    task's name to the exception that task raised. It is available to
    handlers as ``exc.args[0]``.
    """
class ApplicationWorker:
    """Run registered coroutines as asyncio tasks, with lifecycle signals.

    Coroutine functions registered through :meth:`add_process` are started
    as tasks by :meth:`run`. ``on_startup`` is sent before the tasks are
    created; ``on_shutdown`` and ``on_cleanup`` are sent when ``run``
    finishes, whether normally, by error, or by cancellation.

    The instance also behaves like a small mapping: item access
    (``app[key]``) and attribute access (``app.key``) for names that are not
    private slots both read and write the same internal state dict.
    """

    __slots__ = ('__handle_signals', '__state', '__frozen', '__pre_frozen', '__on_startup', '__on_shutdown',
                 '__on_cleanup', '__cleanup_ctx', '__process', '__running', '__tasks', '__loop')

    def __init__(self, handle_signals: bool = False, loop: asyncio.AbstractEventLoop = None, ) -> None:
        """Create a worker.

        :param handle_signals: when true, ``run`` installs SIGINT/SIGTERM
            handlers on the loop that call :meth:`stop` with ``with_raise=True``.
        :param loop: event loop to use; when omitted it is resolved lazily
            by the :attr:`loop` property.
        """
        self.__loop = loop
        self.__handle_signals = handle_signals
        self.__state: Dict[str, Any] = {}
        self.__frozen = False
        self.__pre_frozen = False
        self.__running = None
        self.__process: list = []
        self.__tasks: list = []
        self.__on_startup: Signal = Signal(self)
        self.__on_shutdown: Signal = Signal(self)
        self.__on_cleanup: Signal = Signal(self)
        self.__cleanup_ctx = CleanupContext()
        self.__on_startup.append(self.__cleanup_ctx._on_startup)
        self.__on_cleanup.append(self.__cleanup_ctx._on_cleanup)

    @staticmethod
    def __is_private_slot(name: str) -> bool:
        """Return True if *name* is the name-mangled form of one of our slots.

        Private names listed in ``__slots__`` are stored by Python under
        ``_ApplicationWorker<name>``. The prefix is taken from the defining
        class rather than ``type(self)`` so that subclasses keep working.
        """
        prefix = f'_{ApplicationWorker.__name__}'
        return name.startswith(prefix) and name[len(prefix):] in ApplicationWorker.__slots__

    @property
    def loop(self):
        """Event loop used by the worker; resolved on first access if not given."""
        if self.__loop is None:
            self.__loop = asyncio.get_event_loop()
        return self.__loop

    def __eq__(self, other: object) -> bool:
        return self is other

    def __hash__(self) -> int:
        # Defining __eq__ alone would make instances unhashable; equality is
        # identity here, so the default identity hash stays consistent.
        return object.__hash__(self)

    def __getitem__(self, key: str) -> Any:
        return self.__state[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self.__state[key] = value

    def __delitem__(self, key: str) -> None:
        del self.__state[key]

    def __len__(self) -> int:
        return len(self.__state)

    def __iter__(self) -> Iterator[str]:
        return iter(self.__state)

    def __setattr__(self, key: str, value: Any) -> None:
        """Store private slots on the instance and everything else in the state dict."""
        if self.__is_private_slot(key):
            super().__setattr__(key, value)
        else:
            self.__state[key] = value

    def __getattr__(self, name):
        """Fall back to the state dict for attributes not found normally.

        :raises AttributeError: if *name* is neither a set slot nor a state key.
        """
        # Reaching here with a slot name means the slot is unset. Reading
        # self.__state in that situation would re-enter __getattr__ without
        # end, so slot names are never looked up in the state dict.
        if not self.__is_private_slot(name):
            try:
                return self.__state[name]
            except KeyError:
                pass
        cls = self.__class__.__name__
        raise AttributeError(f"'{cls}' object has no attribute '{name}'")

    @property
    def pre_frozen(self) -> bool:
        return self.__pre_frozen

    def pre_freeze(self) -> None:
        """Freeze the cleanup context and all three signals (idempotent)."""
        if self.__pre_frozen:
            return
        self.__pre_frozen = True
        self.__cleanup_ctx.freeze()
        self.__on_startup.freeze()
        self.__on_shutdown.freeze()
        self.__on_cleanup.freeze()

    @property
    def running(self):
        """True between task creation in ``run`` and stop/teardown."""
        return bool(self.__running)

    @property
    def frozen(self) -> bool:
        return self.__frozen

    def freeze(self) -> None:
        """Run :meth:`pre_freeze` and mark the worker as frozen (idempotent)."""
        if self.__frozen:
            return
        self.pre_freeze()
        self.__frozen = True

    def add_process(self, process):
        """Register a coroutine function to be run as a task by :meth:`run`.

        *process* is called immediately with this worker as its only
        argument, and the resulting coroutine object is stored.

        :param process: coroutine function taking exactly one positional argument.
        :raises TypeError: if *process* is not a coroutine function or does
            not declare exactly one positional argument.
        """
        # NOTE(review): the message mentions async generator functions, but
        # iscoroutinefunction() is the only check performed here.
        if not asyncio.iscoroutinefunction(process):
            raise TypeError('The process should be async function or async generator function')
        spec = getfullargspec(process)
        if len(spec.args) != 1:
            raise TypeError(f'{process.__name__} takes {len(spec.args)} positional arguments, '
                            f'but must take one app argument')
        coro = process(self)
        self.__process.append(coro)
        process_names = ",".join([getattr(coro, '__name__', str(coro)) for coro in self.__process])
        # This renames the function object behind ``run``, which lives on the
        # class, so the new name is visible through every instance.
        self.run.__func__.__name__ = f'ApplicationWorker with {process_names}'

    @property
    def on_startup(self) -> Signal:
        return self.__on_startup

    @property
    def on_shutdown(self) -> Signal:
        return self.__on_shutdown

    @property
    def on_cleanup(self) -> Signal:
        return self.__on_cleanup

    @property
    def cleanup_ctx(self) -> "CleanupContext":
        return self.__cleanup_ctx

    async def startup(self) -> None:
        """Causes on_startup signal

        Should be called in the event loop along with the request handler.
        """
        await self.on_startup.send(self)

    async def shutdown(self) -> None:
        """Causes on_shutdown signal

        Should be called before cleanup()
        """
        await self.on_shutdown.send(self)

    async def cleanup(self) -> None:
        """Causes on_cleanup signal

        Should be called after shutdown()
        """
        if self.on_cleanup.frozen:
            await self.on_cleanup.send(self)
        else:
            # If an exception occurs in startup, ensure cleanup contexts are completed.
            await self.__cleanup_ctx._on_cleanup(self)

    def __repr__(self) -> str:
        return f"<ApplicationWorker 0x{id(self):x}>"

    def stop(self, *args, with_raise=False, **kwargs):
        """Mark the worker as not running and cancel every task.

        Extra positional and keyword arguments are accepted and ignored.

        :param with_raise: when true, raise ``GracefulExit`` after cancelling.
        :raises GracefulExit: only when *with_raise* is true.
        """
        self.__running = False
        for task in self.__tasks:
            task.cancel()
        if with_raise:
            raise GracefulExit(f'Stoping ApplicationWorker for {",".join([coro.__name__ for coro in self.__process])}')

    async def __make_worker(self):
        """Install signal handlers (if enabled), send on_startup, then freeze."""
        if self.__handle_signals:
            try:
                self.loop.add_signal_handler(signal.SIGINT, partial(self.stop, with_raise=True))
                self.loop.add_signal_handler(signal.SIGTERM, partial(self.stop, with_raise=True))
            except NotImplementedError:  # pragma: no cover
                # add_signal_handler is not implemented on Windows
                pass
        self.on_startup.freeze()
        await self.startup()
        self.freeze()

    async def __destroy_woker(self):
        """Tear down: drop signal handlers, finish tasks, send shutdown and cleanup.

        Tasks that are still running are cancelled and awaited, so their
        unwinding completes before the shutdown and cleanup handlers run.
        Exceptions stored in finished tasks are logged, not raised.
        """
        if self.__handle_signals:
            try:
                self.loop.remove_signal_handler(signal.SIGINT)
                self.loop.remove_signal_handler(signal.SIGTERM)
            except NotImplementedError:  # pragma: no cover
                # remove_signal_handler is not implemented on Windows
                pass

        unfinished = [task for task in self.__tasks if not task.done()]
        for task in unfinished:
            task.cancel()
        if unfinished:
            # This coroutine already runs inside an event loop, so the loop
            # cannot be driven with run_until_complete() from here; awaiting
            # the tasks is how their cancellation gets processed.
            # return_exceptions=True keeps one failing task from aborting
            # the teardown; the outcomes are inspected below.
            await asyncio.gather(*unfinished, return_exceptions=True)

        for task in self.__tasks:
            if task.cancelled():
                continue
            try:
                task.result()
            except asyncio.CancelledError:
                ...
            except Exception as e:
                LOG.exception(f"Exception in ApplicationWorker task {task.get_name()} - {e}", exc_info=True)

        await self.shutdown()
        await self.cleanup()

    @asynccontextmanager
    async def __task_runer(self):
        """Start the worker, yield an awaitable over its tasks, always tear down.

        The yielded value is an un-awaited ``asyncio.wait`` over all tasks
        that returns on the first task exception. Exit and cancellation
        exceptions raised in the ``async with`` body are swallowed;
        ``ApplicationWorkerTasksErrors`` is re-raised as is; any other
        exception is logged and re-raised.
        """
        await self.__make_worker()
        try:
            self.__running = True
            self.__tasks = [self.loop.create_task(coro, name=f'{coro.__name__}') for coro in self.__process]
            yield asyncio.wait(self.__tasks, return_when=asyncio.FIRST_EXCEPTION)
        except (KeyboardInterrupt, GracefulExit, SystemExit, asyncio.CancelledError):  # pragma: no cover
            ...
        except ApplicationWorkerTasksErrors:
            raise
        except Exception as e:
            LOG.exception(f"Exception in ApplicationWorker - {e}", exc_info=True)
            raise
        finally:
            self.__running = False
            await self.__destroy_woker()

    async def run(self):
        """Run all registered processes until they finish or one of them fails.

        :raises ApplicationWorkerTasksErrors: if any finished task raised;
            the argument maps task name to the raised exception.
        """
        async with self.__task_runer() as tasks:
            error_stake = {}
            done, pending = await tasks
            for task in pending:
                task.cancel()
            for task in done:
                if task.cancelled():
                    continue
                try:
                    task.result()
                except asyncio.CancelledError:
                    ...
                except Exception as e:
                    error_stake[task.get_name()] = e
                    LOG.exception(f"Exception in ApplicationWorker task {task.get_name()} - {e}", exc_info=True)
            if error_stake:
                raise ApplicationWorkerTasksErrors(error_stake)

    def __call__(self) -> "Coroutine":
        """gunicorn compatibility"""
        return self.run()