Skip to content

Instantly share code, notes, and snippets.

@foxmask
Created Jan 31, 2018
Embed
What would you like to do?
nameko with dynamic call of service
>>> n.rpc.service.for_who('service_a', 'foobar')
'i'm in read_data of <class 'test.ServiceA'> with payload foobar
error handling worker <WorkerContext [service.for_who] at 0x7f714960c5c0>: 'EventDispatcher' object is not callable
Traceback (most recent call last):
File "/home/foxmask/DjangoVirtualEnv/namekotst/lib/python3.6/site-packages/nameko/containers.py", line 388, in _run_worker
result = method(*worker_ctx.args, **worker_ctx.kwargs)
File "./test.py", line 28, in for_who
t.read_data(payload)
File "./test.py", line 40, in read_data
self.dispatch("save_data", payload)
TypeError: 'EventDispatcher' object is not callable
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/home/foxmask/DjangoVirtualEnv/namekotst/lib/python3.6/site-packages/nameko/rpc.py", line 374, in __call__
return reply.result()
File "/home/foxmask/DjangoVirtualEnv/namekotst/lib/python3.6/site-packages/nameko/rpc.py", line 332, in result
raise deserialize(error)
nameko.exceptions.RemoteError: TypeError 'EventDispatcher' object is not callable
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc
class ServiceFactory:
def factory(service_name):
if service_name == "service_a":
return ServiceA()
if service_name == "service_b":
return ServiceB()
if service_name == "service_c":
return ServiceC()
assert 0, "Bad service_name : " + service_name
factory = staticmethod(factory)
class Service(ServiceFactory):
name = "service"
dispatch = EventDispatcher()
@rpc
def for_who(self, service_name, payload):
t = ServiceFactory.factory(service_name)
t.read_data(payload)
class ServiceA:
""" Event dispatching service. """
name = "service_a"
dispatch = EventDispatcher()
@rpc
def read_data(self, payload):
print("'i'm in read_data of {} with payload {}".format(self.__class__, payload))
self.dispatch("save_data", payload)
@event_handler("service", "save_data")
def handle_event(self, payload):
print("service A received:", payload)
class ServiceB:
""" Event listening service. """
name = "service_b"
@rpc
def read_data(self, payload):
self.dispatch("save_data", payload)
@event_handler("service", "save_data")
def handle_event(self, payload):
print("service b received:", payload)
class ServiceC:
""" Event dispatching service. """
name = "service_c"
dispatch = EventDispatcher()
@rpc
def read_data(self, payload):
self.dispatch("save_data", payload)
@event_handler("service", "save_data")
def handle_event(self, payload):
print("service C received:", payload)
@mattbennett
Copy link

mattbennett commented Feb 1, 2018

Nameko uses Python classes to describe services. Instantiating one is not sufficient to make it run as a service, so the object you're returning from your ServiceFactory are not services.

You could achieve your desired outcome by maintaining multiple RPC proxies:

class Service(ServiceFactory):

    name = "service"

    service_a_rpc = RpcProxy('a')
    service_b_rpc = RpcProxy('b')
    service_c_rpc = RpcProxy('c')

    @rpc
    def for_who(self, service_name, payload):
        proxy = getattr(self, f'{service_name}_rpc')
        proxy(payload)

Note that this does not dynamically start the downstream services.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment