Source code for prozorro_sale.tools.file_watcher

import asyncio
import time
from functools import partial
from os import stat, kill
from pathlib import Path
from signal import SIGCHLD
from typing import Union, TYPE_CHECKING

from aiohttp import web

from prozorro_sale.tools.executors import ProcessExecutorWrapper, ProcessExit
from prozorro_sale.tools.logger import get_custom_logger

if TYPE_CHECKING:
    # type hinting without cyclic imports
    from prozorro_sale.tools.application_worker import ApplicationWorker


__all__ = ('setup_file_watcher', 'files_watcher', 'FilesWatcher', 'FileObserver')
LOG = get_custom_logger(__name__)


[docs]class FileObserver: __slots__ = ('__executor', '__watcher', '__sync_manager', 'files', 'callback', '_stop_watching')
[docs] def __init__(self, path, callback, *args, **kwargs): LOG.info(f"Added file tracking {path} with subsequent launch {getattr(callback, '__name__')}") self.__executor = ProcessExecutorWrapper(shutdown_wait=False, shutdown_cancel_futures=True) self.__watcher = None self._stop_watching = False self.files = self.path_to_files(path) self.callback = partial(callback, *args, **kwargs)
[docs] async def __aenter__(self): """ """ await self.start() return self
[docs] def __enter__(self): """ """ self._start() return self
[docs] async def __aexit__(self, exc_type, exc_val, exc_tb): """ """ await self.stop()
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """ """ self._stop()
[docs] def __del__(self): try: self._stop() except BaseException: ...
[docs] async def _watcher(self): while not self._stop_watching: try: is_modify = await self.__executor(self.check_modify, self.files) if is_modify: LOG.info(f"File is modify. Run {self.callback.func.__name__}") await self.callback() except (Exception, web.GracefulExit) as e: LOG.error(f"Stop file watching - {e}") self._stop_watching = True raise web.GracefulExit(e) except ProcessExit: break
[docs] @staticmethod def check_modify(files): try: files_modification_time = dict((file, stat(file).st_mtime) for file in files) if not files_modification_time: msg = f'Path {files} not found' LOG.error(msg) raise FileNotFoundError(msg) while True: for file_path, mtime in files_modification_time.items(): if mtime != stat(file_path).st_mtime: return True time.sleep(1) except ProcessExit: ...
[docs] @staticmethod def path_to_files(path): """ Convert recursive file patch to list files Args: path: file patch Returns: list files """ path = str(path).replace('~', f'{Path.home()}/') if '*' in path and not path.startswith('/'): files = sorted(Path().rglob(path)) else: files = [Path(path)] if not files: raise FileNotFoundError(f'Path {path} not found') for file_path in files: if not Path(file_path).exists(): raise FileNotFoundError(f'Path {file_path} not found') return files
[docs] async def start(self): self._start()
[docs] def _start(self): file_obj = next(iter(self.files)) name = self.callback.func.__name__ LOG.info(f"Start file tracking {file_obj} with subsequent launch {name}") self.__watcher = asyncio.create_task( self._watcher(), name=f"File tracking {file_obj} with subsequent launch {name}" )
[docs] async def stop(self): self._stop()
[docs] def _stop(self): if self._stop_watching: return self._stop_watching = True LOG.info(f"Stoping file tracking {next(iter(self.files))} with subsequent launch {self.callback.func.__name__}") if self.__executor: if self.__executor._ex._processes: for pid in self.__executor._ex._processes.keys(): try: LOG.info(f"Send {SIGCHLD} for file tracking process with pid {pid}") kill(pid, SIGCHLD) except OSError: ... time.sleep(0) self.__executor.shutdown() if self.__watcher: if not self.__watcher.done(): self.__watcher.cancel() else: try: self.__watcher.result() except BaseException as e: LOG.info(e) LOG.info(f"Stoped file tracking {next(iter(self.files))} with subsequent launch {self.callback.func.__name__}")
[docs] async def join(self): await self.__watcher
[docs]class FilesWatcher: __slots__ = ('__watchers')
[docs] def __init__(self): self.__watchers = []
[docs] def __del__(self): try: self._stop() except BaseException: ...
[docs] async def start(self, app: Union[web.Application, 'ApplicationWorker']): app.files_watcher = self._start()
[docs] def _start(self): return self
[docs] async def stop(self, app: web.Application): for watcher in self.__watchers: if watcher: await watcher.stop()
[docs] def _stop(self): for watcher in self.__watchers: if watcher: watcher._stop()
[docs] async def watch(self, path, callback, *args, **kwargs): watcher = FileObserver(path, callback, *args, **kwargs) await watcher.start() self.__watchers.append(watcher)
#: FileWatcher: singleton instance for Requests Wrapper files_watcher = FilesWatcher()
[docs]def setup_file_watcher(app: Union[web.Application, 'ApplicationWorker']) -> None: app.on_startup.append(files_watcher.start) app.on_shutdown.append(files_watcher.stop)