Source code for catcher.steps.wait
import itertools
import time
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from time import sleep
from catcher.steps.step import Step, update_variables
from catcher.utils.time_utils import to_seconds
from catcher.utils.logger import debug
[docs]class Wait(Step):
"""
Wait for a static delay or until some substep finished successfully. Is extremely useful for testing asnyc systems
or when you are waiting for some service to launch.
:Input:
:days: several days
:hours: several hours
:minutes: several minutes
:seconds: several seconds
:microseconds: several microseconds
:milliseconds: several milliseconds
:nanoseconds: several nanoseconds
:for: (list of actions) will repeat them till they all finishes successfully. Will fail if time ends.
:Examples:
Wait for 1 minute 30 seconds
::
wait: {minutes: 1, seconds: 30}
Wait for http to be ready. Will repeat inner **http** step till is succeeded or fails after 5 seconds
::
wait:
seconds: 5
for:
http:
put:
url: 'http://localhost:8000/mockserver/expectation'
body:
httpRequest: {'path': '/some/path'}
httpResponse: {'body': 'hello world'}
response_code: 201
Wait for postgres to be populated
::
wait:
seconds: 30
for:
- postgres:
request:
conf: '{{ pg_conf }}'
query: 'select count(*) from users'
register: {documents: '{{ OUTPUT }}'}
- check: {equals: {the: '{{ documents }}', is_not: 0}}
"""
def __init__(self, _get_action=None, _get_actions=None, **kwargs) -> None:
super().__init__(**kwargs)
self.delay = to_seconds(kwargs)
self._actions = None
if 'for' in kwargs:
wait_for = kwargs['for']
if len(wait_for) == 1:
if isinstance(wait_for, list):
wait_for = wait_for[0]
[loop_action] = wait_for.keys()
self._actions = [_get_action((loop_action, wait_for[loop_action]))]
elif len(wait_for) > 1:
self._actions = list(itertools.chain.from_iterable([_get_actions(act) for act in wait_for]))
[docs] @update_variables
def action(self, includes: dict, variables: dict) -> dict:
if self._actions is not None:
# run thread and either exit on success or wait fixed time
pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(self.run_loop, includes, variables)
try:
return future.result()
except TimeoutError:
return variables
else: # wait fixed time
sleep(self.delay)
return variables
[docs] def run_loop(self, includes, variables):
output = variables
repeat = True
start = time.time()
while repeat:
if time.time() > start + self.delay: # time limit reached
raise Exception('Time limit reach with no success from substeps')
loop_vars = deepcopy(output) # start every loop from the same variables
try:
for action in self._actions:
loop_vars = action.action(includes, loop_vars)
repeat = False
output = loop_vars # if loop was successful - return modified variables
except Exception as e:
debug('Wait step failure {}'.format(e))
return output