import asyncio
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
from signal import signal, SIGINT, SIGTERM, SIGCHLD, SIGKILL
from aiohttp import web
# Public API of this module: only the two ``setup_*`` helpers are exported
# by a star-import; the wrapper classes and signal handlers stay internal.
__all__ = ['setup_threads_executor', 'setup_process_executor']
class ProcessExit(SystemExit):
    """Exit request raised inside a pool worker process.

    The ``SIGCHLD`` handler installed by
    ``ProcessExecutorWrapper._executor_init`` raises this exception.
    Because it derives from :class:`SystemExit`, letting it propagate
    terminates the interpreter instead of being reported as an error.
    """

    # Class-level exit status: every instance reports ``code == 1``,
    # regardless of the arguments passed to the constructor.
    code = 1
class ThreatExecutorWrapper:
    """Callable wrapper that runs functions on a ``ThreadPoolExecutor``.

    An instance owns one executor (of the class given by ``executor``) and
    remembers the event loop that was running when it was created. Calling
    the instance submits ``func(*args, **kw)`` to the pool through that loop
    and returns the awaitable future produced by ``loop.run_in_executor``.

    NOTE(review): "Threat" looks like a misspelling of "Thread"; the name is
    kept unchanged because other code refers to it.

    Args:
        nthreads: Maximum number of pool workers (``max_workers``).
        shutdown_wait: Passed as ``wait`` to the executor's ``shutdown``.
        shutdown_cancel_futures: Passed as ``cancel_futures`` to the
            executor's ``shutdown``.

    Raises:
        RuntimeError: If instantiated while no event loop is running.
    """

    __slots__ = ['_ex', '_loop', '_shutdown_wait', '_shutdown_cancel_futures']

    # Executor class to instantiate; subclasses override this attribute.
    executor = ThreadPoolExecutor

    def __init__(self, nthreads=1, shutdown_wait=True, shutdown_cancel_futures=True):
        # Resolve the loop first: if no loop is running this raises
        # RuntimeError before an executor has been created, so a failed
        # construction leaves no pool behind.
        self._loop = asyncio.get_running_loop()
        self._ex = self.executor(
            max_workers=nthreads,
            initializer=self._executor_init,
            # The creating process' pid is handed to every worker initializer.
            initargs=(os.getpid(),),
        )
        self._shutdown_wait = shutdown_wait
        self._shutdown_cancel_futures = shutdown_cancel_futures

    @staticmethod
    def _executor_init(ppid):
        """Worker initializer hook; does nothing for thread workers.

        Args:
            ppid: Pid of the process that created the executor.
        """

    def __call__(self, func, *args, **kw):
        """Schedule ``func(*args, **kw)`` on the pool.

        Args:
            func: Callable to execute in a pool worker.
            *args: Positional arguments bound to ``func``.
            **kw: Keyword arguments bound to ``func``.

        Returns:
            The awaitable returned by ``loop.run_in_executor``.
        """
        # run_in_executor forwards positional arguments only, so the call is
        # pre-bound with ``partial`` to carry keyword arguments as well.
        return self._loop.run_in_executor(self._ex, partial(func, *args, **kw))

    def shutdown(self):
        """Tear down the executor and clean up the resources associated with it."""
        self._ex.shutdown(
            wait=self._shutdown_wait,
            cancel_futures=self._shutdown_cancel_futures,
        )
class ProcessExecutorWrapper(ThreatExecutorWrapper):
    """Variant of the executor wrapper backed by a ``ProcessPoolExecutor``.

    Only the executor class and the worker initializer differ from the
    base class; construction, calling and shutdown are inherited.
    """

    __slots__ = []

    # Use a process pool instead of the base class' thread pool.
    executor = ProcessPoolExecutor

    @staticmethod
    def _executor_init(ppid):
        """Prepare a freshly started worker process.

        Installs signal handlers in the worker and starts a daemon thread
        that kills the worker once the creating process can no longer be
        signalled.

        Args:
            ppid: Pid of the process that created the executor.
        """
        pid = os.getpid()

        def handler_global_exit(sig, frame):
            # SIGCHLD in the worker: leave via a SystemExit subclass.
            raise ProcessExit()

        def handler(sig, frame):
            # SIGINT / SIGTERM in the worker: raise aiohttp's GracefulExit.
            raise web.GracefulExit()

        signal(SIGINT, handler)
        signal(SIGTERM, handler)
        signal(SIGCHLD, handler_global_exit)

        # TODO: Convert to signal SIGALRM without additional thread
        def parent_watcher():
            """Kill this worker when the creating process dies."""
            while True:
                try:
                    # Signal 0 delivers nothing; it only checks whether
                    # ``ppid`` can still be signalled.
                    os.kill(ppid, 0)
                except OSError:
                    # The probe failed: SIGKILL ourselves so the worker
                    # does not outlive its creator.
                    os.kill(pid, SIGKILL)
                time.sleep(1)

        # Daemon thread, so the watcher never keeps the worker alive itself.
        thread = threading.Thread(target=parent_watcher, daemon=True)
        thread.start()
async def init_threads_executor(app: web.Application):
    """Create the thread-pool wrapper and attach it to the application.

    ``setup_threads_executor`` registers this coroutine as an ``on_startup``
    handler. The wrapper is built with its default arguments and stored as
    ``app.threat_executor``.

    Args:
        app: aiohttp.web.Application instance that receives the wrapper.
    """
    # Built inside a coroutine because the wrapper's constructor looks up
    # the running event loop.
    app.threat_executor = ThreatExecutorWrapper()
async def close_threads_executor(app: web.Application):
    """Shut down the thread-pool wrapper stored on the application.

    ``setup_threads_executor`` registers this coroutine as an ``on_shutdown``
    handler. It expects ``app.threat_executor`` to have been set by
    ``init_threads_executor``.

    Args:
        app: aiohttp.web.Application instance holding the wrapper.
    """
    app.threat_executor.shutdown()
async def init_process_executor(app: web.Application):
    """Create the process-pool wrapper and attach it to the application.

    ``setup_process_executor`` registers this coroutine as an ``on_startup``
    handler. The wrapper is built with its default arguments and stored as
    ``app.process_executor``.

    Args:
        app: aiohttp.web.Application instance that receives the wrapper.
    """
    # Built inside a coroutine because the wrapper's constructor looks up
    # the running event loop.
    app.process_executor = ProcessExecutorWrapper()
async def close_process_executor(app: web.Application):
    """Shut down the process-pool wrapper stored on the application.

    ``setup_process_executor`` registers this coroutine as an ``on_shutdown``
    handler. It expects ``app.process_executor`` to have been set by
    ``init_process_executor``.

    Args:
        app: aiohttp.web.Application instance holding the wrapper.
    """
    app.process_executor.shutdown()
def setup_threads_executor(app: web.Application) -> None:
    """Register the thread-pool executor lifecycle handlers on the app.

    Appends ``init_threads_executor`` to ``app.on_startup`` and
    ``close_threads_executor`` to ``app.on_shutdown``. Nothing is created
    here; the wrapper itself is built when the startup handler runs.

    Args:
        app (object): aiohttp.web.Application instance.
    """
    app.on_startup.append(init_threads_executor)
    app.on_shutdown.append(close_threads_executor)
def setup_process_executor(app: web.Application) -> None:
    """Register the process-pool executor lifecycle handlers on the app.

    Appends ``init_process_executor`` to ``app.on_startup`` and
    ``close_process_executor`` to ``app.on_shutdown``. Nothing is created
    here; the wrapper itself is built when the startup handler runs.

    Args:
        app (object): aiohttp.web.Application instance.
    """
    app.on_startup.append(init_process_executor)
    app.on_shutdown.append(close_process_executor)