Source code for catcher.steps.grpc_step
from os.path import dirname, join
from catcher.steps.step import Step, update_variables
from catcher.utils import module_utils
from catcher.utils import file_utils
from catcher.utils.misc import fill_template, fill_template_str, try_get_objects
class GRPC(Step):
    """
    Perform a remote procedure call over gRPC using protocol buffers.

    :Input:

    :call: Make a remote procedure call

    - url: server url
    - function: service and method you are going to call separated by dot. Case insensitive (MyClass.my_function)
    - schema: path to the .proto resource file. *Optional* in the configuration, but server reflection
              is not supported yet: the step currently raises if it is omitted.
    - data: data to pass. *Optional*

    :Examples:

    calculator.proto
    ::

        message Number {
            float value = 1;
        }

        service Calculator {
            rpc SquareRoot(Number) returns (Number) {}
        }

    test
    ::

        grpc:
            call:
                url: 'localhost:50051'
                function: calculator.squareroot
                schema: 'calculator.proto'
                data: {'value': 2}
            register: {'my_value': '{{ OUTPUT.value }}'}

    Complex schema case::

        grpc:
            call:
                url: 'localhost:50051'
                function: greeter.greet
                schema: 'greeter.proto'
                data:
                    result:
                        url: '{{ my_url }}'
                        title: 'test'
                        snippets: 'test2'
            register: {value: '{{ OUTPUT.name }}'}

    Useful tip: if you'd like to use templates in your .proto file - do not do it in the original resources, as Catcher
    shouldn't modify them. Use echo step to fill a template and create another .proto file for you.
    """

    def __init__(self, call=None, **kwargs) -> None:
        """
        Store the call configuration.

        :param call: dict with ``url`` and ``function`` (``<service>.<method>``) and optional
                     ``schema`` and ``data``. When falsy, no call attributes are set.
        :raises ValueError: if ``function`` is not exactly ``<service>.<method>``.
        """
        super().__init__(**kwargs)
        if call:
            parts = call['function'].split('.')
            if len(parts) != 2:
                # Same exception type as the bare unpacking produced, but with an actionable message.
                raise ValueError('function should be <service>.<method>, got {}'.format(call['function']))
            service, method = parts
            # Names are lower-cased so that lookups in the generated code are case insensitive.
            self.service = service.lower()
            self.url = call['url']
            self.method = method.lower()
            self.schema = call.get('schema')
            self.data = call.get('data', {})

    @update_variables
    def action(self, includes: dict, variables: dict) -> dict or tuple:
        """
        Compile the schema, build the request message and invoke the remote method.

        :param includes: not used by this step.
        :param variables: current test variables; used for templating and for ``RESOURCES_DIR``.
        :return: tuple of (variables, response returned by the stub's method).
        :raises Exception: if no schema was configured (reflection is not supported yet).
        """
        import grpc

        # Fail before opening a channel: without a schema there is nothing to call.
        if not self.schema:
            raise Exception('Reflection not supported (yet)')
        # NOTE(review): the channel is never closed explicitly. Closing it here could break
        # lazily consumed (streaming) responses - confirm before adding a close().
        channel = grpc.insecure_channel(fill_template(self.url, variables))
        self._compile_proto_files(variables)
        client = self._open_channel(channel, variables)
        method, input_arg = self._compose_arg(variables)
        return variables, getattr(client, method)(input_arg)

    def _compile_proto_files(self, variables):
        """
        Compile .proto resource into the definition (_pb2) and client code (_pb2_grpc)

        The build is run on the directory of the schema (relative to ``RESOURCES_DIR``),
        not on the single schema file.
        """
        from grpc.tools import command

        schema = fill_template(self.schema, variables)
        command.build_package_protos(join(variables['RESOURCES_DIR'], dirname(schema)), strict_mode=True)

    def _load_generated_module(self, variables, suffix):
        """
        Load a module generated from the schema.

        :param variables: current test variables; ``RESOURCES_DIR`` is read from them.
        :param suffix: generated module suffix, ``_pb2`` or ``_pb2_grpc``.
        :return: tuple of (module name without extension, loaded module).
        """
        schema = fill_template(self.schema, variables)
        module_name = file_utils.get_filename(schema) + suffix
        # NOTE(review): the module is looked up directly in RESOURCES_DIR, while the protos are
        # built in RESOURCES_DIR/dirname(schema). Verify this for schemas placed in subdirectories.
        mod = module_utils.load_external_actions(join(variables['RESOURCES_DIR'], module_name + '.py'))
        return module_name, mod

    def _open_channel(self, channel, variables):
        """
        Search for stub in generated module _pb2_grpc. Instantiate it with channel and return.

        :raises Exception: if no ``<service>Stub`` class (case insensitive) is found.
        """
        _, mod = self._load_generated_module(variables, '_pb2_grpc')
        classes = {name.lower(): cls for name, cls in module_utils.get_all_classes(mod).items()}
        stub = classes.get(fill_template(self.service, variables) + 'stub')
        if stub is None:
            raise Exception('Can\'t find stub in generated code. Something went wrong')
        return stub(channel)

    def _compose_arg(self, variables):
        """
        Search for the method in generated _pb2. Resolve input type by the method's input.

        :return: tuple of (method name as declared in the schema, instantiated input message).
        :raises Exception: if the service or the method can't be found in the generated module.
        """
        service_name = fill_template(self.service, variables)
        method_name = fill_template(self.method, variables)
        module_name, mod = self._load_generated_module(variables, '_pb2')

        # search for the service
        services = {str(name).lower(): descriptor
                    for name, descriptor in mod.DESCRIPTOR.services_by_name.items()}
        service = services.get(service_name)
        if service is None:
            # Report the requested name: the lookup result is None at this point.
            raise Exception('Unable to find service {} in {}'.format(service_name, module_name))

        # find service's method
        methods = {descriptor.name.lower(): descriptor for descriptor in service.methods}
        method = methods.get(method_name)
        if method is None:
            raise Exception('No method {} in service {}'.format(method_name, service_name))

        # find method's input type
        input_type = method.input_type
        classes = module_utils.get_all_classes(mod)
        data = try_get_objects(fill_template_str(self.data, variables))
        return method.name, classes[input_type.name](**data)