Source code for prozorro_sale.tools.environment

"""
Package to provide a wrapper for parsing, cleaning, checking environment variables.
"""
import os
import pathlib
import sys
from distutils.util import strtobool
from typing import Union, Any

from dotenv import load_dotenv, find_dotenv
from yarl import URL

__ALL__ = ['Environment', 'booleans', 'url', 'path_discover']


[docs]class Environment: """Parser, cleaner, validator for Environment Variables. """ __slots__ = ('environ', 'spec', 'prefix', 'strict', 'missing', 'malformed', 'parsed')
[docs] def __init__(self, prefix: str = '', spec: Union[dict, None] = None, environ: Union[dict, None] = None, default: Union[dict, None] = None, strict: bool = False, dotenv: Union[str, None] = None, **kw: Any): """Parser, cleaner, validator for Environment Variables. Args: prefix (str): filtering environ variables on start prefix spec (Union[dict, None]): Validation rules environ (Union[dict, None]): predefined environ variables. If set None then os.environ is used default (Union[dict, None]): default environ variables. strict (bool): strict validation. Raise Error on invalid variables dotenv (Union[str, bool, None]): absolute or relative path to .env file. **kw (kwargs): for set Validation rules Example: >>> pretend_os_environ = { >>> 'A': '42', >>> 'B': 'b', >>> 'C': 'yes', >>> 'yes': 'yes', >>> 'no': '', >>> 'D': 'd' >>> } env = Environment(A=int, M=str, D=int, yes=booleans, no=booleans, environ=pretend_os_environ, strict=False) """ self.dotenv(dotenv) if not isinstance(environ, dict): environ = os.environ self.environ = environ.copy() if isinstance(default, dict): for env_name, env_val in default.items(): self.environ.setdefault(env_name, env_val) if not isinstance(spec, dict): spec = {} spec.update(kw) self.spec = spec self.prefix = prefix self.strict = strict self.missing = [] self.malformed = [] self.parsed = {} self.missing, self.malformed, self.parsed = self._parse() self.check_strict()
[docs] @staticmethod def dotenv(dotenv: Union[str, bool, None] = None): """ Load environment variables from file. Args: dotenv (Union[str, bool, None]): File path, py.env by default. To disable loading, you must pass False """ dotenv_file = 'py.env' if isinstance(dotenv, str): dotenv_file = dotenv if dotenv != bool(0): load_dotenv(find_dotenv(dotenv_file), override=False)
[docs] def clenup(self, value): """ Clears the values of environment variables from spaces and line breaks """ value = str(value).strip() value = value.rstrip() return value
[docs] def check_strict(self, spec=None, strict=None): """Check current environment variables with expected Args: spec (dict): expected environment variables strict: bool Raises: ValueError: expected environment variables not defined """ missing = self.missing if strict is None: strict = self.strict if spec: missing = self.check_missing(spec=spec) if strict: if missing or self.malformed: msg = ['Env vars:'] if missing: msg.append(f'{missing} are not defined') if self.malformed: for var_name, var_error in self.malformed: msg.append(f'{var_name} are not serialized - {var_error}') raise ValueError(' '.join(msg))
[docs] def check_missing(self, spec=None, environ=None): """Check for missing environment variables Returns: list: difference between expected environment variables and current """ if not spec: spec = self.spec if not environ: environ = self.environ return sorted(list(set(spec) - set(environ)))
[docs] def parse(self, prefix=None, spec=None, environ=None): """Parse environment variables""" return self._parse(prefix=prefix, spec=spec, environ=environ)
[docs] def _parse(self, prefix=None, spec=None, environ=None): """Parse and validate environment variables""" if not prefix: prefix = self.prefix if not spec: spec = self.spec if not environ: environ = self.environ missing = [] malformed = [] parsed = {} if spec: missing = self.check_missing(spec=spec, environ=environ) for name, value in sorted(environ.items()): value = self.clenup(value) if not name.startswith(prefix): continue unprefixed = name[len(prefix):] if spec and unprefixed not in spec: continue type_ = spec.get(unprefixed, str) try: value = type_(value) except Exception: exc_type, exc_instance = sys.exc_info()[:2] msg = f"{exc_type.__name__}: {exc_instance}" malformed.append((name, msg)) continue parsed[unprefixed] = value return missing, malformed, parsed
[docs] def __getattr__(self, name): try: return self.parsed[name] except KeyError: cls = self.__class__.__name__ raise AttributeError(f"'{cls}' object has no attribute '{name}'")
[docs] def __getitem__(self, key): return getattr(self, key)
[docs] def __setitem__(self, key, value): if key in self.__class__.__slots__: super().__setattr__(key, value) else: if self.strict and key in self.parsed: raise AttributeError(f"In strict mode, it is forbidden to modify parsed variables '{key}'") self.parsed[key] = value
[docs] def __setattr__(self, name, value): if name in self.__class__.__slots__: super().__setattr__(name, value) else: if self.strict and name in self.parsed: raise AttributeError(f"In strict mode, it is forbidden to modify parsed variables '{name}'") self.parsed[name] = value
[docs]def booleans(value): """Check value is boolean Args: value (Any) Raises: ValueError: value isn't boolean """ try: return bool(strtobool(value)) except ValueError: return False
[docs]def url(value): """Check value is url Raises: ValueError: value isn't valid url format """ link = URL(str(value)) if link.scheme not in ('http', 'https'): link = url(URL(f'http://{link}')) if not link.raw_host: raise ValueError(f'Not valid url {value}') return link
[docs]def path_discover(value, discover=True): """Validate string as folder path with provides support for Unix shell-style wildcards and auto discorever Raises: IOError: directory or file path not found """ check_method = ['is_dir', 'is_file'] value = str(value) value = value.replace('~', f'{pathlib.Path.home()}/') value_path = pathlib.Path(value) if (value_path.is_absolute() or not discover) and not value_path.exists(): raise FileNotFoundError(f'Path {value_path} not found') if value_path.suffixes: check_method = ['is_file', ] def _walk_to_root(path): if not path.exists(): raise FileNotFoundError('Starting path not found') if path.is_file(): path = path.parent last_dir = None current_dir = path.resolve() while last_dir != current_dir: yield current_dir parent_dir = current_dir.parent last_dir, current_dir = current_dir, parent_dir for dirname in _walk_to_root(pathlib.Path().absolute()): dirs = [dirname.joinpath(value_path), ] if '*' in value: dirs = dirname.glob(value) for check_path in dirs: if check_path.exists() and any([getattr(check_path, method)() for method in check_method]): return check_path raise FileNotFoundError(f'Path {value_path} not found')