Skip to content

Instantly share code, notes, and snippets.

@athoune
Last active Aug 29, 2015
Embed
What would you like to do?
Chronology monkeypatching for nameko
import timeit
import json
import eventlet
from nameko.rpc import Rpc, RpcProxy
q = eventlet.Queue()
def read_events(q):
with open('timing.log', 'w') as l:
while True:
evt = q.get()
print evt
json.dump(evt, l)
l.write('\n')
l.flush()
eventlet.spawn(read_events, q)
old_message = Rpc.handle_message
def patched_message(self, body, message):
worker_ctx_cls = self.container.worker_ctx_cls
context_data = self.unpack_message_headers(worker_ctx_cls, message)
q.put(('prems', body, context_data, timeit.default_timer()))
old_message(self, body, message)
Rpc.handle_message = patched_message
old_result = Rpc.handle_result
def patched_result(self, message, worker_ctx, result, exc_info):
r = old_result(self, message, worker_ctx, result, exc_info)
q.put(('deuz', worker_ctx.call_id_stack, timeit.default_timer()))
return r
Rpc.handle_result = patched_result
rpc = Rpc.decorator
class MisterService(object):
name = "mister_service"
@rpc
def mister(self, name):
return "Mr {}".format(name)
class GreetingService(object):
name = "greeting_service"
mister = RpcProxy("mister_service")
@rpc
def hello(self, name):
return "Hello, {}!".format(self.mister.mister(name))
@mattbennett
Copy link

A nicer way to achieve this is to use a service dependency. No need for monkeypatching:

import json
import timeit

import eventlet
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc, RpcProxy


q = eventlet.Queue()


def read_events(q):
    with open('better.log', 'w') as l:
        while True:
            evt = q.get()
            print evt
            json.dump(evt, l)
            l.write('\n')
            l.flush()

eventlet.spawn(read_events, q)


class Chronology(DependencyProvider):

    def worker_setup(self, worker_ctx):
        params = {'args': worker_ctx.args, 'kwargs': worker_ctx.kwargs}
        q.put(('prems', params, worker_ctx.context_data, timeit.default_timer()))

    def worker_result(self, worker_ctx, res=None, exc_info=None):
        q.put(('deuz', worker_ctx.call_id_stack, timeit.default_timer()))


class MisterService(object):
    name = "mister_service"
    chronology = Chronology()

    @rpc
    def mister(self, name):
        return "Mr {}".format(name)


class GreetingService(object):
    name = "greeting_service"
    chronology = Chronology()

    mister = RpcProxy("mister_service")

    @rpc
    def hello(self, name):
        return "Hello, {}!".format(self.mister.mister(name))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment