Skip to content

Instantly share code, notes, and snippets.

@devdave
Last active December 13, 2015 23:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devdave/4991730 to your computer and use it in GitHub Desktop.
Save devdave/4991730 to your computer and use it in GitHub Desktop.
Messing around with different idea's and hacked together a crude twisted inlineCallbacks decorator.
import time
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
from dgbase.messages.kickstart import RequestKickstart
from dgbase.messages.kickstart import RespondWithKickstart
from functools import wraps, partial
LOOP = ioloop.IOLoop.instance()
class Request(object):
def __init__(self, socket, msg, timeout):
self.socket = socket
self.msg = msg
self.timeout = timeout
def __del__(self):
print "Request.__del__"
def execute(self, stream, handler):
self.timeout = ioloop.DelayedCallback(handler.on_timeout, self.timeout)
stream.send_multipart(['requestkickstart', self.msg])
stream.flush()
self.timeout.start()
class ZMQInline(object):
def __init__(self):
self.loop = ioloop.IOLoop.instance()
self.streams = {}
self.generator = None
self.last_request = None
def __del__(self):
print "__del__ zmqinline"
def __call__(self, f):
@wraps(f)
def decorator(*args, **kwargs):
self.unwind(None, f(*args, **kwargs))
return decorator
def add_stream(self, socket):
if socket not in self.streams:
self.streams[socket] = ZMQStream(socket, self.loop)
self.streams[socket].on_recv_stream(self.on_recv)
return self.streams[socket]
def unwind(self, result, generator = None):
self.generator = generator or self.generator
try:
request = self.generator.send(result)
except StopIteration:
return
else:
self.last_request = request
stream = self.add_stream(request.socket)
request.execute(stream, self)
def on_recv(self, *args, **kwargs):
result = args, kwargs
self.unwind(result, self.generator)
def on_timeout(self, *args, **kwargs):
result = args, kwargs
print result
self.unwind(result, self.generator)
class Client(object):
def __init__(self):
self.ctx = zmq.Context()
self.loop = ioloop.IOLoop.instance()
self.control = self.ctx.socket(zmq.REQ)
self.control.connect("tcp://192.168.1.2:8283")
@ZMQInline()
def do_kickstart(self):
print "do_kickstart"
try:
response = yield Request(self.control, 'hello', timeout = 5000)
except Exception as e:
print e
else:
print "Got ", response
try:
response = yield Request(self.control, 'hello2', timeout = 5000)
except Exception as e:
print e
else:
print "Got ", response
self.loop.stop()
def check_point(self):
dbgp = 1
test = dbgp
dbgp = test
print "tick"
self.loop.add_timeout(time.time() + 5, self.check_point)
def start(self):
self.loop.add_timeout(time.time() + 5, self.check_point)
self.loop.add_callback(self.do_kickstart)
self.loop.start()
c = Client()
c.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment