Skip to content

Instantly share code, notes, and snippets.

@minmax
Created March 22, 2012 17:53
Show Gist options
  • Save minmax/2160998 to your computer and use it in GitHub Desktop.
Save minmax/2160998 to your computer and use it in GitHub Desktop.
Bug with XREQ/XREP sockets in txZMQ
from twisted.internet import reactor
import zmq
from txzmq import ZmqEndpoint, ZmqFactory, ZmqConnection, ZmqEndpointType
# Total number of requests the master sends; the final one is 'stop'.
REQUEST_COUNT = 10000
# Events expected back: the companion XREP script schedules two events for
# every request except the final 'stop', so derive the total from
# REQUEST_COUNT instead of hard-coding it (still 19998 for 10000 requests).
EVENTS_COUNT = (REQUEST_COUNT - 1) * 2
class NewConnection(ZmqConnection):
    """XREQ-side ("master") connection that tallies responses and events.

    Each incoming message is either the literal ``'event'`` or a response.
    The response that starts with ``'exit'`` carries, after an underscore,
    the number of events to wait for; once that many events have been
    counted the reactor is asked to stop.
    """

    socketType = zmq.XREQ
    identity = 'master'

    # Number of 'event' messages received so far.
    events_count = 0
    # Number of non-event messages (responses) received so far.
    requests_count = 0
    # Event total announced by the 'exit_<n>' response; None until it arrives.
    last_event_index = None

    def messageReceived(self, message):
        """Count one incoming message and stop the reactor when finished.

        :param message: sequence of message frames; only the first frame
            is inspected.
        """
        payload = message[0]
        if payload == 'event':
            self.events_count += 1
            self._stop_if_all_events_received()
            return

        self.requests_count += 1
        if payload.startswith('exit'):
            self.last_event_index = int(payload.split('_')[1])
            # The announced total may already have been reached (or be 0)
            # by the time the exit response shows up. Without this check no
            # further event would arrive to trigger the stop and the
            # reactor would run forever.
            self._stop_if_all_events_received()

    def _stop_if_all_events_received(self):
        """Schedule ``reactor.stop`` once the announced event total is hit."""
        # While last_event_index is still None this comparison is False.
        if self.events_count == self.last_event_index:
            reactor.callLater(0, reactor.stop)

    def send(self, message):
        """Send ``message`` through the parent class implementation."""
        super(NewConnection, self).send(message)
        # UNCOMMENT TO FIX A BUG
        # (author's workaround for the problem this script demonstrates:
        # schedule a read right after every send)
        #reactor.callLater(0, self.doRead)
# Factory shared by every connection created in this script.
zmq_factory = ZmqFactory()
# Same host/port the companion XREP script binds; this side connects to it.
address = "tcp://{0}:{1}".format('127.0.0.1', 7895)
endpoint = ZmqEndpoint(ZmqEndpointType.connect, address)
def main():
    """Run the master side and report any shortfall when the reactor exits.

    The report is produced in a ``finally`` clause, so it is printed however
    ``reactor.run()`` terminates.
    """
    connection = NewConnection(zmq_factory, endpoint, NewConnection.identity)
    reactor.callWhenRunning(initialize, connection)
    try:
        reactor.run()
    finally:
        # (expected - received, label) for each kind of message tracked.
        shortfalls = (
            (EVENTS_COUNT - connection.events_count, 'events'),
            (REQUEST_COUNT - connection.requests_count, 'responses'),
        )
        for missing, label in shortfalls:
            if missing != 0:
                print('drop %s %s' % (missing, label))
def initialize(connection):
    """Announce start-up, register factory shutdown and queue all requests.

    :param connection: connection the requests will be sent through.
    """
    print('started')
    zmq_factory.registerForShutdown()
    schedule_sending_requests(connection)
def schedule_sending_requests(connection):
    """Queue one zero-delay ``send_request`` call per request number.

    :param connection: connection handed to every ``send_request`` call.
    """
    for request_no in xrange(REQUEST_COUNT):
        reactor.callLater(0, send_request, connection, request_no)
def send_request(connection, index):
    """Send request number ``index``; the last one is the 'stop' marker.

    :param connection: object whose ``send`` method receives the payload.
    :param index: zero-based request number.
    """
    is_last = index == REQUEST_COUNT - 1
    connection.send('stop' if is_last else 'req')
# Run the master only when this file is executed as a script.
if __name__ == "__main__":
    main()
from twisted.internet import reactor
import zmq
from txzmq import ZmqEndpoint, ZmqFactory, ZmqConnection, ZmqEndpointType
# Factory shared by every connection created in this script.
zmq_factory = ZmqFactory()
# This side binds the address; the companion XREQ script connects to it.
address = "tcp://{0}:{1}".format('127.0.0.1', 7895)
endpoint = ZmqEndpoint(ZmqEndpointType.bind, address)
class NewConnection(ZmqConnection):
    """XREP-side ("slave") connection that echoes requests and emits events.

    Every message whose second frame is not ``'stop'`` is sent back
    unchanged and two ``'event'`` messages are scheduled. A ``'stop'``
    request is answered with ``'exit_<scheduled event count>'`` and the
    reactor is then asked to stop.
    """

    socketType = zmq.XREP
    identity = 'slave'

    # Running total of events scheduled via send_event.
    scheduled_events_count = 0

    def messageReceived(self, message):
        """Handle one request.

        :param message: sequence of frames; frame 0 is reused as the first
            frame of the reply and frame 1 is the request payload.
        """
        if message[1] != 'stop':
            # Ordinary request: echo it, then queue two events.
            self.send(message)
            for _ in xrange(2):
                self.scheduled_events_count += 1
                reactor.callLater(0, self.send_event)
            return

        # Final request: report how many events were scheduled, then stop.
        farewell = 'exit_' + str(self.scheduled_events_count)
        self.send([message[0], farewell])
        reactor.callLater(0, reactor.stop)

    def send_event(self):
        """Send a single 'event' message addressed to 'master'."""
        self.send(['master', 'event'])
def main():
    """Create the slave connection and run the reactor until it is stopped."""
    # Held in a local for the lifetime of reactor.run().
    connection = NewConnection(zmq_factory, endpoint, NewConnection.identity)
    reactor.callWhenRunning(initialize)
    reactor.run()
def initialize():
    """Announce start-up and register the factory for shutdown."""
    print('started')
    zmq_factory.registerForShutdown()
# Run the slave only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment