import ast
import datetime
import json
import random
import time
import uuid
# noinspection PyUnresolvedReferences
from uuid import UUID # for UUID as object parsing
from collections.abc import Iterable
from types import ModuleType
from typing import Union
from jinja2 import Template, UndefinedError
from catcher.utils import module_utils
from catcher.utils.logger import debug
from catcher.core.filters_factory import FiltersFactory
[docs]def merge_two_dicts(x, y):
if not x:
return y
if not y:
return x
return {**x, **y}
[docs]def report_override(variables: dict, override: dict):
existing = set(variables)
replace = set(override)
return list(existing.intersection(replace))
[docs]def try_get_objects(source: str or dict or list):
got = try_get_object(source) # "'[1,2,3]'" -> '[1,2,3]' -> [1,2,3]
got = try_get_object(got) # '[1,2,3]' -> [1,2,3]
if isinstance(got, dict):
return dict([(k, try_get_objects(v)) for k, v in got.items()])
if isinstance(got, list):
return [try_get_objects(v) for v in got]
return got
[docs]def try_get_object(source: str or dict or list):
if isinstance(source, str):
try: # try python term '{key: "value"}'
evaled = eval_datetime(source)
if isinstance(evaled, ModuleType) or callable(evaled): # for standalone 'string' var or 'id' bif
return source
source = evaled
except Exception:
try: # try json object '{"key" : "value"}'
source = json.loads(source)
except ValueError:
return source
return source
[docs]def fill_template_recursive(source: Union[dict, list, str], variables: dict, glob=None, globs_added=None) \
-> Union[dict, list, str]:
if isinstance(source, dict):
return dict([(fill_template_recursive(k, variables, glob, globs_added),
fill_template_recursive(v, variables, glob, globs_added)) for k, v in source.items()])
if isinstance(source, list):
return [fill_template_recursive(v, variables, glob, globs_added) for v in source]
return fill_template(source, variables, glob, globs_added)
[docs]def fill_template(source: str, variables: dict, isjson=False, glob=None, globs_added=None) -> str:
if not globs_added:
globs_added = set()
if isinstance(source, str):
source = render(source, inject_builtins(variables))
if isjson: # do not parse json string back to objects
return source
try:
evaled = format_datetime(eval_datetime(source, glob))
if not isinstance(evaled, ModuleType) and not callable(evaled): # for standalone 'string' var or 'id' bif
source = evaled
except NameError as e: # try to import missing package and rerun this code
if 'is not defined' in str(e):
name = str(e).split("'")[1]
if name not in globs_added:
# f.e. tzinfo=psycopg2.tz.FixedOffsetTimezone for datetime
glob = module_utils.add_package_to_globals(name, glob, warn_missing_package=False)
globs_added.add(name)
filled = fill_template(source, variables, isjson, glob=glob, globs_added=globs_added)
if not isinstance(filled, ModuleType) and not callable(filled):
return filled # for standalone 'string' var or 'id' bif
except Exception:
pass
return source
[docs]def fill_template_str(source: any, variables: dict) -> str:
rendered = render(str(source), inject_builtins(variables))
if rendered != source:
return fill_template_str(rendered, variables)
return rendered
[docs]def eval_datetime(astr, glob=None):
if glob is None:
glob = globals()
try:
tree = ast.parse(astr)
except SyntaxError:
raise ValueError(astr)
for node in ast.walk(tree):
if isinstance(node, (ast.Module, ast.Expr, ast.Dict, ast.Str,
ast.Attribute, ast.Num, ast.Name, ast.Load, ast.Tuple)): continue
if (isinstance(node, ast.Call)
and isinstance(node.func, ast.Attribute)
and node.func.attr == 'datetime'): continue
pass
return eval(astr, glob)
[docs]def inject_builtins(variables: dict) -> dict:
variables_copy = dict(variables)
variables_copy['RANDOM_STR'] = str(uuid.uuid4())
variables_copy['RANDOM_INT'] = random.randint(-2147483648, 2147483648)
ts = round(time.time(), 6) # from timestamp uses rounding, so we should also use it here, to make them compatible
variables_copy['NOW_TS'] = ts
variables_copy['NOW_DT'] = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%dT%H:%M:%S0+0000')
return variables_copy
[docs]def render(source: str, variables: dict) -> str:
template = Template(source)
holder = FiltersFactory()
for filter_mod, value in holder.filters.items():
template.environment.filters[filter_mod] = value
for fun_mod, value in holder.functions.items():
template.globals[fun_mod] = value
try:
return template.render(variables)
except UndefinedError as e:
debug(e.message)
return source