Source code for catcher.utils.misc

import ast
import datetime
import json
import random
import time
import uuid
# noinspection PyUnresolvedReferences
from uuid import UUID  # for UUID as object parsing
from collections.abc import Iterable
from types import ModuleType
from typing import Union

from jinja2 import Template, UndefinedError

from catcher.utils import module_utils
from catcher.utils.logger import debug
from catcher.core.filters_factory import FiltersFactory


[docs]def merge_two_dicts(x, y): if not x: return y if not y: return x return {**x, **y}
[docs]def report_override(variables: dict, override: dict): existing = set(variables) replace = set(override) return list(existing.intersection(replace))
[docs]def try_get_objects(source: str or dict or list): got = try_get_object(source) # "'[1,2,3]'" -> '[1,2,3]' -> [1,2,3] got = try_get_object(got) # '[1,2,3]' -> [1,2,3] if isinstance(got, dict): return dict([(k, try_get_objects(v)) for k, v in got.items()]) if isinstance(got, list): return [try_get_objects(v) for v in got] return got
[docs]def try_get_object(source: str or dict or list): if isinstance(source, str): try: # try python term '{key: "value"}' evaled = eval_datetime(source) if isinstance(evaled, ModuleType) or callable(evaled): # for standalone 'string' var or 'id' bif return source source = evaled except Exception: try: # try json object '{"key" : "value"}' source = json.loads(source) except ValueError: return source return source
[docs]def fill_template_recursive(source: Union[dict, list, str], variables: dict, glob=None, globs_added=None) \ -> Union[dict, list, str]: if isinstance(source, dict): return dict([(fill_template_recursive(k, variables, glob, globs_added), fill_template_recursive(v, variables, glob, globs_added)) for k, v in source.items()]) if isinstance(source, list): return [fill_template_recursive(v, variables, glob, globs_added) for v in source] return fill_template(source, variables, glob, globs_added)
[docs]def fill_template(source: str, variables: dict, isjson=False, glob=None, globs_added=None) -> str: if not globs_added: globs_added = set() if isinstance(source, str): source = render(source, inject_builtins(variables)) if isjson: # do not parse json string back to objects return source try: evaled = format_datetime(eval_datetime(source, glob)) if not isinstance(evaled, ModuleType) and not callable(evaled): # for standalone 'string' var or 'id' bif source = evaled except NameError as e: # try to import missing package and rerun this code if 'is not defined' in str(e): name = str(e).split("'")[1] if name not in globs_added: # f.e. tzinfo=psycopg2.tz.FixedOffsetTimezone for datetime glob = module_utils.add_package_to_globals(name, glob, warn_missing_package=False) globs_added.add(name) filled = fill_template(source, variables, isjson, glob=glob, globs_added=globs_added) if not isinstance(filled, ModuleType) and not callable(filled): return filled # for standalone 'string' var or 'id' bif except Exception: pass return source
[docs]def fill_template_str(source: any, variables: dict) -> str: rendered = render(str(source), inject_builtins(variables)) if rendered != source: return fill_template_str(rendered, variables) return rendered
[docs]def eval_datetime(astr, glob=None): if glob is None: glob = globals() try: tree = ast.parse(astr) except SyntaxError: raise ValueError(astr) for node in ast.walk(tree): if isinstance(node, (ast.Module, ast.Expr, ast.Dict, ast.Str, ast.Attribute, ast.Num, ast.Name, ast.Load, ast.Tuple)): continue if (isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute) and node.func.attr == 'datetime'): continue pass return eval(astr, glob)
[docs]def format_datetime(iterable): if not isinstance(iterable, Iterable) or isinstance(iterable, str): if isinstance(iterable, datetime.datetime): return iterable.strftime('%Y-%m-%d %H:%M:%S.%f') return iterable else: if isinstance(iterable, dict): return dict([(format_datetime(k), format_datetime(v)) for k, v in iterable.items()]) elif isinstance(iterable, tuple): return tuple([format_datetime(i) for i in iterable]) return [format_datetime(i) for i in iterable]
[docs]def inject_builtins(variables: dict) -> dict: variables_copy = dict(variables) variables_copy['RANDOM_STR'] = str(uuid.uuid4()) variables_copy['RANDOM_INT'] = random.randint(-2147483648, 2147483648) ts = round(time.time(), 6) # from timestamp uses rounding, so we should also use it here, to make them compatible variables_copy['NOW_TS'] = ts variables_copy['NOW_DT'] = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%dT%H:%M:%S0+0000') return variables_copy
[docs]def render(source: str, variables: dict) -> str: template = Template(source) holder = FiltersFactory() for filter_mod, value in holder.filters.items(): template.environment.filters[filter_mod] = value for fun_mod, value in holder.functions.items(): template.globals[fun_mod] = value try: return template.render(variables) except UndefinedError as e: debug(e.message) return source