Skip to content

Instantly share code, notes, and snippets.

@reusee
Created January 3, 2012 08:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save reusee/1554159 to your computer and use it in GitHub Desktop.
a coroutine-based parallel library for Python
# coding: utf8
from types import *
import thread
import time
import multiprocessing
import ctypes
import traceback
def _fresh_counters():
    """Build a new set of process-shared, lock-protected helper-thread counters.

    One counter per asynchronous operation kind ('sleep', 'call', 'fork');
    each tracks how many helper threads of that kind are still running.
    """
    return {
        'sleep': multiprocessing.Value(ctypes.c_size_t, 0),
        'call': multiprocessing.Value(ctypes.c_size_t, 0),
        'fork': multiprocessing.Value(ctypes.c_size_t, 0),
    }

class ProcessRegistry:
    """Global scheduler state shared by every process in a run.

    active              -- processes runnable in the current scheduler pass
    new                 -- processes spawned during this pass; run next pass
    box                 -- queued messages per box name
    box_listener        -- 'recv' listeners per box (passive: do not keep
                           the scheduler alive on their own)
    box_active_listener -- 'wait' listeners per box (keep the scheduler alive
                           until they are satisfied or stop() is called)
    thread_counter      -- live helper-thread counts, see _fresh_counters()
    """
    active = []
    new = []
    box = {}
    box_listener = {}
    box_active_listener = {}
    thread_counter = _fresh_counters()

    @classmethod
    def reset(cls):
        """Discard all scheduler state, returning the registry to pristine."""
        cls.active = []
        cls.new = []
        cls.box = {}
        cls.box_listener = {}
        cls.box_active_listener = {}
        # Fix: this previously duplicated the counter-dict literal from the
        # class body; build it in exactly one place to keep the two in sync.
        cls.thread_counter = _fresh_counters()

    @classmethod
    def has_thread(cls):
        """Return True while any helper thread (sleep/call/fork) is running."""
        return any(c.value > 0 for c in cls.thread_counter.values())

    @classmethod
    def inc(cls, counter):
        """Atomically increment the named helper-thread counter."""
        with cls.thread_counter[counter].get_lock():
            cls.thread_counter[counter].value += 1

    @classmethod
    def dec(cls, counter):
        """Atomically decrement the named helper-thread counter."""
        with cls.thread_counter[counter].get_lock():
            cls.thread_counter[counter].value -= 1
class Process:
    """Base class for object-style processes.

    The scheduler calls main() once when the instance is scheduled; if main()
    returns a generator, that generator is spawned and keeps running.
    """
    def main(self):
        # Default behaviour: do nothing. Subclasses override this.
        pass
def spawn_if_is_generator(result):
    """Queue *result* as a new process when it is a generator.

    Non-generator values (e.g. the plain return value of a callable process)
    are silently discarded.
    """
    if not isinstance(result, GeneratorType):
        return
    spawn(result)
def send(box, value = None):
    """Deliver *value* to *box*.

    A waiting 'recv' listener takes priority, then a waiting 'wait' listener;
    if nobody is listening the value is queued on the box for a later receive.
    """
    # Passive ('recv') listeners are served first.
    passive = ProcessRegistry.box_listener.get(box)
    if passive:
        spawn(passive.pop(), value)
        return
    # Then active ('wait') listeners; drop the entry once the list empties so
    # the scheduler's shutdown check no longer sees pending waiters.
    active = ProcessRegistry.box_active_listener.get(box)
    if active:
        spawn(active.pop(), value)
        if not active:
            del ProcessRegistry.box_active_listener[box]
        return
    # Nobody listening: queue the value (FIFO -- insert at front, pop at back).
    ProcessRegistry.box.setdefault(box, []).insert(0, value)
def process_yield(process, info):
    """Dispatch on the value a process generator just yielded.

    info may be:
      None / falsy            -- plain cooperative yield: just re-queue
      ('recv', box)           -- receive from box; passive listener
      ('wait', box)           -- receive from box; keeps the scheduler alive
      ('sleep', seconds)      -- resume after a delay (helper thread)
      ('call', func, *args)   -- run func in a thread, resume with its result
      ('fork', func, *args)   -- run func in a thread, resume immediately

    Raises Exception for an unknown operation tag.
    """
    if not info: # plain yield: re-queue the generator for the next pass
        spawn(process)
    else: # async operation
        operation = info[0]
        if operation in ('wait', 'recv'):
            box = info[1]
            # Deliver immediately when the box already holds a message.
            if box in ProcessRegistry.box and ProcessRegistry.box[box]:
                spawn(process, ProcessRegistry.box[box].pop())
            else:
                # 'recv' listeners are passive (do not keep the scheduler
                # alive); 'wait' listeners go into box_active_listener, which
                # _run()'s shutdown check treats as pending work.
                if operation == 'recv':
                    listener_list = ProcessRegistry.box_listener
                else:
                    listener_list = ProcessRegistry.box_active_listener
                try:
                    listener_list[box].append(process)
                except KeyError:
                    listener_list[box] = []
                    listener_list[box].append(process)
        elif operation == 'sleep':
            t = float(info[1])
            # Increment BEFORE starting the thread so the scheduler can never
            # observe a moment where the sleeper is invisible.
            ProcessRegistry.inc('sleep')
            thread.start_new_thread(_yield_sleep, (t, process))
        elif operation in ('call', 'fork'):
            func = info[1]
            args = info[2:]
            ProcessRegistry.inc(operation)
            if operation == 'call':
                # 'call': the process resumes only when func finishes.
                thread.start_new_thread(_yield_call, (process, func, args))
            else:
                # 'fork': fire-and-forget; resume the process right away.
                thread.start_new_thread(_yield_fork, (func, args))
                spawn(process)
        else:
            raise Exception('Unknown async operation: %s' % operation)
def _yield_sleep(t, process):
    """Helper-thread body for 'sleep': block t seconds, then resume *process*."""
    time.sleep(t)
    # Order matters: spawn() first so ProcessRegistry.new is non-empty before
    # the counter drops; otherwise the scheduler's shutdown check could fire
    # in the window between the two and this process would be dropped.
    spawn(process)
    ProcessRegistry.dec('sleep')
def _yield_call(process, func, args):
    """Helper-thread body for 'call': run func(*args), resume *process* with
    the return value; on error, forward the traceback to '_exception' instead.
    """
    try:
        ret = func(*args)
    except Exception:
        # The '_exception' listener (child_thread_exception_handler) prints
        # the traceback and stops the run. send() spawns that listener, so
        # `new` is non-empty before the finally-clause decrements the counter.
        send('_exception', traceback.format_exc())
    else:
        # Spawn BEFORE decrementing: the scheduler's shutdown check relies on
        # the resumed process being visible in `new` before the count drops.
        spawn(process, ret)
    finally:
        # Fix: the original returned before dec('call') on the exception
        # path, leaving the counter permanently raised until the next reset().
        ProcessRegistry.dec('call')
def _yield_fork(func, args):
    """Helper-thread body for 'fork': run func(*args) for side effects only;
    the forking process was already resumed by process_yield().
    """
    try:
        func(*args)
    except Exception:
        # Forward the traceback; send() spawns the '_exception' listener, so
        # `new` is non-empty before the finally-clause decrements the counter.
        send('_exception', traceback.format_exc())
    finally:
        # Fix: the original returned before dec('fork') on the exception
        # path, leaving the counter permanently raised until the next reset().
        ProcessRegistry.dec('fork')
def child_thread_exception_handler():
    """System process installed by run(): waits for a traceback on the
    '_exception' box, prints it, and shuts the scheduler down."""
    info = yield 'recv', '_exception'
    print(info.strip())
    stop()
def run():
    """Drive one scheduling run to completion.

    Installs the exception-handler process, runs the core loop, and returns
    the StopSignal that ended the run (carrying .result and .reason).
    """
    try:
        spawn(child_thread_exception_handler)
        return _run()
    except StopSignal as e:
        # stop() unwinds to here; clear all global state before handing back.
        ProcessRegistry.reset()
        return e
def _run():
    """Core scheduler loop.

    Drains the active queue, swaps in processes spawned during the pass, and
    returns a StopSignal('normal') once no work can possibly remain.
    """
    while True:
        process = None
        try: process = ProcessRegistry.active.pop()
        except IndexError: pass
        while process:
            if isinstance(process, tuple):
                # (callable-or-generator, args) pair produced by spawn(p, *args)
                call = process[0]
                if callable(call):
                    spawn_if_is_generator(call(*process[1]))
                else: # generator: resume it with the spawned arguments
                    try: process_yield(call, call.send(*process[1]))
                    except StopIteration: pass
            elif callable(process):
                spawn_if_is_generator(process())
            elif isinstance(process, GeneratorType):
                try: process_yield(process, process.next())
                except StopIteration: pass
            elif isinstance(process, Process):
                spawn_if_is_generator(getattr(process, 'main')())
            else:
                raise Exception('Unknown process type: %s' % str(type(process)))
            try: process = ProcessRegistry.active.pop()
            except IndexError: break
        # Swap queues: what was spawned this pass becomes next pass's work.
        tmp = ProcessRegistry.active
        ProcessRegistry.active = ProcessRegistry.new
        ProcessRegistry.new = tmp
        if not (ProcessRegistry.active
            or ProcessRegistry.box_active_listener
            # The thread-counter test must come BEFORE the new-queue test.
            # Otherwise this race can occur: ProcessRegistry.new is [] when
            # the condition is read (a helper thread has not spawned yet),
            # but right after, the helper thread spawns and drops its counter
            # to 0 -- the if body would then run even though new is no longer
            # empty, and the processes queued in new would be thrown away.
            # With this ordering: while any counter is non-zero the condition
            # fails immediately; and once every counter is zero, new cannot
            # be empty, because helpers such as _yield_call/_yield_sleep
            # always spawn (making new non-empty) BEFORE decrementing their
            # counter. That ordering guarantees nothing in new is ever lost.
            # (The alternative -- a lock making this test and spawn mutually
            # exclusive -- would cost performance.)
            or ProcessRegistry.has_thread()
            or ProcessRegistry.new
            ):
            result = None
            # A process may have published a final value on the '.result' box.
            if '.result' in ProcessRegistry.box and ProcessRegistry.box['.result']:
                result = ProcessRegistry.box['.result'].pop()
            reason = 'normal'
            ProcessRegistry.reset()
            return StopSignal(result, reason)
class Signal(Exception):
    """Marker base class for control-flow signals raised through the scheduler.

    Fix: derives from Exception -- raising instances of classes that are not
    BaseException subclasses is deprecated in Python 2.6 and an error in
    Python 3; existing `raise`/`except StopSignal` sites are unaffected.
    """
    pass

class StopSignal(Signal):
    """Terminates a scheduling run; also returned by run() as its outcome.

    result -- the value carried out of the run (e.g. passed to stop()), if any
    reason -- short description of why the run ended (e.g. 'normal')
    """
    def __init__(self, result = None, reason = None):
        self.result = result
        self.reason = reason

def stop(result = None, reason = None):
    """Abort the scheduler immediately from inside any process."""
    raise StopSignal(result, reason)
def spawn(process, *args):
    """Schedule *process* to run on the NEXT scheduler pass.

    Extra positional arguments are packed alongside the process and handed
    over when it is started or resumed.
    """
    entry = (process, args) if args else process
    ProcessRegistry.new.append(entry)
def test():
    """Demo/self-test suite exercising every scheduler feature in sequence.

    NOTE: runs for several wall-clock seconds because of the sleep demos,
    and the final section performs real HTTP requests (requires the
    third-party `requests` package and network access).
    """
    # factorial -- CPS-style recursion via spawn; result returned via stop()
    def fac(n):
        spawn(_fac, n, 1)
    def _fac(n, acc):
        if n > 0:
            spawn(_fac, n - 1, acc * n)
        else:
            stop(acc)
    n = 50
    spawn(fac, n)
    print 'fac %d: %d' % (n, run().result)
    # echo -- message passing between two processes over the 'ping' box
    def server():
        while True:
            value = yield 'recv', 'ping'
            print 'pong', value
    def client():
        send('ping')
        send('ping', 1)
        send('ping', 2)
    spawn(client)
    spawn(server)
    run()
    # sleep -- two processes sleeping at different rates, interleaved output
    def foo():
        for i in xrange(3):
            print 'FOO'
            yield 'sleep', 1
    def bar():
        for i in xrange(6):
            print 'BAR'
            yield 'sleep', 0.5
    spawn(foo)
    spawn(bar)
    run()
    # stop -- a 'wait' listener keeps the run alive until bar() calls stop()
    def foo():
        print 'waitting'
        yield 'wait', 'will never sent'
    def bar():
        yield 'sleep', 1
        print 'stop'
        stop()
    spawn(foo)
    spawn(bar)
    run()
    # sleep -- many concurrent sleepers; prints total elapsed wall time
    span = 1
    n = 1000
    for i in xrange(n):
        def _f(i):
            yield 'sleep', float(span) / n * i
            #print i
        spawn(_f, i)
    start_time = time.time()
    run()
    print time.time() - start_time
    # message ring -- pass a token around a ring of processes; throughput test
    nodes = 2000
    times = 200
    def p(n):
        listen_to = 'box' + str(n)
        if n == nodes:
            # last node wraps around to the first, closing the ring
            send_to = 'box1'
        else:
            send_to = 'box' + str(n + 1)
        for t in xrange(times):
            msg = yield 'recv', listen_to
            send(send_to, msg)
            #print n, msg
    for i in range(1, nodes + 1):
        spawn(p, i)
    send('box1', 'go')
    start_time = time.time()
    run()
    print (nodes * times) / (time.time() - start_time), 'messages per second'
    # async call -- run blocking functions in threads, resume with the result
    def foo():
        return 'Foo'
    def baz(n):
        return n
    def bar():
        ret = yield 'call', foo
        print ret
        ret = yield 'call', baz, 5
        print ret
    def f():
        for i in xrange(3):
            print 'f'
            yield
    spawn(bar)
    spawn(f)
    run()
    # async call exception -- the traceback is routed to the '_exception' box
    def foo():
        raise Exception('exception')
    def bar():
        yield 'call', foo
    spawn(bar)
    run()
    # fork function -- fire-and-forget thread; bar() resumes immediately
    def foo():
        time.sleep(1)
        print 'foo'
    def bar():
        yield 'fork', foo
        print 'bar'
    spawn(bar)
    run()
    # parallel html get -- fan out HTTP GETs with 'fork', collect via 'recv'
    import requests  # third-party dependency; needs network access
    def get(index, url):
        ret = requests.get(url)
        send('response', (index, ret))
    def _multiget(urls):
        for i in xrange(len(urls)):
            yield 'fork', get, i, urls[i]
        responses = [None for x in urls]
        for i in xrange(len(urls)):
            index, response = yield 'recv', 'response'
            responses[index] = response
        send('.result', responses)
    def multiget(urls):
        spawn(_multiget, urls)
        return run().result
    print multiget(['http://qq.com', 'http://baidu.com', 'http://soso.com'])
if __name__ == '__main__':
    # Run the demo/self-test suite when executed as a script.
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment