Skip to content

Instantly share code, notes, and snippets.

@tgarc
Last active November 18, 2016 00:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tgarc/7120c00eb3eb5f736be93c70ade1d68a to your computer and use it in GitHub Desktop.
Multiprocessing example: feeding a low rate I/O stream
"""
Example of keeping a simulated IO callback thread fed using separate master and daemon (I/O) processes.
"""
import multiprocessing, threading, Queue
import time
import sys
import urllib2
IOBLOCKSZ = 4096 # made up IO block size
IPCBLOCKSZ = 8192 # a size that is guaranteed to be larger than BLOCK_SZ
IPCBUFFSZ = 10*IPCBLOCKSZ # wiggle room for the inter-process buffer
QSIZE = 1024 # queue size needs to vary depending
# on the ratio of IPC block size to IO
# block size
DEBUG = 0
class Daemon(multiprocessing.Process):
def __init__(self, *args, **kwargs):
super(Daemon, self).__init__(target=self.consume, args=args, **kwargs)
self._exit = multiprocessing.Event()
self._exited = multiprocessing.Event()
self.buffered = multiprocessing.Event()
self.daemon = True
def exit(self):
self._exit.set()
@property
def exited(self):
return self._exited.is_set()
def _read(self, inpbuff, blocksize):
with self.lock:
# logic is a bit more complex here since we don't necessarily know
# the required chunk size beforehand, so maybe required to read
# around the edges of the receive buffer
nextidx = self.rdidx+blocksize
if nextidx >= len(inpbuff): # corner case: wrap around read
nextidx %= len(inpbuff)
if self.wridx <= nextidx:
nextidx = self.wridx
indata = inpbuff[self.rdidx:]
# worst case scenario: we allocate and copy
if nextidx != 0:
indata += inpbuff[:nextidx]
else:
# we assume that once the read index catches up to the write
# index the stream is finished (however it could also indicate
# overflow)
if self.rdidx < self.wridx <= nextidx:
nextidx = self.wridx
indata = inpbuff[self.rdidx:nextidx]
self.rdidx = nextidx
return indata
def _receive(self, pipe, inpbuff):
# Logic is a bit simplified here since we can guarantee buffer
# size is a multiple of the transmission size (except for the
# last chunk which will be smaller)
with self.lock:
# no more rx buffer space; wait for processing
if self.wridx < self.rdidx <= self.wridx+IPCBLOCKSZ:
return -1
self.buffered.clear()
nbytes = pipe.recv_bytes_into(inpbuff, self.wridx)
self.wridx += nbytes
if self.wridx == len(inpbuff): self.wridx = 0
return nbytes
def consume(self, pipe):
q = Queue.Queue(QSIZE)
self.lock = threading.Lock()
inpbuff = bytearray(IPCBUFFSZ)
self.count = self.wridx = self.rdidx = 0
self.lt = time.time()
def process(self, q, inpbuff):
indata = self._read(inpbuff, IOBLOCKSZ)
self.count += len(indata)
try:
q.put((0, indata), block=False)
except Queue.Full:
pipe.close()
raise
if DEBUG:
print self.count, time.time()-self.lt, self.rdidx, self.wridx, abs(self.wridx-self.rdidx), q.qsize()
self.lt = time.time()
if len(indata) < IOBLOCKSZ:
q.put(None)
else:
threading.Timer(0.001, process, (self, q, inpbuff,)).start()
try:
self._receive(pipe, inpbuff)
threading.Timer(0.001, process, (self, q, inpbuff,)).start()
while not self._exit.is_set():
try:
item = q.get()
except Queue.Empty:
pass
else:
if item is None: break
pipe.send(item)
nbytes = self._receive(pipe, inpbuff)
if nbytes == -1: # buffer's full right now
continue
elif nbytes < IPCBLOCKSZ: # we must be done
item = q.get()
while item is not None:
pipe.send(item)
item = q.get()
break
except (KeyboardInterrupt, EOFError):
if DEBUG: raise
except multiprocessing.BufferTooShort:
raise Exception("Buffer overflow")
finally:
pipe.close()
self._exited.set()
def main(argv=None):
if argv is None: argv=sys.argv[1:]
inpf, outf = argv[:2]
try:
if argv[2] == '--loop':
loop = True
except IndexError:
loop = False
if inpf.startswith('http'):
inp_fh = urllib2.urlopen(inpf)
else:
inp_fh = open(inpf, 'rb')
if outf == '-':
out_fh = sys.stdout
elif outf == 'null':
out_fh = None
else:
out_fh = open(outf, 'wb+')
stream(inp_fh, out_fh, loop)
return 0
def stream(inp_fh, out_fh=None, loop=False):
hostpipe, clientpipe = multiprocessing.Pipe()
w1 = Daemon(clientpipe, name='consumer')
w1.start()
clientpipe.close()
try:
i = 0
t1 = time.time()
txcount = rxcount = 0
done = False
while w1.is_alive():
if not w1.buffered.is_set():
outdata = inp_fh.read(IPCBLOCKSZ)
if len(outdata) < IPCBLOCKSZ:
if loop:
inp_fh.seek(0)
outdata += inp_fh.read(IPCBLOCKSZ-len(outdata))
hostpipe.send_bytes(outdata)
elif not done:
hostpipe.send_bytes(outdata)
done = True
else:
hostpipe.send_bytes(outdata)
txcount += len(outdata)
w1.buffered.set()
elif hostpipe.poll():
try:
stat, indata = hostpipe.recv()
if out_fh is not None:
out_fh.write(indata)
rxcount += len(indata)
if stat != 0:
raise multiprocessing.ProcessError("Subprocess error")
except (IOError, EOFError): # Processing is done, or, they crashed
w1.join()
if not w1.exited:
raise multiprocessing.ProcessError("Subprocess crashed")
break
if out_fh != sys.stdout and not DEBUG:
sys.stdout.write("{{{:^6d}}} {:6.3f}s {:10d} bytes sent, {:10d} bytes received\r".format(i, time.time()-t1, txcount, rxcount))
sys.stdout.flush()
i += 1
except KeyboardInterrupt:
w1.exit()
if DEBUG: raise
except:
w1.terminate()
raise
finally:
print
hostpipe.close()
if out_fh is not None: out_fh.close()
w1.join()
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment