Source code for prozorro_sale.tools.executors

import asyncio
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
from signal import signal, SIGINT, SIGTERM, SIGCHLD, SIGKILL

from aiohttp import web

__all__ = ['setup_threads_executor', 'setup_process_executor']


[docs]class ProcessExit(SystemExit): code = 1
[docs]class ThreatExecutorWrapper: """Initializes a new ThreadPoolExecutor instance. """ __slots__ = ['_ex', '_loop', '_shutdown_wait', '_shutdown_cancel_futures'] executor = ThreadPoolExecutor
[docs] def __init__(self, nthreads=1, shutdown_wait=True, shutdown_cancel_futures=True): self._ex = self.executor( max_workers=nthreads, initializer=self._executor_init, initargs=(os.getpid(),) ) self._loop = asyncio.get_running_loop() self._shutdown_wait = shutdown_wait self._shutdown_cancel_futures = shutdown_cancel_futures
[docs] @staticmethod def _executor_init(ppid): ...
[docs] def __call__(self, func, *args, **kw): return self._loop.run_in_executor(self._ex, partial(func, *args, **kw))
[docs] def shutdown(self): """Tear down an executor and clean-up the resources associated with the it.""" self._ex.shutdown(wait=self._shutdown_wait, cancel_futures=self._shutdown_cancel_futures)
[docs]class ProcessExecutorWrapper(ThreatExecutorWrapper): """Initializes a new ProcessPoolExecutor instance. """ __slots__ = [] executor = ProcessPoolExecutor
[docs] @staticmethod def _executor_init(ppid): pid = os.getpid() def handler_global_exit(sig, frame): raise ProcessExit() def handler(sig, frame): raise web.GracefulExit() signal(SIGINT, handler) signal(SIGTERM, handler) signal(SIGCHLD, handler_global_exit) # TODO: Convert to signal SIGALRM without additional thread def parent_watcher(): """Kills child when a parent dies """ while True: try: os.kill(ppid, 0) except OSError: os.kill(pid, SIGKILL) time.sleep(1) thread = threading.Thread(target=parent_watcher, daemon=True) thread.start()
[docs]async def init_threads_executor(app: web.Application): """Initialize an executor in app.""" app.threat_executor = ThreatExecutorWrapper()
[docs]async def close_threads_executor(app: web.Application): """Closing an executor in app.""" app.threat_executor.shutdown()
[docs]async def init_process_executor(app: web.Application): """Initialize an executor in app.""" app.process_executor = ProcessExecutorWrapper()
[docs]async def close_process_executor(app: web.Application): """Closing an executor in app.""" app.process_executor.shutdown()
[docs]def setup_threads_executor(app: web.Application) -> None: """ Append ThreatExecutor wrapper to app. Args: app (object): aiohttp.web.Application instance. """ app.on_startup.append(init_threads_executor) app.on_shutdown.append(close_threads_executor)
[docs]def setup_process_executor(app: web.Application) -> None: """ Append ProcessExecutor wrapper to app. Args: app (object): aiohttp.web.Application instance. """ app.on_startup.append(init_process_executor) app.on_shutdown.append(close_process_executor)