import asyncio
import signal
import sys
import traceback
from typing import (
    Awaitable,
    Callable,
    Optional,
    Type,
    Union,
)

try:
    from uvloop import EventLoopPolicy
except ImportError:
    EventLoopPolicy = asyncio.DefaultEventLoopPolicy

from aiohttp import web

from prozorro_sale.tools.logger import get_custom_logger

# Module-level logger used by the wrapper classes defined in this file.
LOG = get_custom_logger(__name__)
class CoroutineWrapper:
    """Wrap a coroutine in an ``asyncio.Task`` and manage its shutdown.

    The task is created on the given loop as soon as the wrapper is
    constructed. ``shutdown()`` calls the optional stop callback, schedules
    ``_cleanup()`` and, once the graceful exit delay has elapsed, either
    cancels the task (if it is still running) or logs the exception it
    finished with.
    """

    __slots__ = ('loop', 'task', '_stop_callback', '_shutdown', '_graceful_exit_time')

    def __init__(self, coroutine, loop: asyncio.AbstractEventLoop, stop_callback=None, graceful_exit_time=False):
        """
        Args:
            coroutine: coroutine object to run; its ``__name__`` becomes the
                task name.
            loop (asyncio.AbstractEventLoop): event loop the task is created on.
            stop_callback: optional callable invoked without arguments by
                ``shutdown()``.
            graceful_exit_time: delay, in seconds, between ``shutdown()`` and
                the cancellation of the task. It is converted with ``int()``,
                so ``False`` means 0 and ``True`` means 1. When a
                ``stop_callback`` is given and this value is falsy it is
                replaced by ``True``.

        Attributes:
            task (asyncio.Task): task wrapping ``coroutine``
            loop (asyncio.AbstractEventLoop): event loop that owns the task
        """
        self.loop = loop
        self.task = self.loop.create_task(coroutine, name=f'{coroutine.__name__}')
        self._stop_callback = stop_callback
        self._graceful_exit_time = graceful_exit_time
        self._shutdown = False
        # A stop callback needs some time to take effect before the task is
        # cancelled, so grant at least the minimal delay.
        if self._stop_callback is not None and not self._graceful_exit_time:
            self._graceful_exit_time = True

    def initialize(self):
        """Return the task created in ``__init__``.

        Returns:
            asyncio.Task: the task wrapping the coroutine.
        """
        LOG.info(f'Initialize {self}')
        return self.task

    def _cancel(self):
        """Cancel the task if it is still running, otherwise log its failure.

        A task that has already been cancelled is ignored. A task that has
        finished with an exception gets its traceback logged as an error.
        """
        if not self.task.done():
            LOG.info(f'Cancel {self}')
            self.task.cancel()
            return
        if self.task.cancelled():
            # Task.exception() would raise CancelledError here.
            return
        if exception := self.task.exception():
            # Positional arguments: the first parameter of format_exception()
            # is positional-only since Python 3.10, so ``etype=`` raises
            # TypeError there.
            trace = "".join(traceback.format_exception(
                type(exception), exception, exception.__traceback__
            ))
            LOG.error(f'Exception in task {self.task.get_name()}\n'
                      f'{trace}')

    async def _cleanup(self):
        """Cleanup hook scheduled by ``shutdown()``; does nothing by default."""

    def shutdown(self):
        """Start shutting the task down; repeated calls are ignored.

        Calls the stop callback (if any), schedules ``_cleanup()`` on the
        loop and schedules ``_cancel()`` after the graceful exit delay.
        """
        if self._shutdown:
            return
        self._shutdown = True
        if self._stop_callback is not None:
            self._stop_callback()
        self.loop.create_task(self._cleanup())
        self.loop.call_later(int(self._graceful_exit_time), self._cancel)
        LOG.info(f'Shutting down {self}')

    def __repr__(self):
        return f'{self.__class__.__name__} for {self.task.get_name()}'

    def __str__(self):
        return self.__repr__()
class WorkerWrapper(CoroutineWrapper):
    """Coroutine wrapper for worker applications.

    Adds nothing to ``CoroutineWrapper``; ``ApplicationWrapper.add_worker_app``
    uses this type for the entries it registers.
    """
class AioHttpWrapper(CoroutineWrapper):
    """Run an aiohttp application as a task bound to a single HOST/PORT.

    The wrapper performs the usual runner/site start-up sequence itself.

    Example:
        >>> runner = web.AppRunner(app)
        ... await runner.setup()
        ... site = web.TCPSite(runner, 'localhost', 8080)
        ... await site.start()
    """

    __slots__ = ('aiohttp_wrapper_args', 'runner')

    def __init__(self,
                 aiohttp_wrapper_args: dict,
                 loop: asyncio.AbstractEventLoop,
                 stop_callback=None,
                 graceful_exit_time=False
                 ):
        """
        Build the ``web.AppRunner`` and register the serving coroutine as a task.

        Args:
            aiohttp_wrapper_args (dict): server settings; the keys read here are
                ``app``, ``access_log_class``, ``keepalive_timeout``, ``host``
                and ``port``.
            loop (asyncio.AbstractEventLoop): current Event loop
            stop_callback: forwarded to ``CoroutineWrapper``
            graceful_exit_time: forwarded to ``CoroutineWrapper``

        Attributes:
            loop (asyncio.AbstractEventLoop): current Event loop
            runner (web.AppRunner): aiohttp web app runner
            _shutdown (bool): application stop flag
        """
        self.aiohttp_wrapper_args = aiohttp_wrapper_args
        self.loop = loop
        self._shutdown = False

        options = self.aiohttp_wrapper_args
        self.runner = web.AppRunner(
            app=options["app"],
            access_log_class=options["access_log_class"],
            access_log_format=web.AccessLogger.LOG_FORMAT,
            access_log=web.access_logger,
            keepalive_timeout=options["keepalive_timeout"],
        )

        # The coroutine name ends up as the task name, so make it descriptive.
        serve = self.__sleep_stream()
        serve.__name__ = f'Application Server {options["host"]}:{options["port"]}'
        super().__init__(serve, loop, stop_callback, graceful_exit_time)

    async def __sleep_stream(self):
        """Set up the runner, start the TCP site and idle until shutdown."""
        await self.runner.setup()

        options = self.aiohttp_wrapper_args
        site = web.TCPSite(
            runner=self.runner,
            host=options["host"],
            port=options["port"],
            shutdown_timeout=options["shutdown_timeout"],
            ssl_context=options["ssl_context"],
            backlog=options["backlog"],
            reuse_address=options["reuse_address"],
            reuse_port=options["reuse_port"],
        )
        await site.start()

        # Keep the task alive; the stop flag is re-checked after every sleep.
        while True:
            if self._shutdown:
                break
            await asyncio.sleep(3600)

    async def _cleanup(self):
        """Release the resources held by the ``web.AppRunner``."""
        await self.runner.cleanup()
class ApplicationWrapper:
    """Application Wrapper allows to create and configure app with preset configuration.

    Registered web applications, workers and coroutines run as tasks on one
    event loop. Calling the instance starts them, waits for the
    ``return_when`` condition or a stop signal, then shuts everything down
    and closes the loop.

    Custom methods:
        - add_web_app (add custom aiohttp web application)
        - add_coroutine (add task to asyncio.loop)
        - add_worker_app (call ``app()`` and add the resulting coroutine)
    """

    #: str: condition to exit after the first error occurs
    FIRST_EXCEPTION = asyncio.FIRST_EXCEPTION
    #: str: condition to exit after the first coroutines complete
    FIRST_COMPLETED = asyncio.FIRST_COMPLETED
    #: str: condition to exit after all coroutines complete
    ALL_COMPLETED = asyncio.ALL_COMPLETED
    #: float: upper bound, in seconds, on waiting for cancelled tasks at exit
    _CANCEL_TIMEOUT = 10.0

    __slots__ = ('loop', 'apps', 'return_when', 'runner', 'signals')

    def __init__(self,
                 loop: Optional[asyncio.AbstractEventLoop] = None,
                 return_when: Optional[str] = FIRST_EXCEPTION,
                 signals: Optional[Union[list, set, tuple, frozenset]] = None,
                 debug: Optional[bool] = None
                 ):
        """
        Application wrapper

        Args:
            loop (:obj:`asyncio.AbstractEventLoop`, optional): Event loop. When
                omitted, the module's ``EventLoopPolicy`` is installed and a
                new loop is created and set as the current one.
            return_when (:obj:`str`, optional): wrapper stop condition, default FIRST_EXCEPTION
            signals (:obj:`list`, optional): UNIX signals to stop. Defaults to
                SIGTERM, SIGHUP and SIGINT, or to no signals on win32.
            debug (:obj:`bool`, optional): run loop on debug mode; the loop
                setting is left untouched when ``None``

        Attributes:
            runner (asyncio.Task): current wrapper task
            loop (asyncio.AbstractEventLoop): current Event loop
            return_when (str): exit condition
            apps (list): list of registered applications
            signals: signals that trigger a graceful exit

        Returns:
            None
        """
        self.runner = None
        self.loop = loop
        self.return_when = return_when
        self.apps = []
        if self.loop is None:
            asyncio.set_event_loop_policy(EventLoopPolicy())
            # asyncio.get_event_loop() without a current loop is deprecated and
            # raises RuntimeError on recent Python versions; create the loop
            # explicitly instead.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        if debug is not None:
            self.loop.set_debug(debug)
        if sys.platform == 'win32':
            self.signals = []
        else:
            self.signals = {signal.SIGTERM, signal.SIGHUP, signal.SIGINT}
        if isinstance(signals, (list, set, tuple, frozenset)):
            self.signals = signals

    def add_web_app(self, app: Union[web.Application, Awaitable[web.Application]],
                    *,
                    host: Optional[Union[str, web.HostSequence]] = '0.0.0.0',
                    port: Optional[int] = 80,
                    shutdown_timeout: Optional[float] = 60.0,
                    keepalive_timeout: Optional[float] = 75.0,
                    ssl_context: Optional[web.SSLContext] = None,
                    backlog: Optional[int] = 128,
                    access_log_class: Type[web.AbstractAccessLogger] = web.AccessLogger,
                    reuse_address: Optional[bool] = True,
                    reuse_port: Optional[bool] = True,
                    stop_callback: Optional[Callable[[], object]] = None,
                    graceful_exit_time: Union[bool, int, float] = False
                    ) -> None:
        """
        Method for add custom aiohttp web application.

        Falsy values of ``host``, ``port``, ``shutdown_timeout``,
        ``keepalive_timeout`` and ``backlog`` are replaced by their defaults.

        Args:
            app (:obj:`web.Application`): aiohttp Application
            host (:obj:`str`, optional): listen ip address, default '0.0.0.0'
            port (:obj:`int`, optional): listen tcp port, default 80
            shutdown_timeout (:obj:`float`, optional): passed to ``web.TCPSite``
            keepalive_timeout (:obj:`float`, optional): passed to ``web.AppRunner``
            ssl_context (:obj:`web.SSLContext`, optional): passed to ``web.TCPSite``
            backlog (:obj:`int`, optional): passed to ``web.TCPSite``
            access_log_class (:obj:`web.AbstractAccessLogger`, optional):
                passed to ``web.AppRunner``
            reuse_address (:obj:`bool`, optional): passed to ``web.TCPSite``
            reuse_port (:obj:`bool`, optional): passed to ``web.TCPSite``
            stop_callback (callable, optional): called without arguments when
                the application is shut down
            graceful_exit_time (optional): delay in seconds between shutdown
                and cancellation of the server task
        """
        aiohttp_wrapper_args = {
            "app": app,
            "host": host or '0.0.0.0',
            "port": port or 80,
            "shutdown_timeout": shutdown_timeout or 60.0,
            "keepalive_timeout": keepalive_timeout or 75.0,
            "ssl_context": ssl_context,
            "backlog": backlog or 128,
            "access_log_class": access_log_class,
            "reuse_address": reuse_address,
            "reuse_port": reuse_port
        }
        self.apps.append(
            AioHttpWrapper(aiohttp_wrapper_args, self.loop, stop_callback, graceful_exit_time)
        )

    def add_coroutine(self, coro, stop_callback=None, graceful_exit_time=False):
        """
        Method for add task to asyncio.loop

        Args:
            coro: coroutine object to run as a task
            stop_callback: optional callable invoked without arguments on shutdown
            graceful_exit_time: delay in seconds between shutdown and
                cancellation of the task
        """
        self.apps.append(
            CoroutineWrapper(coro, self.loop, stop_callback, graceful_exit_time)
        )

    def add_worker_app(self, app, stop_callback=None, graceful_exit_time=False):
        """
        Method for add a worker application.

        Args:
            app: callable whose call result is the coroutine to run
            stop_callback: optional callable invoked on shutdown; falls back
                to ``app.stop`` when that attribute exists
            graceful_exit_time: delay in seconds between shutdown and
                cancellation of the task
        """
        stop_callback = stop_callback or getattr(app, 'stop', None)
        self.apps.append(
            WorkerWrapper(app(), self.loop, stop_callback, graceful_exit_time)
        )

    def __handle_signal(self, _signal_name, callback):
        """
        Set callback as the handler for the _signal_name signal.

        The callback will be invoked by loop, along with other queued callbacks
        and runnable coroutines of that event loop.

        Returns:
            None
        """
        try:
            self.loop.add_signal_handler(_signal_name, callback)
            # NOTE(review): signal.signal() registers the callback a second
            # time, as a plain handler called with (signum, frame) - which is
            # presumably why _graceful_exit accepts *args. Confirm the double
            # registration is intended.
            signal.signal(_signal_name, callback)
        except NotImplementedError:
            # add_signal_handler is not implemented on Windows
            pass

    def __shutdown_app(self):
        """Method for shutdown all added apps.
        """
        for app in self.apps:
            app.shutdown()

    def __shutdown_runner(self):
        """Cancel the wrapper task and every remaining task of the loop.

        Does nothing besides logging when the loop is already closed.
        """
        LOG.info('Cancel runner')
        if not self.loop.is_closed():
            if self.runner and not self.runner.done():
                self.runner.cancel()
            self.__cancel_tasks()

    def __cancel_tasks(self) -> None:
        """Cancel all active tasks and let them process the cancellation.

        When the loop is not running, the loop is driven until the cancelled
        tasks have finished (at most ``_CANCEL_TIMEOUT`` seconds), so they are
        not still pending when the loop gets closed.
        """
        LOG.info('Cancel all_tasks')
        to_cancel = asyncio.all_tasks(self.loop)
        if not to_cancel:
            return
        for task in to_cancel:
            if not task.done():
                task.cancel()
        waiter = self.__wait(
            to_cancel, self.ALL_COMPLETED, canceled_pending=True, timeout=self._CANCEL_TIMEOUT
        )
        if self.loop.is_running():
            # Cannot block a running loop; just schedule the waiter.
            self.loop.create_task(waiter)
        else:
            self.loop.run_until_complete(waiter)

    def _graceful_exit(self, *args, **kwargs) -> None:
        """Shut down all added apps; used as the signal callback.

        When the loop is neither running nor closed, it is run for one more
        second so the scheduled shutdown callbacks get a chance to execute.
        Positional and keyword arguments are accepted and ignored.
        """
        self.__shutdown_app()
        # A late signal may arrive after the loop was closed; running a closed
        # loop would raise RuntimeError.
        if not self.loop.is_running() and not self.loop.is_closed():
            self.loop.run_until_complete(asyncio.sleep(1))

    def __close_loop(self):
        """Shut down async generators and the default executor, then close the loop.
        """
        self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        self.loop.run_until_complete(self.loop.shutdown_default_executor())
        self.loop.stop()
        self.loop.close()
        asyncio.set_event_loop(None)

    async def __wait(self, tasks, return_when=None, canceled_pending=False, timeout=None):
        """
        Wrapper for asyncio.wait.

        After the wait returns, every registered app is shut down and the
        results of the finished tasks are retrieved; exceptions raised by
        them are swallowed here.

        Args:
            tasks (list): coroutine tasks
            return_when : asyncio.FIRST_EXCEPTION, asyncio.FIRST_COMPLETED, asyncio.ALL_COMPLETED;
                defaults to ``self.return_when``
            canceled_pending (bool): If True cancel pending tasks
            timeout (float, optional): maximum number of seconds to wait;
                ``None`` waits without a limit
        """
        if return_when is None:
            return_when = self.return_when
        done, pending = await asyncio.wait(tasks, timeout=timeout, return_when=return_when)
        for app in self.apps:
            app.shutdown()
        for task in done:
            if task.cancelled():
                continue
            try:
                # Retrieve the result so the exception counts as handled.
                task.result()
            except Exception:
                ...
        if canceled_pending:
            for task in pending:
                task.cancel()

    def __call__(self):
        """Starts configurated ApplicationWrapper

        Registers the signal handlers, runs all registered apps until the
        stop condition is met, then shuts the apps down, cancels the
        remaining tasks and closes the loop.
        """
        for sign in self.signals:
            self.__handle_signal(sign, self._graceful_exit)
        try:
            LOG.info('Start serving')
            self.runner = self.loop.create_task(self.__wait([app.initialize() for app in self.apps]),
                                                name='ApplicationWrapper run_all')
            self.loop.run_until_complete(self.runner)
        except (KeyboardInterrupt, web.GracefulExit, SystemExit):
            self.__shutdown_app()
        except Exception:
            # Unexpected failure: keep shutting down, but leave a trace of it.
            LOG.error(f'Unexpected error while serving\n{traceback.format_exc()}')
            self.__shutdown_app()
        finally:
            self._graceful_exit()
            self.__shutdown_runner()
            self.__close_loop()
            LOG.info('Stop serving')