Last active
March 10, 2016 09:47
-
-
Save yxy/997cbc73d56a19e056d4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# server.py | |
# fib microservice | |
from socket import * | |
# from threading import Thread | |
from collections import deque | |
from select import select | |
from concurrent.futures import ThreadPoolExecutor as Pool | |
from concurrent.futures import ProcessPoolExecutor as Pool | |
pool = Pool() | |
tasks = deque() | |
recv_wait = { } # Mapping sockets -> tasks (generators) | |
send_wait = { } | |
future_wait = { } | |
future_notify, future_event = socketpair() | |
def fib(n): | |
if n <= 2: | |
return 1 | |
return fib(n-1) + fib(n-2) | |
def future_done(future): | |
tasks.append(future_wait.pop(future)) | |
future_notify.send(b'x') | |
def future_monitor(): | |
while True: | |
yield 'recv', future_event | |
future_event.recv(100) | |
tasks.append(future_monitor()) | |
def run(): | |
while any([tasks, recv_wait, send_wait]): | |
done_futures = [f for f, _ in future_wait.items() if f.done()] | |
for f in done_futures: | |
t = future_wait.pop(f) | |
tasks.append((t)) | |
while not tasks: | |
# No active tasks to run | |
# wait for I/O | |
can_recv, can_send, _ = select(recv_wait, send_wait, []) | |
for s in can_recv: | |
tasks.append(recv_wait.pop(s)) | |
for s in can_send: | |
tasks.append(send_wait.pop(s)) | |
task = tasks.popleft() | |
try: | |
why, what = next(task) | |
if why == 'recv': | |
# Must go wait somewhere | |
recv_wait[what] = task | |
elif why == 'send': | |
send_wait[what] = task | |
elif why == 'future': | |
future_wait[what] = task | |
what.add_done_callback(future_done) | |
else: | |
raise RuntimeError("ARG!") | |
except StopIteration: | |
print("task done") | |
class AsyncSocket(object): | |
def __init__(self, sock): | |
self.sock = sock | |
def accept(self): | |
yield 'recv', self.sock | |
client, addr = self.sock.accept() | |
return AsyncSocket(client), addr | |
def recv(self, maxsize=100): | |
yield 'recv', self.sock | |
return self.sock.recv(maxsize) | |
def send(self, buf): | |
yield 'send', self.sock | |
self.sock.send(buf) | |
def fib_server(address): | |
sock = socket(AF_INET, SOCK_STREAM) | |
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) | |
sock.bind(address) | |
sock.listen(5) | |
sock = AsyncSocket(sock) | |
while True: | |
client, addr = yield from sock.accept() # blocking | |
print("Connection") | |
# not mult-core Thread(target=fib_handler, args=(client, )).start() | |
tasks.append(fib_handler(client)) | |
def fib_handler(client): | |
while True: | |
# client.send(b'what is your number: ') | |
req = yield from client.recv(100) # blocking | |
if not req: | |
break | |
n = int(req) | |
# result = fib(n) | |
future = pool.submit(fib, n) | |
yield 'future', future | |
result = future.result() # blocking | |
resp = str(result).encode("ascii") + b"\n" | |
yield from client.send(resp) | |
tasks.append(fib_server(('', 25000))) | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment