-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
166 lines (130 loc) · 5.14 KB
/
utils.py
File metadata and controls
166 lines (130 loc) · 5.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import logging
import sys
import os
import re
from ast import literal_eval
import importlib.util
from pathlib import Path
from typing import Union, List, Dict, Any, Union
def import_if_installed(library_name):
    """Import and return a module if it is available, otherwise return ``None``.

    Args:
        library_name: Absolute module name, e.g. ``"redis"`` or ``"os.path"``.

    Returns:
        The imported module object, or ``None`` when no import spec can be
        found for ``library_name``.
    """
    try:
        spec = importlib.util.find_spec(library_name)
    except (ImportError, ValueError):
        # find_spec() imports the parent package of a dotted name and raises
        # ModuleNotFoundError (an ImportError) when that parent is missing; it
        # raises ValueError when a loaded module has ``__spec__`` set to None.
        # Both cases mean "not usable" for an optional dependency.
        spec = None

    if spec is None:
        logging.debug(f"{library_name} is not installed.")
        return None

    lib = importlib.import_module(library_name)
    logging.debug(f"{library_name} is installed and has been imported.")
    return lib
def camel_case_split(identifier):
    """Split a camelCase / PascalCase identifier into its word fragments.

    A boundary is placed between a lowercase letter and a following uppercase
    letter, and inside an uppercase run just before a capital that is followed
    by a lowercase letter (so acronyms such as ``HTTP`` stay in one piece).

    Args:
        identifier: The string to split.

    Returns:
        The fragments in their original order; an empty list for ``""``.
    """
    pattern = ".+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)"
    words = []
    for hit in re.finditer(pattern, identifier):
        words.append(hit.group(0))
    return words
def get_env_variable(key: Union[str, List[str]], default_value, check_for_prefix: bool = False) -> Any:
    """Look up an environment variable and return its value cast to a Python type.

    Args:
        key: Variable name, or a list of candidate names tried in order.
        default_value: Returned unchanged when none of the candidates is set.
        check_for_prefix: When true and the ``PREFIX`` environment variable is
            set, every candidate is looked up as ``"<PREFIX>_<name>"``.

    Returns:
        ``cast()`` applied to the first candidate that is set, otherwise
        ``default_value``.
    """
    prefix = ""
    if check_for_prefix and "PREFIX" in os.environ:
        prefix = f"{os.environ['PREFIX']}_"

    candidates = [key] if isinstance(key, str) else key
    # names of the environment variables that are actually looked up
    names = [f"{prefix}{candidate}" for candidate in candidates]

    # NOTE: print() is used instead of logging because setup_logging() itself
    # calls this function before logging has been configured.
    for name in names:
        raw = os.environ.get(name)
        if raw is None:
            continue
        value = cast(raw)
        print(f"Found environment variable {name}. Using value: {value}")
        return value

    print(f"No environment variable {', '.join(names)} found. Using default value: {default_value}")
    return default_value
def get_environment_variables(prefix: str = None, with_prefix: bool = True) -> Dict[str, Any]:
    """Collect all environment variables whose name matches ``prefix``.

    Args:
        prefix: Regular expression matched against the start of each variable
            name; ``None`` selects every variable.
        with_prefix: Keep the full variable name as key when true; otherwise
            the matched leading part is cut off the key.

    Returns:
        Mapping of (possibly shortened) variable name to its ``cast()`` value.
    """
    pattern = re.compile("" if prefix is None else prefix)
    found = {}
    for name, raw in os.environ.items():
        hit = pattern.match(name)
        if hit is None:
            continue
        label = name if with_prefix else name[hit.end():]
        found[label] = cast(raw)
    return found
def default_from_env(key: Union[str, List[str]], default: Any) -> Any:
    """Return the environment value for ``key`` or ``default`` if it is unset.

    Shorthand for ``get_env_variable`` with the ``PREFIX`` lookup enabled.
    """
    return get_env_variable(key=key, default_value=default, check_for_prefix=True)
# Pre-filter: only digits, dots and commas -> candidate for a numeric cast.
re_number = re.compile(r"^[0-9.,]+$")
re_integer = re.compile(r"^\d+$")
# Float with "." as decimal separator: "1.5", "1." or ".5".
re_float = re.compile(r"^((\d+\.(\d+)?)|(\.\d+))$")
# Float with "," as decimal separator: "1,5", "1," or ",5".
re_float_de = re.compile(r"^((\d+,(\d+)?)|(,\d+))$")
re_boolean = re.compile(r"^(true|false)$", re.IGNORECASE | re.ASCII)
re_list_or_tuple_or_dict = re.compile(r"^\s*(\[.*\]|\(.*\)|\{.*\})\s*$", re.ASCII)
# A value fully enclosed in one pair of double or single quotes. The
# alternation is grouped so that both anchors apply to both quote styles.
re_comma = re.compile(r"^(\".*\"|'.*')$", re.ASCII)


def cast(var: str) -> Any:
    """Cast a string to the Python value it spells out.

    Rules, checked in this order:

    * digits only                       -> ``int``
    * digits with one ``.`` or ``,``    -> ``float`` (``"1,5"`` becomes 1.5)
    * ``"none"`` (any case)             -> ``None``
    * ``"true"`` / ``"false"``          -> ``bool``
    * ``[...]``, ``(...)``, ``{...}``   -> result of ``ast.literal_eval``
    * ``"..."`` or ``'...'``            -> text without the enclosing quotes

    Anything else, including list-like text that is not a valid Python
    literal, is returned unchanged.

    Args:
        var: The raw string; non-string values are passed through untouched.

    Returns:
        The converted value, or ``var`` itself when no rule applies.
    """
    if not isinstance(var, str):
        return var

    if re_number.match(var):
        if re_integer.match(var):
            return int(var)
        if re_float.match(var):
            return float(var)
        if re_float_de.match(var):
            return float(var.replace(",", "."))
        # e.g. "1.2.3" or "1,000.5": looks numeric but is not a number
        return var

    if var.lower() == "none":
        return None

    if re_boolean.match(var):
        return var[0].lower() == "t"

    if re_list_or_tuple_or_dict.match(var):
        try:
            # strip(): older Python versions reject leading whitespace
            return literal_eval(var.strip())
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # merely looks like a container (e.g. "[foo]") -> keep the text
            return var

    if re_comma.match(var):
        # drop exactly the one enclosing pair of quotes
        return var[1:-1]

    return var
def set_env_variable(key: str, val) -> bool:
    """Store ``val`` under ``key`` in the environment of the current process.

    Args:
        key: Name of the environment variable.
        val: Value to store; it is converted with ``str()`` first.

    Returns:
        Always ``True``.
    """
    text = str(val)
    os.environ[key] = text
    return True
def cast_logging_level(var: Union[str, int], default: int = logging.INFO) -> int:
    """Translate a level name or number into a standard ``logging`` level.

    Args:
        var: Level name (case-insensitive, e.g. ``"debug"``), a numeric level,
            or a string holding a number (e.g. ``"10"``).
        default: Level returned when ``var`` is not a known standard level.

    Returns:
        One of the standard numeric logging levels, or ``default``.
    """
    options = {
        "debug": logging.DEBUG,  # 10
        "info": logging.INFO,  # 20
        "warning": logging.WARNING,  # 30
        "warn": logging.WARNING,  # 30
        "error": logging.ERROR,  # 40
        "critical": logging.CRITICAL,  # 50
        "fatal": logging.CRITICAL,  # 50
        "notset": logging.NOTSET,  # 0
    }

    # cast string if possible (e.g. "10" -> 10)
    if isinstance(var, str):
        var = cast(var.strip())

    # bool is a subclass of int and False == 0 == NOTSET; a boolean is never
    # a meaningful logging level, so reject it explicitly.
    if isinstance(var, bool):
        return default
    if isinstance(var, int):
        return var if var in options.values() else default
    if isinstance(var, str):
        return options.get(var.strip().lower(), default)

    # None, floats, containers, ...
    return default
def get_logging_level(key: str = "LOGGING_LEVEL", default: int = logging.INFO) -> int:
    """Read the logging level from an environment variable.

    Args:
        key: Name of the environment variable holding the level.
        default: Level used when the variable is unset or holds a value that
            is not a known logging level.

    Returns:
        The numeric logging level.
    """
    # Normalise the default first so that it can serve as the fallback for an
    # invalid environment value as well (instead of a hard-wired INFO).
    fallback = cast_logging_level(default)
    return cast_logging_level(get_env_variable(key, default), fallback)
def setup_logging(
    name: str,
    level: Union[str, int] = logging.INFO,
    use_env_log_level: bool = True,
    period_sec: int = 1
) -> logging.Logger:
    """Configure root logging and return a named logger.

    Log records are always written to stdout. If the ``LOGFILE`` environment
    variable is set, they are additionally written to that file, with its
    suffix replaced by ``.log``.

    Args:
        name: Name of the logger to return.
        level: Logging level; also the default when the level is read from the
            environment.
        use_env_log_level: Read the level from the ``LOGGING_LEVEL``
            environment variable when true; use ``level`` as given otherwise.
        period_sec: Passed as ``period_sec`` to the ``log_rate_limit`` stream
            filter, which is attached only if both ``redis`` and
            ``log_rate_limit`` can be imported.

    Returns:
        The logger registered under ``name``.
    """
    log_file = get_env_variable("LOGFILE", None)
    log_level = get_logging_level("LOGGING_LEVEL", level) if use_env_log_level else level

    # Build the handler list step by step: the stdout handler must always be
    # present, the file handler only when a log file is configured.
    handlers = [logging.StreamHandler(sys.stdout)]  # Ensures logs are forwarded to Docker
    if log_file is not None:
        # str(): the environment value has been through cast() and may have
        # been converted to a non-string (e.g. a purely numeric file name)
        handlers.append(logging.FileHandler(Path(str(log_file)).with_suffix(".log")))

    # Setup logging
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=handlers,
    )

    # create file wide logger
    logger_ = logging.getLogger(name)

    # Add our filter
    if import_if_installed("redis"):
        # package log-rate-limit requires redis to be available
        log_rate_limit = import_if_installed("log_rate_limit")
        if log_rate_limit:
            logger_.addFilter(log_rate_limit.StreamRateLimitFilter(period_sec=period_sec))

    # first log message
    logger_.info(f"Logging configured: level={log_level}, file={log_file}")
    # NOTE(review): this writes the complete environment, including any
    # secrets stored in it, to the log at debug level - confirm that this is
    # intended.
    logger_.debug(f"Environment variables: {os.environ}")

    return logger_
# Module-level logger. Importing this module calls setup_logging() with its
# default arguments, so logging is configured as an import side effect.
logger = setup_logging(__name__)