Skip to content

Instantly share code, notes, and snippets.

@jself
Created November 23, 2010 23:22
Show Gist options
  • Save jself/712759 to your computer and use it in GitHub Desktop.
Save jself/712759 to your computer and use it in GitHub Desktop.
import zmq
import threading
import uuid
import time
import itertools
from homingbird import Node, LocalNode
def get_random():
    """Return a fresh random identifier: a UUID4 as 32 lowercase hex digits, no dashes."""
    # UUID.hex is the dash-free form of str(uuid).
    return uuid.uuid4().hex
class Pool(Node):
    """A pool of worker nodes fed round-robin from a single dispatcher loop.

    Jobs submitted with :meth:`queue` are messages sent to the pool itself;
    :meth:`main` hands each one to the next worker in turn, stores the
    workers' responses by key, and answers :meth:`get` requests from that
    store (or as soon as the matching response arrives).

    NOTE(review): ``send``, ``receive``, ``reply``, ``report``,
    ``connect_socket`` and ``exit`` come from homingbird's ``Node``; their
    exact semantics are not visible in this file.
    """

    def work_f(self, m):
        """Default worker function: run the job's callable and reply with its result.

        ``m.data`` is the job dict built by :meth:`queue` plus the
        ``worker_id`` added by :meth:`main`.
        """
        data = m.data
        m.reply({
            'result': data['f'](data['data']),
            'key': data['key'],
            'type': 'response',
            'worker_id': data['worker_id'],
        })

    def __init__(self, workers=20, workf=None, bind=None, daemon=True, spawn=True, **kwargs):
        """Create the worker nodes and, optionally, start the dispatcher thread.

        workers -- number of worker nodes to create.
        workf   -- function each worker is constructed with; defaults to
                   :meth:`work_f`.
        bind    -- address used as this node's id; defaults to an ``inproc://``
                   address derived from ``hash(self)``.
        daemon  -- daemon flag for the dispatcher thread.
        spawn   -- when true, run :meth:`main` in a new thread.
        kwargs  -- ``connect_socket=False`` skips the ``connect_socket()`` call.
        """
        workf = workf or self.work_f
        self.workers = [Node(workf) for _ in range(workers)]
        Node._context = Node._context or zmq.Context()
        # The original assigned the bare name ``f`` here, which resolved to
        # the module-level demo function rather than anything passed in.
        # Store the worker function instead; Pool's own loop is ``main``.
        self.f = workf
        self.id = bind or 'inproc://homingbird-' + str(hash(self))
        if kwargs.get('connect_socket', True):
            self.connect_socket()
        if spawn:
            t = threading.Thread(target=self.main)
            t.daemon = daemon
            t.start()

    def queue(self, f, keys, datas):
        """Submit one job per ``(key, data)`` pair; each job computes ``f(data)``.

        ``keys`` and ``datas`` are consumed in lockstep, stopping at the
        shorter of the two.
        """
        # itertools.izip only exists on Python 2; the builtin zip is the
        # lazy equivalent on Python 3.
        lazy_zip = getattr(itertools, 'izip', zip)
        for k, d in lazy_zip(keys, datas):
            self.send(self, {'type': 'job', 'f': f, 'key': k, 'data': d})

    def get(self, key, timeout=None):
        """Ask the pool for the result stored under ``key``.

        Returns the reply's ``data`` when a reply is received; otherwise
        returns whatever falsy value ``receive(timeout)`` produced.
        """
        l = LocalNode()
        l.send(self, {'type': 'get', 'key': key})
        result = l.receive(timeout)
        return result.data if result else result

    def get_with_requeue(self, f, key, data, timeout=3, tries=0):
        """Fetch the result for ``key``, re-submitting the job after each miss.

        tries -- maximum number of ``get`` attempts. Values below 1 keep
                 retrying until a result arrives (the behaviour of the
                 original default), now as a loop instead of unbounded
                 recursion.

        Returns the result, or ``None`` once ``tries`` attempts have failed.

        The original decremented ``tries`` twice per round, so even values
        never reached the stop condition and odd values got roughly half the
        requested attempts.

        NOTE(review): a falsy result (``0``, ``''``, ``None``) cannot be told
        apart from a timeout here and is treated as a miss.
        """
        attempts = 0
        while True:
            result = self.get(key, timeout)
            if result:
                return result
            attempts += 1
            if tries >= 1 and attempts >= tries:
                return None
            self.queue(f, [key], [data])

    def main(self):
        """Dispatcher loop: route jobs to workers and results to getters.

        Runs until an ``'Exit'`` message is received.
        """
        results = {}          # key -> result that nobody has asked for yet
        waiting_results = {}  # key -> 'get' message waiting for its result
        current_worker = 0    # index of the worker that receives the next job
        while 1:
            m = self.receive(1)
            if not m:
                continue
            if m.name == 'Exit':
                self.report(m)
                break
            elif m.name == 'Message':
                d = m.data
                if d['type'] == 'job':
                    d['worker_id'] = current_worker
                    self.send(self.workers[current_worker], d)
                    # Round-robin: advance and wrap around at the end.
                    current_worker = (current_worker + 1) % len(self.workers)
                elif d['type'] == 'response':
                    if d['key'] in waiting_results:
                        waiting_results.pop(d['key']).reply(d['result'])
                    else:
                        results[d['key']] = d['result']
                elif d['type'] == 'get':
                    if d['key'] in results:
                        m.reply(results.pop(d['key']))
                    else:
                        # Park the request under the requested key. The
                        # original used the literal string 'key', so the
                        # 'response' branch never found parked requests.
                        waiting_results[d['key']] = m

    def exit(self):
        """Exit every worker node, then this node."""
        for worker in self.workers:
            worker.exit()
        Node.exit(self)
def f(d, delay=5):
    """Demo job: sleep for ``delay`` seconds, then return ``d + 1``.

    d     -- value to increment.
    delay -- seconds to sleep before returning; defaults to the original
             hard-coded 5 so one-argument callers behave as before.
    """
    time.sleep(delay)
    return d + 1
if __name__ == '__main__':
    # Demo: queue 99 jobs keyed 1..99, then fetch each result in order,
    # waiting up to 5 seconds per attempt and re-queueing on a miss.
    p = Pool()
    try:
        p.queue(f, range(1, 100), range(1, 100))
        for i in range(1, 100):
            # Parenthesised single-argument print works on Python 2 and 3.
            print(p.get_with_requeue(f, i, i, 5, tries=5))
    finally:
        # Shut the pool down even if the loop is interrupted or raises.
        p.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment