Skip to content

Instantly share code, notes, and snippets.

@tgarc
Last active November 28, 2016 02:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tgarc/66a2e1c32ec641006ab8c0f1b896261b to your computer and use it in GitHub Desktop.
Save tgarc/66a2e1c32ec641006ab8c0f1b896261b to your computer and use it in GitHub Desktop.
Extension of mpex.py: keep the I/O process alive so that multiple files may be sent
"""
Example of keeping a simulated IO callback thread fed using separate master and daemon (I/O) processes.
Builds off of mpex.py: https://gist.github.com/tgarc/7120c00eb3eb5f736be93c70ade1d68a
"""
import multiprocessing, threading, Queue
import time
import sys
import urllib2
# Tunables shared by the host (main/stream) and the worker process (Daemon).
IOBLOCKSZ = 4096 # made up IO block size: bytes the simulated IO callback reads per tick
IPCBLOCKSZ = 4096 # bytes per pipe message sent by the host; per the original note this
# should be larger than the IO block size (IOBLOCKSZ).
# NOTE(review): it is currently *equal* to IOBLOCKSZ, not larger -- confirm intent
IPCBUFFSZ = 10*IPCBLOCKSZ # wiggle room for the inter-process buffer (size of the worker's receive buffer)
QSIZE = 8 # queue size needs to vary depending on
# the ratio of IPC block size to IO block
# size
# presumably the period, in seconds, of the simulated IO callback -- TODO confirm
IOPERIOD = 0.01
# when truthy the worker logs per-block statistics and main() re-raises KeyboardInterrupt
DEBUG = 1
class Daemon(multiprocessing.Process):
    """Worker process that echoes a byte stream back at a simulated IO rate.

    The host sends chunks of the input over a pipe.  The worker stores them
    in a ring buffer (``inpbuff``), a timer thread drains that buffer in
    ``IOBLOCKSZ`` sized reads (the simulated IO callback) and every block is
    sent back to the host as a ``(status, data)`` tuple, followed by ``None``
    once the stream is finished.  The process stays alive between streams so
    that several files can be sent one after another.

    NOTE(review): ``rdidx == wridx`` is treated as "end of stream" by
    ``_read`` and as "no room" by ``_receive``; the scheme therefore relies
    on the writer staying ahead of the reader -- confirm the block-size
    constants guarantee that.
    """

    def __init__(self, *args, **kwargs):
        """Create the worker.

        Positional arguments are forwarded to :meth:`main` (the caller passes
        the worker's end of the pipe); keyword arguments are forwarded to
        ``multiprocessing.Process``.
        """
        super(Daemon, self).__init__(target=self.main, args=args, **kwargs)
        self._exit = multiprocessing.Event()    # set by exit(): stop request
        self._exited = multiprocessing.Event()  # set when main() finishes cleanly
        # Set by the host after each send; cleared by _receive() right before
        # it reads from the pipe and by main() when it resets for a new stream.
        self.full = multiprocessing.Event()
        # Set by the host to start a stream; cleared by main() when it begins.
        self.ready = multiprocessing.Event()
        self.daemon = True
        self.count = 0   # bytes handed to the simulated IO so far
        self.wridx = 0   # ring buffer write index (advanced on receive)
        self.rdidx = 0   # ring buffer read index (advanced by _read)

    def exit(self):
        """Ask the worker loop to stop."""
        self._exit.set()

    @property
    def exited(self):
        """True once :meth:`main` has run to completion."""
        return self._exited.is_set()

    def _wait_ready(self):
        """Poll until the host sets ``ready`` or an exit has been requested."""
        while not (self.ready.is_set() or self._exit.is_set()):
            time.sleep(0.050)

    def _read(self, inpbuff, blocksize):
        """Return up to ``blocksize`` bytes from the ring buffer.

        The read stops early at the write index and wraps around the end of
        ``inpbuff`` when necessary.  ``self.rdidx`` is advanced past the
        returned data.  A result shorter than ``blocksize`` means the read
        index caught up with the write index.
        """
        # Work on a snapshot: _receive() may move self.wridx from another
        # thread while this read is being computed.
        wridx = self.wridx
        rdidx = self.rdidx
        buflen = len(inpbuff)

        # read logic is a bit more complex than the write since we don't
        # necessarily know the required chunk size beforehand, so we may be
        # required to read around the edges of the receive buffer
        nextidx = rdidx + blocksize

        # we assume that once the read index catches up to the write
        # index the stream is finished (this is technically an overflow
        # condition, but we just assume overflow will never happen)
        if rdidx <= wridx < nextidx:
            nextidx = wridx

        if nextidx >= buflen:  # corner case: wrap around read
            nextidx %= buflen
            if wridx < nextidx:  # we've reached the write index
                nextidx = wridx
            indata = inpbuff[rdidx:]
            if nextidx > 0:
                indata += inpbuff[:nextidx]
        else:
            indata = inpbuff[rdidx:nextidx]

        self.rdidx = nextidx
        return indata

    def _receive(self, pipe, inpbuff):
        """Receive one message from ``pipe`` into the ring buffer.

        Returns the number of bytes received, or ``-1`` (without touching the
        pipe) when the read index lies within one IPC block ahead of the
        write index.
        """
        # Logic is a bit simplified here since we can guarantee buffer
        # size is a multiple of the transmission size (except for the
        # last chunk which will be smaller)

        # no more rx buffer space; wait for processing
        # Note:
        # we *never* write up to the read index; technically it
        # wouldn't overwrite anything but it breaks down the
        # buffer checking logic if we do. Instead we simplify
        # the logic and eat the cost of IPCBLOCKSZ bytes that
        # will never be used
        if self.wridx <= self.rdidx <= self.wridx + IPCBLOCKSZ:
            return -1

        # Tell the host it may send the next chunk, then block until it does.
        self.full.clear()
        nbytes = pipe.recv_bytes_into(inpbuff, self.wridx)
        self.wridx = (self.wridx + nbytes) % len(inpbuff)
        return nbytes

    def main(self, pipe):
        """Process entry point: serve streams until an exit is requested.

        ``pipe`` is the worker's end of the connection to the host.
        """
        q = Queue.Queue(QSIZE)
        inpbuff = bytearray(IPCBUFFSZ)

        self._wait_ready()
        while not self._exit.is_set():
            self.ready.clear()

            # we *must* write to the inp buffer before doing anything or the
            # buffer checking logic will breakdown
            nbytes = pipe.recv_bytes_into(inpbuff, self.wridx)
            self.wridx += nbytes

            try:
                self.consume(pipe, q, inpbuff)
            except KeyboardInterrupt:
                break

            # Only reset state once the host announces the next stream (or
            # asks us to exit).
            self._wait_ready()

            # cleanup
            q.queue.clear()
            self.count = self.wridx = self.rdidx = 0
            self.full.clear()

        pipe.close()
        self._exited.set()

    def consume(self, pipe, q, inpbuff):
        """Run one stream: feed the simulated IO and echo its blocks back.

        A chain of ``threading.Timer`` callbacks reads ``IOBLOCKSZ`` bytes
        per tick from the ring buffer and queues them; this method forwards
        the queued items to the host and refills the ring buffer from the
        pipe.  It returns after the ``None`` end-of-stream marker has been
        forwarded or when an exit is requested.
        """
        self.lt = time.time()
        dbglog = open('ioproc.log', 'w')

        def process(pipe, q, inpbuff):
            # Simulated IO callback: consume one block and queue it.
            indata = self._read(inpbuff, IOBLOCKSZ)
            try:
                q.put((0, indata), block=False)
            except Queue.Full:
                # The forwarding loop fell behind; closing the pipe makes the
                # failure visible to both ends.
                pipe.close()
                raise

            if DEBUG:
                fields = (self.count, 1e3 * (time.time() - self.lt),
                          self.rdidx, self.wridx,
                          abs(self.wridx - self.rdidx), q.qsize())
                dbglog.write(' '.join(str(f) for f in fields) + '\n')
            self.lt = time.time()
            self.count += len(indata)

            if len(indata) < IOBLOCKSZ:
                # Short read: the reader caught up with the writer.
                q.put(None)
            else:
                threading.Timer(IOPERIOD, process, (pipe, q, inpbuff)).start()

        try:
            threading.Timer(IOPERIOD, process, (pipe, q, inpbuff)).start()

            while not self._exit.is_set():
                item = q.get()
                pipe.send(item)
                if item is None:
                    # The timer chain has stopped, so nothing else will ever
                    # be queued; waiting for more would block forever.
                    break

                nbytes = self._receive(pipe, inpbuff)
                if nbytes == -1:  # buffer's full right now
                    continue
                if nbytes < IPCBLOCKSZ:  # we must be done
                    # Forward whatever the IO side still produces, up to and
                    # including the end-of-stream marker.
                    while True:
                        item = q.get()
                        pipe.send(item)
                        if item is None:
                            break
                    break
        finally:
            dbglog.close()
def main(argv=None):
    """Stream an input through the worker process and collect the echo.

    ``argv`` (default ``sys.argv[1:]``) is ``[input, output]`` optionally
    followed by ``'--loop'``:

    * input: ``'-'`` for stdin, a name starting with ``'http'`` to be opened
      with ``urllib2.urlopen``, anything else is opened as a binary file.
    * output: ``'-'`` for stdout, ``'null'`` to discard the data, anything
      else is opened with mode ``'wb+'``.
    * ``'--loop'``: after each pass seek the input back to 0 and stream it
      again for as long as the worker is alive.

    Returns 0 on success.  Raises ``multiprocessing.ProcessError`` if the
    worker did not finish its main loop cleanly.
    """
    if argv is None:
        argv = sys.argv[1:]
    inpf, outf = argv[:2]
    # Any third argument other than '--loop' means "no loop"; previously such
    # an argument left ``loop`` unbound and crashed later on.
    loop = len(argv) > 2 and argv[2] == '--loop'

    if inpf == '-':
        inp_fh = sys.stdin
    elif inpf.startswith('http'):
        inp_fh = urllib2.urlopen(inpf)
    else:
        inp_fh = open(inpf, 'rb')

    if outf == '-':
        out_fh = sys.stdout
    elif outf == 'null':
        out_fh = None
    else:
        out_fh = open(outf, 'wb+')

    hostpipe, clientpipe = multiprocessing.Pipe()
    w1 = Daemon(clientpipe, name='consumer')
    w1.start()
    # The worker owns this end now; drop the host's reference to it.
    clientpipe.close()

    try:
        while w1.is_alive():
            stream(w1, hostpipe, inp_fh, out_fh)
            if not loop:
                break
            inp_fh.seek(0)
    except KeyboardInterrupt:
        if DEBUG:
            raise
    except BaseException:
        # Anything else: kill the worker so the join() below cannot hang.
        w1.terminate()
        raise
    finally:
        w1.exit()
        w1.join()
        hostpipe.close()
        if out_fh is not None:
            out_fh.close()
        # Release the input handle as well (stdin is left alone).
        if inp_fh is not sys.stdin:
            inp_fh.close()

    if not w1.exited:
        raise multiprocessing.ProcessError("Subprocess crashed")
    return 0
def stream(w1, hostpipe, inp_fh, out_fh=None):
    """Send one input stream to the worker and collect what it sends back.

    Parameters:
      w1       -- the running worker; its ``ready`` and ``full`` events and
                  ``is_alive()`` are used for hand-shaking.
      hostpipe -- host end of the pipe shared with the worker.
      inp_fh   -- readable object; read in ``IPCBLOCKSZ`` byte chunks.
      out_fh   -- writable object for the returned data, or None to drop it.

    Returns when the worker sends ``None`` or is no longer alive.  Raises
    ``multiprocessing.ProcessError`` if the worker reports a non-zero status.
    A progress line is written to stdout unless stdout is the data sink.
    """
    statline = "{{{:^9d}}} {:6.3f}s {:10d} bytes sent, {:10d} bytes received\r"

    def report(iterations, sent, received):
        # The line ends in '\r', which does not trigger a flush on a
        # line-buffered terminal, so flush explicitly to keep it live.
        sys.stdout.write(statline.format(iterations, time.time() - t1, sent, received))
        sys.stdout.flush()

    i = 0
    t1 = time.time()
    w1.ready.set()  # tell the worker a new stream is starting
    txcount = rxcount = 0
    done = False  # True once the final (short) chunk has been sent

    try:
        while w1.is_alive():
            if not w1.full.is_set():
                # The worker is waiting for data: send the next chunk.
                outdata = inp_fh.read(IPCBLOCKSZ)
                if len(outdata) == IPCBLOCKSZ:
                    hostpipe.send_bytes(outdata)
                elif not done:
                    # A short (possibly empty) chunk marks the end of the
                    # stream and is sent exactly once.
                    hostpipe.send_bytes(outdata)
                    done = True
                txcount += len(outdata)
                w1.full.set()
            elif hostpipe.poll():
                item = hostpipe.recv()
                if item is None:  # end-of-stream marker from the worker
                    break
                stat, indata = item
                if out_fh is not None:
                    out_fh.write(indata)
                rxcount += len(indata)
                if stat != 0:
                    raise multiprocessing.ProcessError("Subprocess error")

            if out_fh != sys.stdout and (i % 1000) == 0:
                report(i, txcount, rxcount)
            i += 1

        if out_fh != sys.stdout:
            report(i, txcount, rxcount)
    finally:
        # Terminate the '\r' progress line.
        sys.stdout.write("\n")
# Script entry point: run main() and use its return value as the exit status.
if __name__ == '__main__':
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment