Skip to content

Instantly share code, notes, and snippets.

@flyer103
Created December 21, 2013 07:14
Show Gist options
  • Save flyer103/8066399 to your computer and use it in GitHub Desktop.
Save flyer103/8066399 to your computer and use it in GitHub Desktop.
An modification of [inter-broker-routing example](http://zguide.zeromq.org/page:all#Worked-Example-Inter-Broker-Routing)
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
"""
"""
import sys
import time
import random
import logging
import threading
import zmq
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s][%(levelname)s] %(message)s',
datefmt='%H:%M:%S')
NUM_CLIENTS=10
NUM_WORKERS=3
class Client(threading.Thread):
"""Req socket.
Send tasks to `localfe` socket and send reply received to monitor socket.
"""
def __init__(self, name, num):
super(Client, self).__init__()
self.ctx = zmq.Context()
self.identity = 'Client-%s-%s' % (name, num)
self.client = self.ctx.socket(zmq.REQ)
self.client.setsockopt(zmq.IDENTITY, self.identity)
self.client.connect('ipc://%s-localfe.ipc' % (name,))
self.monitor = self.ctx.socket(zmq.PUSH)
self.monitor.connect('ipc://%s-monitor.ipc' % (name,))
self.poller = zmq.Poller()
self.poller.register(self.client, zmq.POLLIN)
def run(self):
logging.info('%s begins to work' % (self.identity,))
while True:
time.sleep(random.randint(0, 5))
req = '%04X' % (random.randint(0, 10000))
self.client.send(req)
try:
poll_socks = dict(self.poller.poll())
except zmq.ZMQError:
return
if poll_socks.get(self.client) == zmq.POLLIN:
rep = self.client.recv()
assert rep == req, \
'%s expected %s, got %s' % (self.identity, req, rep)
self.monitor.send(rep)
else:
self.monitor.send('E: CLIENT %s lost reply' % (self.identity,))
class Worker(threading.Thread):
"""REQ socket.
Get tasks from localbe socket and then send results to it.
"""
def __init__(self, name, num):
super(Worker, self).__init__()
self.ctx = zmq.Context()
self.identity = 'Worker-%s-%s' % (name, num)
self.worker = self.ctx.socket(zmq.REQ)
self.worker.setsockopt(zmq.IDENTITY, self.identity)
self.worker.connect('ipc://%s-localbe.ipc' % (name,))
def run(self):
logging.info('%s begins to work' % (self.identity,))
self.worker.send('READY')
while True:
try:
task = self.worker.recv_multipart()
except zmq.ZMQError:
return
logging.debug('%s get work %s' % (self.identity, task))
self.worker.send_multipart(task)
class Proxy(threading.Thread):
"""Proxy server."""
def __init__(self, myself, peers):
super(Proxy, self).__init__()
self.myself = myself
self.peers = peers
self.id_cloud = 'Cloud-%s' % (self.myself,)
self.id_peers = ['Cloud-%s' % (peer,) for peer in self.peers]
self.ctx = zmq.Context()
self.localfe = self.ctx.socket(zmq.ROUTER)
self.localfe.bind('ipc://%s-localfe.ipc' % (self.myself,))
self.localbe = self.ctx.socket(zmq.ROUTER)
self.localbe.bind('ipc://%s-localbe.ipc' % (self.myself,))
self.cloudfe = self.ctx.socket(zmq.ROUTER)
self.cloudfe.setsockopt(zmq.IDENTITY, self.id_cloud)
self.cloudfe.bind('ipc://%s-cloud.ipc' % (self.myself,))
self.statebe = self.ctx.socket(zmq.PUB)
self.statebe.bind('ipc://%s-state.ipc' % (self.myself,))
self.monitor = self.ctx.socket(zmq.PULL)
self.monitor.bind('ipc://%s-monitor.ipc' % (self.myself,))
self.cloudbe = self.ctx.socket(zmq.ROUTER)
self.cloudbe.setsockopt(zmq.IDENTITY, self.id_cloud)
self.statefe = self.ctx.socket(zmq.SUB)
self.statefe.setsockopt(zmq.SUBSCRIBE, '')
for peer in self.peers:
self.cloudbe.connect('ipc://%s-cloud.ipc' % (peer,))
self.statefe.connect('ipc://%s-state.ipc' % (peer,))
for i in xrange(NUM_WORKERS):
worker = Worker(self.myself, i)
worker.damemon = True
worker.start()
for i in xrange(NUM_CLIENTS):
client = Client(self.myself, i)
client.daemon = True
client.start()
self.pollerbe = zmq.Poller()
self.pollerbe.register(self.localbe, zmq.POLLIN)
self.pollerbe.register(self.cloudbe, zmq.POLLIN)
self.pollerbe.register(self.statefe, zmq.POLLIN)
self.pollerbe.register(self.monitor, zmq.POLLIN)
self.local_capacity = 0
self.cloud_capacity = 0
self.workers_list = []
def run(self):
raw_input('Press ENTER to start...')
while True:
try:
poll_socks = dict(self.pollerbe.poll(1000 if self.workers_list else None))
except zmq.ZMQError:
break
previous = self.local_capacity
msg = None
if poll_socks.get(self.localbe) == zmq.POLLIN:
msg = self.localbe.recv_multipart()
(addr_worker, empty), msg = msg[:2], msg[2:]
self.workers_list.append(addr_worker)
self.local_capacity += 1
if msg[-1] == 'READY':
msg = None
if poll_socks.get(self.cloudbe) == zmq.POLLIN:
msg = self.cloudbe.recv_multipart()
(addr_cloud, empty), msg = msg[:2], msg[2:]
if msg is not None:
addr = msg[0]
if addr in self.id_peers:
self.cloudfe.send_multipart(msg)
else:
self.localfe.send_multipart(msg)
if poll_socks.get(self.statefe) == zmq.POLLIN:
peers, capacity = self.statefe.recv_multipart()
self.cloud_capacity = int(capacity)
if poll_socks.get(self.monitor) == zmq.POLLIN:
logging.info('Msg from client: %s' % (self.monitor.recv(),))
while self.local_capacity + self.cloud_capacity:
pollerfe = zmq.Poller()
pollerfe.register(self.localfe, zmq.POLLIN)
if self.local_capacity:
pollerfe.register(self.cloudfe, zmq.POLLIN)
poll_socks = dict(pollerfe.poll(0))
if poll_socks.get(self.cloudfe) == zmq.POLLIN:
msg = self.cloudfe.recv_multipart()
logging.debug('get task from cloud..................')
elif poll_socks.get(self.localfe) == zmq.POLLIN:
msg = self.localfe.recv_multipart()
else:
break
if self.local_capacity:
msg = [self.workers_list.pop(), ''] + msg
self.localbe.send_multipart(msg)
self.local_capacity -= 1
else:
msg = [random.choice(self.id_peers), ''] + msg
self.cloudbe.send_multipart(msg)
logging.debug('Send task %s to CLOUD' % (msg,))
if previous != self.local_capacity:
self.statebe.send_multipart([self.myself, '%s' % (self.local_capacity)])
if __name__ == '__main__':
if len(sys.argv) < 2:
print('Usage: peering3.py <myself> [<peer_0> [... <peer_N>]]')
sys.exit(1)
else:
proxy = Proxy(myself=sys.argv[1], peers=sys.argv[2:])
proxy.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment