"""
Package to provide a wrapper for parsing, cleaning, checking environment variables.
"""
import os
import pathlib
import sys
from distutils.util import strtobool
from typing import Union, Any
from dotenv import load_dotenv, find_dotenv
from yarl import URL
# Public API of the module. Python only honours the lowercase ``__all__``
# name for ``from module import *``; the upper-case spelling had no effect.
__all__ = ['Environment', 'booleans', 'url', 'path_discover']
# Legacy misspelled name, kept so existing references keep working.
__ALL__ = __all__
class Environment:
    """Parser, cleaner and validator for environment variables.

    Variables are read from a mapping (``os.environ`` unless a ``dict`` is
    supplied), filtered by an optional name prefix, stripped of surrounding
    whitespace and converted with the callables given in the validation spec.

    The instance attributes listed in ``__slots__`` are stored on the object;
    every other attribute or item access is redirected to ``parsed``.
    """

    # environ   - copy of the source variables (plus defaults)
    # spec      - mapping of unprefixed variable name -> converter callable
    # prefix    - only variables whose name starts with it are parsed
    # strict    - raise ValueError on missing / malformed variables
    # missing   - sorted list of spec names without a matching variable
    # malformed - list of (variable name, error message) tuples
    # parsed    - mapping of unprefixed variable name -> converted value
    __slots__ = ('environ', 'spec', 'prefix', 'strict', 'missing', 'malformed', 'parsed')

    def __init__(self, prefix: str = '',
                 spec: Union[dict, None] = None,
                 environ: Union[dict, None] = None,
                 default: Union[dict, None] = None,
                 strict: bool = False,
                 dotenv: Union[str, bool, None] = None,
                 **kw: Any):
        """Load, parse and validate the environment variables.

        Args:
            prefix (str): only variables whose name starts with this prefix
                are parsed; the prefix is removed from the parsed name.
            spec (Union[dict, None]): validation rules, a mapping of
                unprefixed variable name to a converter callable. The dict is
                copied, the caller's object is never modified.
            environ (Union[dict, None]): predefined variables. When it is not
                a ``dict`` (e.g. ``None``), ``os.environ`` is used.
            default (Union[dict, None]): values used for variables that are
                absent from ``environ``.
            strict (bool): raise ``ValueError`` when variables are missing or
                cannot be converted.
            dotenv (Union[str, bool, None]): path of the env file to load,
                ``False`` to disable loading (see :meth:`dotenv`).
            **kw (Any): additional validation rules, merged into ``spec``.

        Raises:
            ValueError: in strict mode, when variables are missing or malformed.

        Example:
            >>> pretend_os_environ = {
            ...     'A': '42',
            ...     'B': 'b',
            ...     'C': 'yes',
            ...     'yes': 'yes',
            ...     'no': '',
            ...     'D': 'd'
            ... }
            >>> env = Environment(A=int, M=str, D=int, yes=booleans, no=booleans,
            ...                   environ=pretend_os_environ, strict=False)
            >>> env.A
            42
            >>> env.missing
            ['M']
        """
        # Must run before os.environ is read below so loaded values are seen.
        self.dotenv(dotenv)
        if not isinstance(environ, dict):
            environ = os.environ
        self.environ = environ.copy()
        if isinstance(default, dict):
            for env_name, env_val in default.items():
                self.environ.setdefault(env_name, env_val)
        # Work on a copy: merging **kw must not leak into the caller's dict.
        spec = dict(spec) if isinstance(spec, dict) else {}
        spec.update(kw)
        self.spec = spec
        self.prefix = prefix
        self.strict = strict
        self.missing = []
        self.malformed = []
        self.parsed = {}
        self.missing, self.malformed, self.parsed = self._parse()
        self.check_strict()

    @staticmethod
    def dotenv(dotenv: Union[str, bool, None] = None):
        """Load environment variables from a file.

        The file is located with ``find_dotenv`` and loaded with
        ``load_dotenv(..., override=False)``.

        Args:
            dotenv (Union[str, bool, None]): file path, ``py.env`` by default.
                To disable loading, you must pass ``False``.
        """
        dotenv_file = dotenv if isinstance(dotenv, str) else 'py.env'
        # Equality (not identity) is intentional: it keeps the historical
        # behaviour where any value equal to False disables loading.
        if dotenv != False:  # noqa: E712
            load_dotenv(find_dotenv(dotenv_file), override=False)

    def clenup(self, value):
        """Return ``value`` as a string without leading/trailing whitespace.

        The misspelled method name is kept for backward compatibility.
        """
        return str(value).strip()

    def check_strict(self, spec=None, strict=None):
        """Check the current environment variables against the expected ones.

        Args:
            spec (dict): expected environment variables; when given, the
                missing names are recomputed for it, otherwise the result of
                the last parse (``self.missing``) is used.
            strict (bool): overrides ``self.strict`` when not ``None``.

        Raises:
            ValueError: in strict mode, when variables are missing or malformed.
        """
        if strict is None:
            strict = self.strict
        missing = self.check_missing(spec=spec) if spec else self.missing
        if not strict or not (missing or self.malformed):
            return
        msg = ['Env vars:']
        if missing:
            msg.append(f'{missing} are not defined')
        msg.extend(f'{var_name} are not serialized - {var_error}'
                   for var_name, var_error in self.malformed)
        raise ValueError(' '.join(msg))

    def check_missing(self, spec=None, environ=None, prefix=None):
        """Check for missing environment variables.

        Args:
            spec (dict): expected variables, ``self.spec`` when empty.
            environ (dict): available variables, ``self.environ`` when empty.
            prefix (str): name prefix of the relevant variables,
                ``self.prefix`` when ``None``.

        Returns:
            list: sorted spec names that have no matching (prefixed) variable.
        """
        if not spec:
            spec = self.spec
        if not environ:
            environ = self.environ
        if prefix is None:
            prefix = self.prefix
        # Spec keys are unprefixed, so strip the prefix before comparing;
        # variables without the prefix are ignored, exactly as in _parse.
        available = {name[len(prefix):] for name in environ if name.startswith(prefix)}
        return sorted(set(spec) - available)

    def parse(self, prefix=None, spec=None, environ=None):
        """Parse environment variables.

        Returns:
            tuple: ``(missing, malformed, parsed)``, see :meth:`_parse`.
        """
        return self._parse(prefix=prefix, spec=spec, environ=environ)

    def _parse(self, prefix=None, spec=None, environ=None):
        """Parse and validate environment variables.

        Empty arguments fall back to the corresponding instance attributes.

        Returns:
            tuple: ``(missing, malformed, parsed)`` where ``missing`` is a
            sorted list of spec names, ``malformed`` a list of
            ``(variable name, error message)`` tuples and ``parsed`` a dict
            of unprefixed name to converted value.
        """
        prefix = prefix or self.prefix
        spec = spec or self.spec
        environ = environ or self.environ
        missing = []
        malformed = []
        parsed = {}
        if spec:
            missing = self.check_missing(spec=spec, environ=environ, prefix=prefix)
        for name, value in sorted(environ.items()):
            if not name.startswith(prefix):
                continue
            unprefixed = name[len(prefix):]
            # With a spec only the declared variables are parsed;
            # without one every (prefixed) variable is kept as a string.
            if spec and unprefixed not in spec:
                continue
            converter = spec.get(unprefixed, str)
            value = self.clenup(value)
            try:
                parsed[unprefixed] = converter(value)
            except Exception as exc:
                # Converters are arbitrary callables: any failure marks the
                # variable as malformed instead of aborting the whole parse.
                malformed.append((name, f"{type(exc).__name__}: {exc}"))
        return missing, malformed, parsed

    def __getattr__(self, name):
        """Resolve unknown attributes from the parsed variables."""
        cls = self.__class__.__name__
        try:
            # Bypass __getattr__ here: when the 'parsed' slot is not set yet
            # (instance created without __init__), ``self.parsed`` would
            # re-enter this method and recurse without end.
            parsed = object.__getattribute__(self, 'parsed')
            return parsed[name]
        except (AttributeError, KeyError):
            raise AttributeError(f"'{cls}' object has no attribute '{name}'") from None

    def __getitem__(self, key):
        """Item access is an alias of attribute access."""
        return getattr(self, key)

    def __setitem__(self, key, value):
        """Item assignment follows the same rules as attribute assignment."""
        self.__setattr__(key, value)

    def __setattr__(self, name, value):
        """Store slot attributes on the instance, anything else in ``parsed``.

        Raises:
            AttributeError: in strict mode, when an already parsed variable
                would be overwritten.
        """
        if name in self.__class__.__slots__:
            super().__setattr__(name, value)
            return
        if self.strict and name in self.parsed:
            raise AttributeError(f"In strict mode, it is forbidden to modify parsed variables '{name}'")
        self.parsed[name] = value
def booleans(value):
    """Interpret ``value`` as a boolean flag.

    The strings ``y``, ``yes``, ``t``, ``true``, ``on`` and ``1`` (in any
    letter case) are true. Every other value, including the empty string,
    is false; no exception is raised for unrecognised input.

    The check is implemented locally instead of calling
    ``distutils.util.strtobool`` because ``distutils`` is deprecated and no
    longer shipped with Python 3.12+.

    Args:
        value (Any): ``bool`` values are returned unchanged, anything else
            is converted with ``str()`` before it is compared.

    Returns:
        bool: the interpreted flag.
    """
    if isinstance(value, bool):
        return value
    truthy = ('y', 'yes', 't', 'true', 'on', '1')
    return str(value).lower() in truthy
def url(value):
    """Convert ``value`` to a ``yarl.URL`` with an http(s) scheme.

    When the scheme of the given value is neither ``http`` nor ``https``,
    ``http://`` is put in front of it and the result is parsed again.

    Args:
        value (Any): the address, converted with ``str()`` before parsing.

    Returns:
        URL: the parsed address.

    Raises:
        ValueError: the resulting address has no host.
    """
    address = URL(str(value))
    shown = value  # what the error message reports
    if address.scheme not in ('http', 'https'):
        # Retry once with a default scheme; the prefixed form is also the
        # one named in the error message below.
        shown = URL(f'http://{address}')
        address = URL(str(shown))
    if not address.raw_host:
        raise ValueError(f'Not valid url {shown}')
    return address
def path_discover(value, discover=True):
    """Validate a file or directory path, optionally discovering it upwards.

    The path is looked up relative to the current working directory and,
    when ``discover`` is true, relative to each of its parent directories up
    to the filesystem root. Relative values may contain Unix shell-style
    wildcards (``*``). A leading ``~`` is expanded to the user's home
    directory.

    A value with a file suffix (e.g. ``settings.ini``) must resolve to a
    file; a value without one may resolve to a directory or a file.

    Args:
        value (Any): path or pattern, converted with ``str()``.
        discover (bool): search the parent directories as well. When false,
            the path must exist as given and only the current working
            directory is checked.

    Returns:
        pathlib.Path: the first matching path.

    Raises:
        FileNotFoundError: directory or file path not found.
    """
    # expanduser only touches a leading "~", so names such as "notes~old.txt"
    # are left intact (a plain str.replace would rewrite every tilde).
    value = os.path.expanduser(str(value))
    value_path = pathlib.Path(value)
    if (value_path.is_absolute() or not discover) and not value_path.exists():
        raise FileNotFoundError(f'Path {value_path} not found')
    check_methods = ('is_file',) if value_path.suffixes else ('is_dir', 'is_file')

    start_dir = pathlib.Path().absolute()
    if not start_dir.exists():
        raise FileNotFoundError('Starting path not found')
    start_dir = start_dir.resolve()
    # Without discovery only the starting directory is searched.
    search_dirs = (start_dir, *start_dir.parents) if discover else (start_dir,)

    # Path.glob() rejects absolute patterns, so wildcards are only expanded
    # for relative values.
    use_glob = '*' in value and not value_path.is_absolute()
    for dirname in search_dirs:
        if use_glob:
            candidates = dirname.glob(value)
        else:
            candidates = (dirname.joinpath(value_path),)
        for check_path in candidates:
            if check_path.exists() and any(getattr(check_path, method)() for method in check_methods):
                return check_path
    raise FileNotFoundError(f'Path {value_path} not found')