Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Strip AMQP dependencies from a nameko service class
import inspect
from nameko.extensions import ENTRYPOINT_EXTENSIONS_ATTR
from nameko.rpc import rpc, RpcProxy, Rpc
from nameko.web.handlers import http
import requests
class HelloService(object):
    """Sample service that mixes RPC members with a plain HTTP handler.

    ``test_foo`` passes this class through ``strip_rpc`` and then serves the
    result, so it deliberately declares one RPC proxy, one RPC entrypoint and
    one HTTP entrypoint.
    """

    name = "greeting_service"

    # RPC proxy declared against the name 'other'.
    foo = RpcProxy('other')

    @rpc
    def hello(self, name):
        """RPC entrypoint; ignores ``name`` and returns an empty string."""
        return ''

    @http('GET', '/')
    def get(self, request):
        """HTTP entrypoint for ``GET /``; always returns ``'ok'``."""
        return 'ok'
def strip_rpc(cls):
    """Build a subclass of ``cls`` whose RPC-related members are blanked out.

    Every member of ``cls`` that either carries at least one ``Rpc``
    entrypoint or is itself an ``RpcProxy`` is overridden with ``None`` on
    the returned subclass. All other members are inherited unchanged.

    :param cls: the class to derive from; it is not modified.
    :returns: a new class named ``cls.__name__ + 'Clone'`` that subclasses
        ``cls``.
    """

    def has_rpc_entrypoint(member):
        # Members without the entrypoint attribute fall back to an empty
        # tuple, so they are never reported as RPC methods.
        registered = getattr(member, ENTRYPOINT_EXTENSIONS_ATTR, ())
        return any(isinstance(extension, Rpc) for extension in registered)

    def needs_stripping(member):
        return has_rpc_entrypoint(member) or isinstance(member, RpcProxy)

    # Shadow each matching member with None in the subclass namespace.
    overrides = {
        member_name: None
        for member_name, _ in inspect.getmembers(cls, needs_stripping)
    }
    return type(cls.__name__ + 'Clone', (cls,), overrides)
def test_foo(container_factory):
    """Check that the stripped service still answers its HTTP entrypoint.

    The container is created with an empty config dict, started, and then
    queried over HTTP at ``http://localhost:8000``.

    :param container_factory: callable taking a service class and a config
        dict and returning a startable container (presumably nameko's pytest
        fixture -- confirm against the test setup).
    """
    service_cls = strip_rpc(HelloService)

    runner = container_factory(service_cls, {})
    runner.start()

    reply = requests.get('http://localhost:8000')
    assert reply.text == 'ok'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment