Skip to content

Instantly share code, notes, and snippets.

@minrk
Created May 25, 2012 20:16
Show Gist options
  • Save minrk/2790332 to your computer and use it in GitHub Desktop.
Edited version of example script for farming out image saves via zmq ventilator
import numpy
import zmq
def send_array(socket, A, flags=0, copy=True, track=False):
    """Send a numpy array over *socket* as a two-part message.

    The first frame is JSON metadata (dtype and shape) so the receiver can
    reconstruct the array; the second frame is the raw array buffer.
    Returns whatever ``socket.send`` returns (a MessageTracker if
    ``track=True``).
    """
    header = {'dtype': str(A.dtype), 'shape': A.shape}
    # metadata first; SNDMORE keeps the data frame in the same message
    socket.send_json(header, flags | zmq.SNDMORE)
    return socket.send(A, flags, copy=copy, track=track)
def recv_array(socket, flags=0, copy=True, track=False):
    """Receive a numpy array sent by :func:`send_array`.

    Reads the JSON metadata frame (dtype, shape), then the data frame, and
    reconstructs the array without an extra copy of the payload.

    Note: ``numpy.frombuffer`` returns a read-only view over the received
    message buffer.
    """
    md = socket.recv_json(flags=flags)
    msg = socket.recv(flags=flags, copy=copy, track=track)
    # memoryview() instead of the Python-2-only buffer() builtin: it exists
    # on both 2.7 and 3.x and wraps zmq Frame objects the same way.
    buf = memoryview(msg)
    A = numpy.frombuffer(buf, dtype=md['dtype'])
    return A.reshape(md['shape'])
###############
#
# (c) Vasco Tenner 2012
#
# Creates class to write images in parallel
#
##############
import zmq
import random
import time
import sys
import numpy as np
# local
from npzmq import send_array
from worker import run_worker
class ParallelWrite:
    """Farm image writes out to worker processes (zmq ventilator pattern).

    Tasks are PUSHed to workers on port 5557; per-file confirmations are
    PULLed back on the sink (port 5558); control messages (batch size,
    KILL) are broadcast on a PUB socket (port 5559).
    """

    def __init__(self, nworkers=0, nmessages=1000):
        self.nworkers = nworkers
        self.num = nmessages
        # images handed to workers but not yet confirmed as written
        self.numtowrite = 0
        context = zmq.Context()
        # Socket to send tasks on
        self.toworker = context.socket(zmq.PUSH)
        self.toworker.hwm = 100
        self.toworker.bind("tcp://*:5557")
        # Direct line to the sink, used to synchronize the start of a batch
        self.tosink = context.socket(zmq.PUSH)
        self.tosink.connect("tcp://localhost:5558")
        # Socket to receive worker confirmations on
        self.sink = context.socket(zmq.PULL)
        self.sink.bind("tcp://*:5558")
        # Socket for worker control (KILL broadcast)
        self.controller = context.socket(zmq.PUB)
        self.controller.bind("tcp://*:5559")

    def cleanup(self):
        """Wait for outstanding writes, then kill workers and close sockets."""
        lastnumtowrite = self.numtowrite + 1
        while self.numtowrite > 0:
            if self.numtowrite != lastnumtowrite:
                print('Waiting for %i files to be written: ' % self.numtowrite)
                # BUG FIX: original called self.numtowrite.copy(), but ints
                # have no .copy() -- plain assignment is all that is needed.
                lastnumtowrite = self.numtowrite
            time.sleep(0.1)
            sys.stdout.write('.')
            sys.stdout.flush()
        # Send kill signal to workers
        self.controller.send("KILL")
        # Finished -- release all sockets
        self.toworker.close()
        self.tosink.close()
        self.sink.close()
        self.controller.close()

    def wait_for_workers(self):
        """Block until every worker has checked in with a READY message."""
        n = max(self.nworkers, 1)
        print("Waiting for %i workers" % n)
        ready = 0
        while ready < n:
            msg = self.sink.recv_multipart()
            if msg[0] == "READY":
                ready += 1

    def ventilator(self):
        """Generate a test image and push ``self.num`` copies to the workers."""
        print("Sending tasks to workers...")
        random.seed()
        data = np.fromfunction(lambda x, y: np.cos(x) * y, (1000, 2000),
                               dtype=np.uint16)
        # Start our clock now
        self.tstart = time.time()
        # Announce the batch size to the sink and the workers
        self.tosink.send(str(self.num))
        self.controller.send(str(self.num))
        for task_nbr in range(self.num):
            d = data.copy()
            # NOTE(review): output path is hard-coded to a Windows drive;
            # presumably this should be configurable -- verify with callers.
            self.write_img(d,
                           'g:/python_programming/test/test%i.tif' % task_nbr,
                           normalize=-1)
            del d
            # The original guarded this with `task_nbr % 1 == 0`, which is
            # always true -- a larger interval was probably intended.
            # Behavior preserved: print every task number.
            print(task_nbr)
        print('Finished sending')
        tend = time.time()
        tdiff = tend - self.tstart
        total_msec = tdiff * 1000
        print("Total elapsed time: %d msec" % total_msec)

    def write_img(self, data, fname, *args, **kwargs):
        """Queue *data* to be written to *fname* by a worker.

        Extra args/kwargs are forwarded as JSON metadata (see
        Imagerw.write_img on the worker side).
        """
        self.numtowrite += 1
        md = dict(fname=fname, args=args, kwargs=kwargs)
        self.toworker.send_json(md, zmq.SNDMORE)
        send_array(self.toworker, data, copy=False)

    def do_sink(self):
        """Count '.' confirmations from workers until the batch is done."""
        task_nbr = 0
        while task_nbr < self.num:
            msg = self.sink.recv_multipart()
            if msg != ['.']:
                # only count "I got one!" messages
                continue
            task_nbr += 1
            self.numtowrite -= 1
            if task_nbr % 10 == 0:
                sys.stdout.write(":")
            else:
                sys.stdout.write(".")
            sys.stdout.flush()
        sys.stdout.write('\n')
        # Calculate and report duration of batch
        tend = time.time()
        tdiff = tend - self.tstart
        total_msec = tdiff * 1000
        print("Total elapsed time: %d msec" % total_msec)
if __name__ == '__main__':
    import argparse
    from multiprocessing import Process

    # Command line: -w workers to spawn, -n images to push through them.
    parser = argparse.ArgumentParser()
    parser.add_argument("-w", type=int, default=0,
                        help="The number of workers to start")
    parser.add_argument("-n", type=int, default=1000,
                        help="The number of messages to send")
    opts = parser.parse_args()

    print("starting %i workers" % opts.w)
    workers = [Process(target=run_worker) for _ in range(opts.w)]
    for proc in workers:
        proc.start()

    # Drive the full batch: handshake, ventilate, collect, tear down.
    pw = ParallelWrite(opts.w, opts.n)
    pw.wait_for_workers()
    pw.ventilator()
    pw.do_sink()
    pw.cleanup()
    for proc in workers:
        proc.join()
import os
import zmq
class Worker(object):
    """Pulls image-write tasks from the ventilator and confirms each to the sink."""

    def __init__(self):
        context = zmq.Context()
        # task stream coming from the ventilator
        self.fromvent = context.socket(zmq.PULL)
        self.fromvent.connect("tcp://localhost:5557")
        # direct line to the sink: READY handshake and per-task confirmations
        self.tosink = context.socket(zmq.PUSH)
        self.tosink.connect("tcp://localhost:5558")
        # control channel; subscribe to all topics
        self.tocontroller = context.socket(zmq.SUB)
        self.tocontroller.connect("tcp://localhost:5559")
        self.tocontroller.setsockopt(zmq.SUBSCRIBE, b'')

    def log(self, msg):
        """Print *msg* prefixed with this worker's pid."""
        print("[%i] %s" % (os.getpid(), msg))

    def process(self):
        """Main loop: announce READY, then handle tasks until KILL arrives."""
        self.tosink.send_multipart(["READY", str(os.getpid())])
        poller = zmq.Poller()
        poller.register(self.fromvent, zmq.POLLIN)
        poller.register(self.tocontroller, zmq.POLLIN)
        self.log("begin processing")
        running = True
        while running:
            events = dict(poller.poll())
            if self.fromvent in events:
                self.process_task()
            if self.tocontroller in events:
                msg = self.tocontroller.recv_multipart()
                self.log("control message: %s" % msg)
                if msg[0] == "KILL":
                    running = False
        self.log("done processing")

    def process_task(self):
        """Receive one task (JSON metadata + array frames) and acknowledge it."""
        md = self.fromvent.recv_json(zmq.NOBLOCK)
        assert self.fromvent.rcvmore, "expected more parts"
        # receive the data frames (discarded here; logging size only)
        data = self.fromvent.recv_multipart(zmq.NOBLOCK)
        nbytes = sum(len(part) for part in data)
        self.log("writing %s (%.2f MB)" % (md['fname'], nbytes / (1024. * 1024.)))
        self.tosink.send('.')
def run_worker():
    """Process entry point: construct a Worker and run its event loop."""
    Worker().process()
if __name__ == '__main__':
    import argparse
    from multiprocessing import Process

    # Stand-alone mode: spawn -n worker processes and wait for them to exit.
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", type=int, default=1,
                        help="The number of workers to start")
    n = parser.parse_args().n
    print("starting %i workers" % n)

    workers = []
    for _ in range(n):
        proc = Process(target=run_worker)
        proc.start()
        workers.append(proc)
    for proc in workers:
        proc.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment