Skip to content

Instantly share code, notes, and snippets.

@mallipeddi
Created December 3, 2009 14:13
Show Gist options
  • Save mallipeddi/248184 to your computer and use it in GitHub Desktop.
Save mallipeddi/248184 to your computer and use it in GitHub Desktop.
select() on pipes to offload work to thread-pool
"""
Simple Python program demonstrating how certain blocking syscalls can be offloaded to a thread-pool, and
how their results can then be fetched in a non-blocking way by calling select() on the pipes that
connect the main thread to the threads in the pool.
This is the technique used by node.js to offer a unified non-blocking JavaScript API even for
things like file I/O, which is traditionally done via blocking syscalls. The idea was described by
Ryan Dahl at JSConfEU 2009.
-- Harish Mallipeddi - Dec 3 2009
"""
import os
import time
import sys
import threading
import select
import itertools
class Worker(threading.Thread):
    """Pool thread that answers every request line with a "+pong" reply.

    Requests arrive as newline-terminated lines on ``pin``. After a simulated
    blocking call the worker writes a "+pong" line to ``pout``, which makes
    the other end of that pipe readable for a select()-based event loop.
    """

    def __init__(self, workerid, pin, pout, delay=5):
        """Remember the worker's id and its two pipe ends.

        Args:
            workerid: Number identifying this worker (only used for tracing).
            pin: File object providing ``readline()``; requests are read
                from it.
            pout: Raw file descriptor that replies are written to with
                ``os.write``.
            delay: Seconds to sleep per request to simulate a blocking
                system call. Defaults to 5.
        """
        threading.Thread.__init__(self)
        self.workerid = workerid
        self.pin = pin
        self.pout = pout
        self.delay = delay

    def log(self, msg):
        """Trace hook; message output is disabled, only stdout is flushed."""
        # To trace the traffic of each worker, enable the following line:
        #   print("[Worker %d] %s" % (self.workerid, msg))
        sys.stdout.flush()

    def run(self):
        """Serve requests until the request pipe reaches end-of-file."""
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("[Worker] Running...")
        while True:
            raw = self.pin.readline()
            if not raw:
                # readline() returns an empty string only at EOF, i.e. when
                # the write end of the request pipe has been closed. No more
                # requests can arrive, so stop instead of replying forever.
                break
            # Strip only the newline terminator; slicing with [:-1] would
            # eat a real character when the final line is unterminated.
            line = raw.rstrip("\n")
            self.log("<= " + line)
            time.sleep(self.delay)  # this sleep simulates a blocking system call like file I/O
            # os.write() needs a bytes payload on Python 3; a bytes literal
            # is also accepted by Python 2.
            os.write(self.pout, b"+pong\n")
            # log() flushes stdout, so no separate flush is needed here.
            self.log("=> " + "+pong")
def main():
    """Run the demo: ping five workers through a select() event loop.

    For every worker two pipes are created, one carrying requests from the
    parent to the worker and one carrying replies back. The parent never
    blocks on a single worker: it select()s on all pipes, writes "+ping" to
    every request pipe that is writable, and reads the reply from every
    reply pipe that became readable. The loop ends only when no descriptor
    is left to wait on.
    """
    pool = []
    # Lookup tables between the two parent-side pipe ends of each worker, so
    # that handling an event does not require scanning the whole pool.
    pin_by_pout = {}
    pout_by_pin = {}
    for i in range(5):
        child_pin, parent_pout = os.pipe()  # parent -> worker (requests)
        parent_pin, child_pout = os.pipe()  # worker -> parent (replies)
        worker = Worker(i, os.fdopen(child_pin), child_pout)
        # Daemon threads do not keep the process alive on exit.
        worker.daemon = True
        worker.start()
        reply_file = os.fdopen(parent_pin)
        pool.append({
            'thread': worker,
            'pin': reply_file,
            'pout': parent_pout,
        })
        pin_by_pout[parent_pout] = reply_file
        pout_by_pin[reply_file] = parent_pout

    wlist = [entry['pout'] for entry in pool]
    rlist = []
    xlist = []

    # event-loop
    while wlist or rlist or xlist:
        print("[Parent] select()-ing...")
        ready_rlist, ready_wlist, _ready_xlist = select.select(rlist, wlist, xlist)
        print("[Parent] processing events...")
        # in a real-life scenario the following will be user-supplied callbacks registered for events
        for pout in ready_wlist:
            # os.write() needs a bytes payload on Python 3; a bytes literal
            # is also accepted by Python 2.
            os.write(pout, b"+ping\n")
            print("[Parent] => +ping")
            # A request is now in flight: stop watching the request pipe and
            # wait for the matching reply pipe to become readable.
            wlist.remove(pout)
            rlist.append(pin_by_pout[pout])
        for pin in ready_rlist:
            # NOTE: readline() goes through a buffered file object while
            # select() watches the raw descriptor. That is safe here only
            # because at most one reply line per worker is ever in flight.
            line = pin.readline()
            rlist.remove(pin)
            if not line:
                # EOF: the reply pipe was closed, so there is nothing to
                # re-arm for this worker.
                continue
            print("[Parent] <= " + line.rstrip("\n"))
            wlist.append(pout_by_pin[pin])
        sys.stdout.flush()
# Start the demo only when this file is executed as a script, not on import.
if "__main__" == __name__:
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment