import asyncio
import time
from functools import partial
from os import stat, kill
from pathlib import Path
from signal import SIGCHLD
from typing import Union, TYPE_CHECKING
from aiohttp import web
from prozorro_sale.tools.executors import ProcessExecutorWrapper, ProcessExit
from prozorro_sale.tools.logger import get_custom_logger
if TYPE_CHECKING:
# type hinting without cyclic imports
from prozorro_sale.tools.application_worker import ApplicationWorker
__all__ = ('setup_file_watcher', 'files_watcher', 'FilesWatcher', 'FileObserver')
LOG = get_custom_logger(__name__)
[docs]class FileObserver:
__slots__ = ('__executor', '__watcher', '__sync_manager', 'files', 'callback', '_stop_watching')
[docs] def __init__(self, path, callback, *args, **kwargs):
LOG.info(f"Added file tracking {path} with subsequent launch {getattr(callback, '__name__')}")
self.__executor = ProcessExecutorWrapper(shutdown_wait=False, shutdown_cancel_futures=True)
self.__watcher = None
self._stop_watching = False
self.files = self.path_to_files(path)
self.callback = partial(callback, *args, **kwargs)
[docs] async def __aenter__(self):
"""
"""
await self.start()
return self
[docs] def __enter__(self):
"""
"""
self._start()
return self
[docs] async def __aexit__(self, exc_type, exc_val, exc_tb):
"""
"""
await self.stop()
[docs] def __exit__(self, exc_type, exc_val, exc_tb):
"""
"""
self._stop()
[docs] def __del__(self):
try:
self._stop()
except BaseException:
...
[docs] async def _watcher(self):
while not self._stop_watching:
try:
is_modify = await self.__executor(self.check_modify, self.files)
if is_modify:
LOG.info(f"File is modify. Run {self.callback.func.__name__}")
await self.callback()
except (Exception, web.GracefulExit) as e:
LOG.error(f"Stop file watching - {e}")
self._stop_watching = True
raise web.GracefulExit(e)
except ProcessExit:
break
[docs] @staticmethod
def check_modify(files):
try:
files_modification_time = dict((file, stat(file).st_mtime) for file in files)
if not files_modification_time:
msg = f'Path {files} not found'
LOG.error(msg)
raise FileNotFoundError(msg)
while True:
for file_path, mtime in files_modification_time.items():
if mtime != stat(file_path).st_mtime:
return True
time.sleep(1)
except ProcessExit:
...
[docs] @staticmethod
def path_to_files(path):
"""
Convert recursive file patch to list files
Args:
path: file patch
Returns:
list files
"""
path = str(path).replace('~', f'{Path.home()}/')
if '*' in path and not path.startswith('/'):
files = sorted(Path().rglob(path))
else:
files = [Path(path)]
if not files:
raise FileNotFoundError(f'Path {path} not found')
for file_path in files:
if not Path(file_path).exists():
raise FileNotFoundError(f'Path {file_path} not found')
return files
[docs] async def start(self):
self._start()
[docs] def _start(self):
file_obj = next(iter(self.files))
name = self.callback.func.__name__
LOG.info(f"Start file tracking {file_obj} with subsequent launch {name}")
self.__watcher = asyncio.create_task(
self._watcher(),
name=f"File tracking {file_obj} with subsequent launch {name}"
)
[docs] async def stop(self):
self._stop()
[docs] def _stop(self):
if self._stop_watching:
return
self._stop_watching = True
LOG.info(f"Stoping file tracking {next(iter(self.files))} with subsequent launch {self.callback.func.__name__}")
if self.__executor:
if self.__executor._ex._processes:
for pid in self.__executor._ex._processes.keys():
try:
LOG.info(f"Send {SIGCHLD} for file tracking process with pid {pid}")
kill(pid, SIGCHLD)
except OSError:
...
time.sleep(0)
self.__executor.shutdown()
if self.__watcher:
if not self.__watcher.done():
self.__watcher.cancel()
else:
try:
self.__watcher.result()
except BaseException as e:
LOG.info(e)
LOG.info(f"Stoped file tracking {next(iter(self.files))} with subsequent launch {self.callback.func.__name__}")
[docs] async def join(self):
await self.__watcher
[docs]class FilesWatcher:
__slots__ = ('__watchers')
[docs] def __init__(self):
self.__watchers = []
[docs] def __del__(self):
try:
self._stop()
except BaseException:
...
[docs] async def start(self, app: Union[web.Application, 'ApplicationWorker']):
app.files_watcher = self._start()
[docs] def _start(self):
return self
[docs] async def stop(self, app: web.Application):
for watcher in self.__watchers:
if watcher:
await watcher.stop()
[docs] def _stop(self):
for watcher in self.__watchers:
if watcher:
watcher._stop()
[docs] async def watch(self, path, callback, *args, **kwargs):
watcher = FileObserver(path, callback, *args, **kwargs)
await watcher.start()
self.__watchers.append(watcher)
#: FileWatcher: singleton instance for Requests Wrapper
files_watcher = FilesWatcher()
[docs]def setup_file_watcher(app: Union[web.Application, 'ApplicationWorker']) -> None:
app.on_startup.append(files_watcher.start)
app.on_shutdown.append(files_watcher.stop)