Skip to content

Instantly share code, notes, and snippets.

@whardier
Created November 9, 2012 23:24
Show Gist options
  • Save whardier/4048989 to your computer and use it in GitHub Desktop.
Save whardier/4048989 to your computer and use it in GitHub Desktop.
Test server and worker using inetd initialization: inetd listens on TCP port 30000 and launches testworker.py for each incoming connection.
30000 stream tcp nowait sspencer /home/sspencer/testworker.py test 2> /tmp/wtf
import zmq
import uuid
import socket
import time
import simplejson
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
# Install pyzmq's IOLoop before any ZMQStream is created.
ioloop.install()

context = zmq.Context(1)
lastping = time.time()

# Each server-side socket is bound to a random TCP port on all interfaces
# and wrapped in a ZMQStream.  The chosen port numbers are kept so they can
# be handed to the workers.

# PUSH socket.
pusher = context.socket(zmq.PUSH)
pusher_port = pusher.bind_to_random_port("tcp://*")
pusher_stream = ZMQStream(pusher)

# PULL socket.
puller = context.socket(zmq.PULL)
puller_port = puller.bind_to_random_port("tcp://*")
puller_stream = ZMQStream(puller)

# PUB socket.
publisher = context.socket(zmq.PUB)
publisher_port = publisher.bind_to_random_port("tcp://*")
publisher_stream = ZMQStream(publisher)
def process(msg, stream):
    """Print a received message together with the stream it arrived on.

    Args:
        msg: The message handed to the callback (presumably the list of
            frames pyzmq passes to ``on_recv_stream`` callbacks -- confirm).
        stream: The stream object handed to the callback.
    """
    # A single pre-formatted argument prints identically under the Python 2
    # print statement and the Python 3 print function.
    print('process %s %s' % (msg, stream))
def standard(msg, stream):
    """Print a message and its stream, tagged with ``standard``.

    Args:
        msg: The message to print.
        stream: The stream object to print alongside it.
    """
    # A single pre-formatted argument prints identically under the Python 2
    # print statement and the Python 3 print function.
    print('standard %s %s' % (msg, stream))
# Hand every message received on the PULL stream to process().
puller_stream.on_recv_stream(process)
# NOTE(review): publisher is a PUB socket, which ZeroMQ treats as send-only,
# so this callback is presumably never invoked -- confirm and consider
# removing the registration.
publisher_stream.on_recv_stream(process)
# IDs of the workers requested below, in spawn order.
pool = []
for _ in range(10):
    # Generate ONE id and use it both for the pool entry and in the
    # handshake, so the 'uuid' a worker echoes back can be found in `pool`.
    worker_id = str(uuid.uuid1())
    pool.append(worker_id)

    # One JSON line telling the worker which ports to connect to.
    handshake = simplejson.dumps({
        'pusher': pusher_port,
        'puller': puller_port,
        'publisher': publisher_port,
        'uuid': worker_id,
    }) + '\r\n'

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect(('localhost', 30000))
        # sendall() retries until the whole line is written; the JSON text is
        # ASCII-only (simplejson's default), so this encode is lossless.
        s.sendall(handshake.encode('ascii'))
        # Show whatever the worker writes back (up to 1024 bytes).
        print(s.recv(1024))
    finally:
        # Release the socket even if connect/send/recv raised.
        s.close()
def ping():
    """Publish a ``ping`` command on the PUB stream and print progress.

    Sends ``{'command': 'ping'}`` as JSON through ``publisher_stream``, then
    prints the current worker ``pool`` and a ``pinging`` marker line.
    """
    publisher_stream.send_json({'command': 'ping'})
    # Single-argument print calls behave the same on Python 2 and Python 3.
    print(pool)
    print('pinging')
# Schedule ping() to run repeatedly; 1000 is the callback interval
# (milliseconds, per Tornado's PeriodicCallback -- confirm for the installed
# version).
pinger = ioloop.PeriodicCallback(ping, 1000)
pinger.start()
# Start the IOLoop that drives the periodic pinger and the stream callbacks
# registered earlier in this script.
ioloop.IOLoop.instance().start()
#!/home/sspencer/VirtualEnv/zerodivision/bin/python
import os
import sys
import socket
import time
import logging
import logging.handlers
import zmq
import simplejson
# Build the syslog handler first: it writes to the local /dev/log socket
# under the LOG_DAEMON facility, prefixing each record with file name and
# level.
formatter = logging.Formatter('%(filename)s: %(levelname)s: %(message)s')
handler = logging.handlers.SysLogHandler(
    address='/dev/log',
    facility=logging.handlers.SysLogHandler.LOG_DAEMON,
)
handler.setFormatter(formatter)

# The "test" logger forwards everything from DEBUG upwards to that handler.
logger = logging.getLogger("test")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# First record, emitted as soon as logging is configured.
logger.debug('hi')
# The handshake is one JSON line on stdin carrying the server's port numbers
# and this worker's uuid.
handshake_line = sys.stdin.readline()
params = simplejson.loads(handshake_line)

context = zmq.Context()
endpoint = "tcp://localhost:%d"

# Each worker socket connects to the server port named in the handshake:
# PULL -> params['pusher'], PUSH -> params['puller'], SUB -> params['publisher'].
puller = context.socket(zmq.PULL)
puller.connect(endpoint % params['pusher'])

pusher = context.socket(zmq.PUSH)
pusher.connect(endpoint % params['puller'])

subscriber = context.socket(zmq.SUB)
subscriber.connect(endpoint % params['publisher'])
# An empty prefix subscribes to every published message.
# NOTE(review): on Python 3 pyzmq expects a bytes prefix here (b'') --
# confirm the target interpreter before porting.
subscriber.setsockopt(zmq.SUBSCRIBE,'')

# Acknowledge the handshake twice: over the PUSH socket and on stdout.
ack = {'ack': params}
pusher.send_json(ack)
sys.stdout.write(simplejson.dumps(ack))
sys.stdout.flush()

# Watch both inbound sockets for readable events.
poller = zmq.Poller()
for watched in (puller, subscriber):
    poller.register(watched, zmq.POLLIN)

# Timestamp of the most recent socket activity; starts at "now".
lastping = time.time()
# Main worker loop: answer pings, log everything received, and exit once no
# socket activity has been seen for more than 10 seconds.
try:
    while True:
        # Wait up to 500 ms for either inbound socket to become readable.
        socks = dict(poller.poll(500))
        logger.debug(repr(socks))

        ping = time.time()
        if socks:
            # Any inbound traffic resets the inactivity timer.
            lastping = ping

        if socks.get(puller) == zmq.POLLIN:
            message = puller.recv_json()
            logger.debug(message)

        if socks.get(subscriber) == zmq.POLLIN:
            message = subscriber.recv_json()
            # Guarded lookup: a broadcast that is not a dict, or that has no
            # 'command' key, is logged and ignored instead of raising and
            # killing the worker.
            if isinstance(message, dict) and message.get('command') == 'ping':
                pusher.send_json({'command': 'pong', 'last': lastping, 'uuid': params['uuid']})
            logger.debug(message)

        if ping > lastping + 10:
            break
finally:
    # Close the sockets explicitly and drop anything still queued.
    # NOTE(review): with a blocking LINGER setting, context termination waits
    # for unsent messages (e.g. pongs queued for a server that has gone
    # away), which can stall process exit -- confirm against the libzmq
    # version in use.
    for sock in (puller, pusher, subscriber):
        sock.setsockopt(zmq.LINGER, 0)
        sock.close()
    context.term()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment