Created
January 3, 2012 08:58
-
-
Save reusee/1554159 to your computer and use it in GitHub Desktop.
a coroutine-based parallel library for Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf8 | |
from types import * | |
import thread | |
import time | |
import multiprocessing | |
import ctypes | |
import traceback | |
class ProcessRegistry:
    """Scheduler state shared by every function in this module.

    Everything is kept in class attributes; the class is used as a namespace
    and is not instantiated by this module.
    """

    # Processes being executed in the current scheduler pass.
    active = []
    # Processes queued by spawn() for the next scheduler pass.
    new = []
    # Box name -> pending messages (inserted at index 0, popped from the end).
    box = {}
    # Box name -> processes blocked on 'recv'.
    box_listener = {}
    # Box name -> processes blocked on 'wait'.
    box_active_listener = {}
    # Operation name -> shared counter of worker threads still running.
    thread_counter = {
        'sleep': multiprocessing.Value(ctypes.c_size_t, 0),
        'call': multiprocessing.Value(ctypes.c_size_t, 0),
        'fork': multiprocessing.Value(ctypes.c_size_t, 0),
    }

    @classmethod
    def reset(cls):
        """Drop all queued processes, boxes and listeners; zero the counters."""
        cls.active = []
        cls.new = []
        cls.box = {}
        cls.box_listener = {}
        cls.box_active_listener = {}
        cls.thread_counter = dict(
            (name, multiprocessing.Value(ctypes.c_size_t, 0))
            for name in ('sleep', 'call', 'fork')
        )

    @classmethod
    def has_thread(cls):
        """Return True while any worker-thread counter is above zero."""
        return any(shared.value > 0 for shared in cls.thread_counter.values())

    @classmethod
    def inc(cls, counter):
        """Record that one more worker thread of kind `counter` is running.

        Raises KeyError if `counter` is not a known counter name.
        """
        # Bind the counter once: reset() may replace the dict from another
        # thread, and the lock that is released must be the one acquired.
        shared = cls.thread_counter[counter]
        with shared.get_lock():
            shared.value += 1

    @classmethod
    def dec(cls, counter):
        """Record that one worker thread of kind `counter` has finished.

        Raises KeyError if `counter` is not a known counter name.
        """
        shared = cls.thread_counter[counter]
        with shared.get_lock():
            # c_size_t is unsigned: decrementing zero would wrap around to the
            # maximum value and make has_thread() report True forever.  That
            # can happen when a worker finishes after reset() has installed
            # fresh counters, so clamp at zero.
            if shared.value > 0:
                shared.value -= 1
class Process:
    """Base class for object-style processes; the scheduler calls main()."""

    def main(self):
        """Entry point invoked by the scheduler; does nothing by default."""
        return None
def spawn_if_is_generator(result):
    """Queue `result` through spawn() when it is a generator; otherwise do nothing."""
    if not isinstance(result, GeneratorType):
        return
    spawn(result)
def send(box, value = None):
    """Deliver `value` to the box named `box`.

    A process blocked on 'recv' for this box is resumed first; failing that, a
    process blocked on 'wait'; failing that, the value is stored in the box
    until somebody asks for it.
    """
    registry = ProcessRegistry

    receivers = registry.box_listener.get(box)
    if receivers:
        spawn(receivers.pop(), value)
        return

    waiters = registry.box_active_listener.get(box)
    if waiters:
        spawn(waiters.pop(), value)
        # An empty entry must be removed: any key left in this dict keeps the
        # scheduler loop alive.
        if not waiters:
            del registry.box_active_listener[box]
        return

    # Nobody is listening: store the message at the front of the box.
    registry.box.setdefault(box, []).insert(0, value)
def process_yield(process, info):
    """Act on the value `info` that the generator `process` just yielded.

    A falsy `info` simply re-queues the generator.  Otherwise `info[0]` names
    an operation:

    * 'recv' / 'wait' -- `info[1]` is a box name; resume with a stored message
      or register the generator as a listener on that box.
    * 'sleep' -- `info[1]` is a delay in seconds, handled by a worker thread.
    * 'call' -- run `info[1](*info[2:])` in a worker thread and resume the
      generator with the return value.
    * 'fork' -- run `info[1](*info[2:])` in a worker thread and re-queue the
      generator immediately.

    Raises Exception for any other operation name.
    """
    if not info:  # bare yield: just reschedule the generator
        spawn(process)
        return

    operation = info[0]

    if operation in ('wait', 'recv'):
        box = info[1]
        pending = ProcessRegistry.box.get(box)
        if pending:
            spawn(process, pending.pop())
            return
        if operation == 'recv':
            listeners = ProcessRegistry.box_listener
        else:
            listeners = ProcessRegistry.box_active_listener
        listeners.setdefault(box, []).append(process)
    elif operation == 'sleep':
        delay = float(info[1])
        ProcessRegistry.inc('sleep')
        thread.start_new_thread(_yield_sleep, (delay, process))
    elif operation == 'call':
        func, args = info[1], info[2:]
        ProcessRegistry.inc('call')
        thread.start_new_thread(_yield_call, (process, func, args))
    elif operation == 'fork':
        func, args = info[1], info[2:]
        ProcessRegistry.inc('fork')
        thread.start_new_thread(_yield_fork, (func, args))
        spawn(process)
    else:
        raise Exception('Unknown async operation: %s' % operation)
def _yield_sleep(t, process):
    """Worker-thread body for the 'sleep' operation.

    Sleeps `t` seconds and then re-queues `process`.  If the sleep itself
    fails, the traceback is sent to the '_exception' box, as the other worker
    bodies do, instead of leaving the 'sleep' counter stuck above zero.
    """
    try:
        time.sleep(t)
    except Exception:
        send('_exception', traceback.format_exc())
    else:
        spawn(process)
    finally:
        # Decrement only after spawn()/send(): the scheduler's termination
        # check relies on the queue being filled before the counter drops.
        ProcessRegistry.dec('sleep')
def _yield_call(process, func, args):
    """Worker-thread body for the 'call' operation.

    Runs `func(*args)` and resumes `process` with the return value.  If the
    call raises, the formatted traceback is sent to the '_exception' box and
    `process` is not resumed.  The 'call' counter is decremented on both
    paths, so a failed call no longer leaves the counter stuck above zero.
    """
    try:
        ret = func(*args)
    except Exception:
        send('_exception', traceback.format_exc())
    else:
        spawn(process, ret)
    finally:
        # Decrement only after spawn()/send(): the scheduler's termination
        # check relies on the queue being filled before the counter drops.
        ProcessRegistry.dec('call')
def _yield_fork(func, args):
    """Worker-thread body for the 'fork' operation.

    Runs `func(*args)` and discards the return value.  If the call raises, the
    formatted traceback is sent to the '_exception' box.  The 'fork' counter
    is decremented on both paths, so a failed call no longer leaves the
    counter stuck above zero.
    """
    try:
        func(*args)
    except Exception:
        send('_exception', traceback.format_exc())
    finally:
        # Decrement only after send(): the scheduler's termination check
        # relies on the queue being filled before the counter drops.
        ProcessRegistry.dec('fork')
def child_thread_exception_handler():
    """Generator process that waits for one message on the '_exception' box.

    When a message arrives it prints the stripped text and ends the run by
    calling stop().
    """
    report = yield ('recv', '_exception')
    print(report.strip())
    stop()
def run():
    """Run the scheduler until it finishes or a process calls stop().

    Queues the exception-handler process first.  Returns the StopSignal
    produced by _run(), or the StopSignal that was raised; in the latter case
    the registry is reset before returning.
    """
    spawn(child_thread_exception_handler)
    try:
        outcome = _run()
    except StopSignal as stopped:
        ProcessRegistry.reset()
        outcome = stopped
    return outcome
def _run():
    """Scheduler loop: execute queued processes until nothing is left to do.

    Each pass drains ProcessRegistry.active, then swaps in ProcessRegistry.new.
    The loop ends when there are no queued processes, no 'wait' listeners and
    no running worker threads.  It then resets the registry and returns a
    StopSignal whose result is the last value popped from the '.result' box
    (None if that box is empty) and whose reason is 'normal'.

    Raises Exception for a queue entry of an unsupported type.
    """
    while True:
        process = None
        try: process = ProcessRegistry.active.pop()
        except IndexError: pass
        while process:
            if isinstance(process, tuple):
                # (callable_or_generator, args) entry created by spawn(p, *args).
                call = process[0]
                if callable(call):
                    spawn_if_is_generator(call(*process[1]))
                else: # generator
                    # Resume the generator with the value carried in args.
                    try: process_yield(call, call.send(*process[1]))
                    except StopIteration: pass
            elif callable(process):
                spawn_if_is_generator(process())
            elif isinstance(process, GeneratorType):
                try: process_yield(process, process.next())
                except StopIteration: pass
            elif isinstance(process, Process):
                spawn_if_is_generator(getattr(process, 'main')())
            else:
                raise Exception('Unknown process type: %s' % str(type(process)))
            try: process = ProcessRegistry.active.pop()
            except IndexError: break
        # Swap the queues: everything spawned during this pass runs next.
        tmp = ProcessRegistry.active
        ProcessRegistry.active = ProcessRegistry.new
        ProcessRegistry.new = tmp
        if not (ProcessRegistry.active
                or ProcessRegistry.box_active_listener
                # (Original author's note, translated.)
                # The thread-counter check must come before the check of `new`.
                # Otherwise ProcessRegistry.new may be [] when tested (the worker
                # has not called spawn yet), and right after that test the worker
                # calls spawn and drops the counter to 0, so the if-body fires.
                # That is: `new` was empty when tested but is non-empty when the
                # if-body runs, which is wrong and discards the processes in `new`.
                # When the counter is non-zero the condition fails immediately;
                # when the counter is zero, `new` is certainly non-empty, because
                # workers such as _yield_call and _yield_sleep spawn first (making
                # `new` non-empty) and only then decrement the counter.
                # This ordering guarantees processes in `new` are not discarded.
                # Another solution is a lock that makes this check and spawn()
                # mutually exclusive, but that costs performance.
                or ProcessRegistry.has_thread()
                or ProcessRegistry.new
                ):
            result = None
            if '.result' in ProcessRegistry.box and ProcessRegistry.box['.result']:
                result = ProcessRegistry.box['.result'].pop()
            reason = 'normal'
            ProcessRegistry.reset()
            return StopSignal(result, reason)
class Signal(BaseException):
    """Base class for control-flow signals raised inside processes.

    Derives from BaseException so that it is a proper raisable exception
    while still passing through `except Exception` handlers untouched, as the
    worker-thread bodies in this module rely on.
    """
    pass


class StopSignal(Signal):
    """Signal that ends a scheduler run.

    Attributes:
        result: value handed back to the caller of run().
        reason: free-form description of why the run stopped.
    """

    def __init__(self, result = None, reason = None):
        BaseException.__init__(self, result, reason)
        self.result = result
        self.reason = reason
def stop(result = None, reason = None):
    """Abort the current run by raising a StopSignal carrying `result` and `reason`."""
    signal = StopSignal(result, reason)
    raise signal
def spawn(process, *args):
    """Queue `process` for the next scheduler pass.

    With extra arguments the entry is stored as a `(process, args)` tuple;
    without them, `process` is stored as-is.
    """
    entry = (process, args) if args else process
    ProcessRegistry.new.append(entry)
def test():
    """Demo / smoke run exercising each scheduler feature in turn.

    Every section queues some processes and then calls run().  Output goes to
    stdout.  Several sections sleep for real, and the last section imports the
    third-party `requests` package and fetches URLs over the network.
    """
    # factorial
    # Recursion expressed as repeated spawn(); the answer is delivered through
    # stop(acc) and read from the StopSignal returned by run().
    def fac(n):
        spawn(_fac, n, 1)
    def _fac(n, acc):
        if n > 0:
            spawn(_fac, n - 1, acc * n)
        else:
            stop(acc)
    n = 50
    spawn(fac, n)
    print 'fac %d: %d' % (n, run().result)

    # echo
    # server blocks on 'recv' for box 'ping'; client sends three messages.
    def server():
        while True:
            value = yield 'recv', 'ping'
            print 'pong', value
    def client():
        send('ping')
        send('ping', 1)
        send('ping', 2)
    spawn(client)
    spawn(server)
    run()

    # sleep
    # Two generators sleeping at different intervals.
    def foo():
        for i in xrange(3):
            print 'FOO'
            yield 'sleep', 1
    def bar():
        for i in xrange(6):
            print 'BAR'
            yield 'sleep', 0.5
    spawn(foo)
    spawn(bar)
    run()

    # stop
    # foo blocks on 'wait' for a box nothing sends to; bar ends the run by
    # calling stop() after its sleep.
    def foo():
        print 'waitting'
        yield 'wait', 'will never sent'
    def bar():
        yield 'sleep', 1
        print 'stop'
        stop()
    spawn(foo)
    spawn(bar)
    run()

    # sleep
    # n generators whose sleep times are spread over `span` seconds; prints
    # the wall-clock time of the whole run.
    span = 1
    n = 1000
    for i in xrange(n):
        def _f(i):
            yield 'sleep', float(span) / n * i
            #print i
        spawn(_f, i)
    start_time = time.time()
    run()
    print time.time() - start_time

    # message ring
    # `nodes` processes each forward a message to the next box `times` times;
    # the last node sends back to 'box1'.
    nodes = 2000
    times = 200
    def p(n):
        listen_to = 'box' + str(n)
        if n == nodes:
            send_to = 'box1'
        else:
            send_to = 'box' + str(n + 1)
        for t in xrange(times):
            msg = yield 'recv', listen_to
            send(send_to, msg)
            #print n, msg
    for i in range(1, nodes + 1):
        spawn(p, i)
    send('box1', 'go')
    start_time = time.time()
    run()
    print (nodes * times) / (time.time() - start_time), 'messages per second'

    # async call
    # bar resumes with the return value of each 'call'; f yields bare to show
    # that other processes keep running meanwhile.
    def foo():
        return 'Foo'
    def baz(n):
        return n
    def bar():
        ret = yield 'call', foo
        print ret
        ret = yield 'call', baz, 5
        print ret
    def f():
        for i in xrange(3):
            print 'f'
            yield
    spawn(bar)
    spawn(f)
    run()

    # async call exception
    # foo raises inside the worker thread; the traceback is sent to the
    # '_exception' box.
    def foo():
        raise Exception('exception')
    def bar():
        yield 'call', foo
    spawn(bar)
    run()

    # fork function
    # 'fork' starts foo in a worker thread and re-queues bar immediately.
    def foo():
        time.sleep(1)
        print 'foo'
    def bar():
        yield 'fork', foo
        print 'bar'
    spawn(bar)
    run()

    # parallel html get
    # Forks one fetch per URL, collects (index, response) pairs from the
    # 'response' box, and publishes the ordered list through the '.result' box.
    import requests
    def get(index, url):
        ret = requests.get(url)
        send('response', (index, ret))
    def _multiget(urls):
        for i in xrange(len(urls)):
            yield 'fork', get, i, urls[i]
        responses = [None for x in urls]
        for i in xrange(len(urls)):
            index, response = yield 'recv', 'response'
            responses[index] = response
        send('.result', responses)
    def multiget(urls):
        spawn(_multiget, urls)
        return run().result
    print multiget(['http://qq.com', 'http://baidu.com', 'http://soso.com'])
# Run the demo suite when this module is executed as a script.
if __name__ == '__main__':
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment