Created
December 21, 2013 07:14
-
-
Save flyer103/8066399 to your computer and use it in GitHub Desktop.
An modification of [inter-broker-routing example](http://zguide.zeromq.org/page:all#Worked-Example-Inter-Broker-Routing)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
# -*- coding: utf-8 -*- | |
""" | |
""" | |
import sys | |
import time | |
import random | |
import logging | |
import threading | |
import zmq | |
logging.basicConfig(level=logging.DEBUG, | |
format='[%(asctime)s][%(levelname)s] %(message)s', | |
datefmt='%H:%M:%S') | |
NUM_CLIENTS=10 | |
NUM_WORKERS=3 | |
class Client(threading.Thread): | |
"""Req socket. | |
Send tasks to `localfe` socket and send reply received to monitor socket. | |
""" | |
def __init__(self, name, num): | |
super(Client, self).__init__() | |
self.ctx = zmq.Context() | |
self.identity = 'Client-%s-%s' % (name, num) | |
self.client = self.ctx.socket(zmq.REQ) | |
self.client.setsockopt(zmq.IDENTITY, self.identity) | |
self.client.connect('ipc://%s-localfe.ipc' % (name,)) | |
self.monitor = self.ctx.socket(zmq.PUSH) | |
self.monitor.connect('ipc://%s-monitor.ipc' % (name,)) | |
self.poller = zmq.Poller() | |
self.poller.register(self.client, zmq.POLLIN) | |
def run(self): | |
logging.info('%s begins to work' % (self.identity,)) | |
while True: | |
time.sleep(random.randint(0, 5)) | |
req = '%04X' % (random.randint(0, 10000)) | |
self.client.send(req) | |
try: | |
poll_socks = dict(self.poller.poll()) | |
except zmq.ZMQError: | |
return | |
if poll_socks.get(self.client) == zmq.POLLIN: | |
rep = self.client.recv() | |
assert rep == req, \ | |
'%s expected %s, got %s' % (self.identity, req, rep) | |
self.monitor.send(rep) | |
else: | |
self.monitor.send('E: CLIENT %s lost reply' % (self.identity,)) | |
class Worker(threading.Thread): | |
"""REQ socket. | |
Get tasks from localbe socket and then send results to it. | |
""" | |
def __init__(self, name, num): | |
super(Worker, self).__init__() | |
self.ctx = zmq.Context() | |
self.identity = 'Worker-%s-%s' % (name, num) | |
self.worker = self.ctx.socket(zmq.REQ) | |
self.worker.setsockopt(zmq.IDENTITY, self.identity) | |
self.worker.connect('ipc://%s-localbe.ipc' % (name,)) | |
def run(self): | |
logging.info('%s begins to work' % (self.identity,)) | |
self.worker.send('READY') | |
while True: | |
try: | |
task = self.worker.recv_multipart() | |
except zmq.ZMQError: | |
return | |
logging.debug('%s get work %s' % (self.identity, task)) | |
self.worker.send_multipart(task) | |
class Proxy(threading.Thread): | |
"""Proxy server.""" | |
def __init__(self, myself, peers): | |
super(Proxy, self).__init__() | |
self.myself = myself | |
self.peers = peers | |
self.id_cloud = 'Cloud-%s' % (self.myself,) | |
self.id_peers = ['Cloud-%s' % (peer,) for peer in self.peers] | |
self.ctx = zmq.Context() | |
self.localfe = self.ctx.socket(zmq.ROUTER) | |
self.localfe.bind('ipc://%s-localfe.ipc' % (self.myself,)) | |
self.localbe = self.ctx.socket(zmq.ROUTER) | |
self.localbe.bind('ipc://%s-localbe.ipc' % (self.myself,)) | |
self.cloudfe = self.ctx.socket(zmq.ROUTER) | |
self.cloudfe.setsockopt(zmq.IDENTITY, self.id_cloud) | |
self.cloudfe.bind('ipc://%s-cloud.ipc' % (self.myself,)) | |
self.statebe = self.ctx.socket(zmq.PUB) | |
self.statebe.bind('ipc://%s-state.ipc' % (self.myself,)) | |
self.monitor = self.ctx.socket(zmq.PULL) | |
self.monitor.bind('ipc://%s-monitor.ipc' % (self.myself,)) | |
self.cloudbe = self.ctx.socket(zmq.ROUTER) | |
self.cloudbe.setsockopt(zmq.IDENTITY, self.id_cloud) | |
self.statefe = self.ctx.socket(zmq.SUB) | |
self.statefe.setsockopt(zmq.SUBSCRIBE, '') | |
for peer in self.peers: | |
self.cloudbe.connect('ipc://%s-cloud.ipc' % (peer,)) | |
self.statefe.connect('ipc://%s-state.ipc' % (peer,)) | |
for i in xrange(NUM_WORKERS): | |
worker = Worker(self.myself, i) | |
worker.damemon = True | |
worker.start() | |
for i in xrange(NUM_CLIENTS): | |
client = Client(self.myself, i) | |
client.daemon = True | |
client.start() | |
self.pollerbe = zmq.Poller() | |
self.pollerbe.register(self.localbe, zmq.POLLIN) | |
self.pollerbe.register(self.cloudbe, zmq.POLLIN) | |
self.pollerbe.register(self.statefe, zmq.POLLIN) | |
self.pollerbe.register(self.monitor, zmq.POLLIN) | |
self.local_capacity = 0 | |
self.cloud_capacity = 0 | |
self.workers_list = [] | |
def run(self): | |
raw_input('Press ENTER to start...') | |
while True: | |
try: | |
poll_socks = dict(self.pollerbe.poll(1000 if self.workers_list else None)) | |
except zmq.ZMQError: | |
break | |
previous = self.local_capacity | |
msg = None | |
if poll_socks.get(self.localbe) == zmq.POLLIN: | |
msg = self.localbe.recv_multipart() | |
(addr_worker, empty), msg = msg[:2], msg[2:] | |
self.workers_list.append(addr_worker) | |
self.local_capacity += 1 | |
if msg[-1] == 'READY': | |
msg = None | |
if poll_socks.get(self.cloudbe) == zmq.POLLIN: | |
msg = self.cloudbe.recv_multipart() | |
(addr_cloud, empty), msg = msg[:2], msg[2:] | |
if msg is not None: | |
addr = msg[0] | |
if addr in self.id_peers: | |
self.cloudfe.send_multipart(msg) | |
else: | |
self.localfe.send_multipart(msg) | |
if poll_socks.get(self.statefe) == zmq.POLLIN: | |
peers, capacity = self.statefe.recv_multipart() | |
self.cloud_capacity = int(capacity) | |
if poll_socks.get(self.monitor) == zmq.POLLIN: | |
logging.info('Msg from client: %s' % (self.monitor.recv(),)) | |
while self.local_capacity + self.cloud_capacity: | |
pollerfe = zmq.Poller() | |
pollerfe.register(self.localfe, zmq.POLLIN) | |
if self.local_capacity: | |
pollerfe.register(self.cloudfe, zmq.POLLIN) | |
poll_socks = dict(pollerfe.poll(0)) | |
if poll_socks.get(self.cloudfe) == zmq.POLLIN: | |
msg = self.cloudfe.recv_multipart() | |
logging.debug('get task from cloud..................') | |
elif poll_socks.get(self.localfe) == zmq.POLLIN: | |
msg = self.localfe.recv_multipart() | |
else: | |
break | |
if self.local_capacity: | |
msg = [self.workers_list.pop(), ''] + msg | |
self.localbe.send_multipart(msg) | |
self.local_capacity -= 1 | |
else: | |
msg = [random.choice(self.id_peers), ''] + msg | |
self.cloudbe.send_multipart(msg) | |
logging.debug('Send task %s to CLOUD' % (msg,)) | |
if previous != self.local_capacity: | |
self.statebe.send_multipart([self.myself, '%s' % (self.local_capacity)]) | |
if __name__ == '__main__': | |
if len(sys.argv) < 2: | |
print('Usage: peering3.py <myself> [<peer_0> [... <peer_N>]]') | |
sys.exit(1) | |
else: | |
proxy = Proxy(myself=sys.argv[1], peers=sys.argv[2:]) | |
proxy.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment