Source code for catcher.steps.loop

import collections
import collections.abc
import itertools
import json

from catcher.steps.check import Operator
from catcher.steps.step import Step, update_variables
from catcher.utils.logger import debug
from catcher.utils.misc import fill_template_str, try_get_objects


class Loop(Step):
    """
    Repeat one or several actions till the condition is true or for each element of the collection.
    Is useful, when you need to wait for some process to start or for async execution to finish.

    :Input:

    :while: perform action while the condition is true

    - if: your condition. It can be in short format: `if: '{{ counter < 10 }}'` and
          long one: `if: {equals: {the: '{{ counter }}', is_not: 10000}}`. The clause
          format is the same as in [checks](checks.md)
    - do: the action to be performed. Can be a list of actions or single one.
    - max_cycle: the limit of reductions. *Optional* default is no limit.

    :foreach: iterate data structure

    - in: variable or static list. **ITEM** variable can be used to access each element of the
          data structure. Data structure can be list, dict or any other python data structure
          which supports iteration.
    - do: the action to be performed. Can be a list of actions or single one.

    :Examples:

    Perform a single echo while counter is less than 10
    ::

        loop:
            while:
                if: '{{ counter < 10 }}'
                do:
                    echo: {from: '{{ counter + 1 }}', register: {counter: '{{ OUTPUT }}'}}
                max_cycle: 100000

    Perform two actions: consume message from kafka and send token via POST http.
    Do it until server returns passed true in http response.
    ::

        loop:
            while:
                if:
                    equals: {the: '{{ passed }}', is_not: True}
                do:
                    - kafka:
                          consume:
                              server: '127.0.0.1:9092'
                              group_id: 'test'
                              topic: 'test_consume_with_timestamp'
                              timeout: {seconds: 5}
                              where:
                                  equals: '{{ MESSAGE.timestamp > 1000 }}'
                          register: {token: '{{ OUTPUT.data.token }}'}
                    - http:
                          post:
                              headers: {Content-Type: 'application/json'}
                              url: 'http://test.com/check_my_token'
                              body: {'token': '{{ token }}'}
                          register: {passed: '{{ OUTPUT.passed }}'}

    Iterate over `iterator` variable, produce each element to kafka as json and debug it to file.
    ::

        loop:
            foreach:
                in: '{{ iterator }}'
                do:
                    - kafka:
                          produce:
                              server: '127.0.0.1:9092'
                              topic: 'test_produce_json'
                              data: '{{ ITEM|tojson }}'
                    - echo: {from: '{{ ITEM }}', to: '{{ ITEM["filename"] }}.output'}

    Iterate over several different configurations.
    ::

        variables:
            db_1: 'test:test@localhost:5433/db1'
            db_2:
                url: 'test:test@localhost:5434/db2'
                type: postgres
            db_3:
                dbname: 'db3'
                user: 'test'
                password: 'test'
                host: 'localhost'
                port: 5435
                type: 'postgres'
        steps:
            - loop:
                foreach:
                    in: '["{{ db_1 }}", {{ db_2 }}, {{ db_3 }}]'
                    do:
                        - postgres:
                            request:
                                conf: '{{ ITEM }}'
                                query: 'select count(*) from test'
                            register: {documents: '{{ OUTPUT }}'}
                        - check:
                            equals: {the: '{{ documents.count }}', is: 2}

    Note that db_1 template has additional quotes ``"{{ db_1 }}"``. Your **in** should contain
    valid object. As db_1 is just a string - it should be put in quotes. Otherwise **in** value
    will be corrupted. Always make sure your in value is valid. It may also have some
    difficulties with json as string.

    """

    def __init__(self, _get_action=None, _get_actions=None, **kwargs):
        """
        Parse the loop configuration and build the list of nested actions.

        :param _get_action: callable that receives an ``(action_name, action_body)`` tuple;
                            its result is stored as the single loop action.
        :param _get_actions: callable applied to every entry of a multi-entry ``do``; each
                             call must return an iterable of actions, which are flattened.
        :param kwargs: step definition. ``kwargs[<type>]`` must contain ``do`` and, depending
                       on the type, ``if`` (while) or ``in`` (foreach).
        :raises ValueError: if the step type is neither ``while`` nor ``foreach``.
        """
        super().__init__(**kwargs)
        self.type = Step.filter_predefined_keys(kwargs)  # while/foreach
        do = kwargs[self.type]['do']
        if len(do) == 1:  # just action
            if isinstance(do, list):  # list with single action
                do = do[0]
            # the single-action mapping must have exactly one key: the action name
            [loop_action] = do.keys()
            self.do_action = [_get_action((loop_action, do[loop_action]))]
        else:
            self.do_action = list(itertools.chain.from_iterable(_get_actions(act) for act in do))
        self.max_cycle = kwargs[self.type].get('max_cycle')
        if self.type == 'while':
            if_clause = kwargs['while']['if']
            if isinstance(if_clause, str):
                # short form: wrap the bare expression into an ``equals`` clause
                self.if_clause = {'equals': if_clause}
            else:
                self.if_clause = if_clause
        elif self.type == 'foreach':
            self.in_var = kwargs['foreach']['in']
        else:
            raise ValueError('Wrong configuration for step: ' + str(kwargs))

    @update_variables
    def action(self, includes: dict, variables: dict) -> dict:
        """
        Run the nested actions according to the loop type.

        * ``while``: the condition is evaluated against the current variables before every
          pass. When ``max_cycle`` is set, the loop is left after the pass on which the
          cycle counter is found to be ``0``; otherwise the counter is decremented.
        * ``foreach``: ``in`` is serialized to json, rendered as a template and converted back
          to an object; every element is exposed to the nested actions as ``ITEM``.

        :param includes: passed through unchanged to every nested action.
        :param variables: current variables; ``ITEM`` is written into this dict by ``foreach``.
        :return: variables produced by the last executed pass.
        :raises ValueError: if the rendered ``in`` value is not iterable.
        """
        output = variables
        if self.type == 'while':
            operator = Operator.find_operator(self.if_clause)
            # Count down on a local copy: mutating self.max_cycle would leave the step with
            # an exhausted limit the next time it is executed.
            # NOTE(review): the limit is checked after the pass, so the body can run up to
            # max_cycle + 1 times. Kept as is for backward compatibility.
            cycles_left = self.max_cycle
            while operator.operation(output):
                output = self.__run_actions(includes, output)
                if cycles_left is not None:
                    if cycles_left == 0:
                        break
                    cycles_left -= 1
            return output
        elif self.type == 'foreach':
            loop_var = try_get_objects(fill_template_str(json.dumps(self.in_var), variables))
            # collections.Iterable was removed in Python 3.10; the ABC lives in collections.abc
            if not isinstance(loop_var, collections.abc.Iterable):
                raise ValueError(str(loop_var) + ' is not iterable')
            for entry in loop_var:
                debug('Looping over {}'.format(entry))
                output['ITEM'] = entry
                output = self.__run_actions(includes, output)
            return output

    def __run_actions(self, includes, variables: dict) -> dict:
        """
        Execute the nested actions once, feeding each action the output of the previous one.

        If an action fails and has ``ignore_errors`` set, the error is logged and the rest of
        this pass is skipped; otherwise the exception is propagated.

        :param includes: passed through unchanged to every nested action.
        :param variables: variables for the first action of the pass.
        :return: output of the last successfully executed action (or ``variables`` if none ran).
        """
        output = variables
        for action in self.do_action:
            try:
                output = action.action(includes, output)
            except Exception as e:
                if action.ignore_errors:
                    debug('{} got {} but we ignore it'.format(fill_template_str(action.name, variables), e))
                    break
                raise  # bare raise keeps the original traceback intact
        return output