Created
July 2, 2017 22:35
-
-
Save pavelpatrin/788889e93acf0884a241da8b44df9ab6 to your computer and use it in GitHub Desktop.
Event loop with hands
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import select | |
import socket | |
import typing | |
from urllib.parse import urlsplit | |
class Task:
    """One non-blocking HTTP GET request, advanced by readiness callbacks.

    An event loop is expected to call ``on_write`` when ``socket`` becomes
    writable and ``on_read`` when it becomes readable.

    Attributes:
        split: The parsed URL (result of ``urlsplit``).
        socket: The non-blocking TCP socket used for the request.
        request_sent: True once the whole request has been written.
        ready: True once the whole response has been received.
        result: Raw response bytes (status line, headers and body).
    """

    def __init__(self, url):
        """Parse *url* and start connecting to its host."""
        self.split = urlsplit(url)
        self.request_sent = False
        self.ready = False
        self.result = b''
        # Request bytes still waiting to be written; built on first on_write.
        self._unsent = None
        self.socket = self.connect()

    def connect(self) -> socket.socket:
        """Create a non-blocking socket and begin connecting to the host.

        Returns:
            The socket, usually with the connection still in progress.

        Raises:
            OSError: If the connection attempt fails immediately (for
                example, the host name cannot be resolved).
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        try:
            # NOTE: resolving the host name happens inside connect() and is
            # not subject to the non-blocking mode of the socket.
            sock.connect((self.split.hostname, self.split.port or 80))
        except BlockingIOError:
            pass  # Expected: the connection completes in the background.
        except OSError:
            sock.close()  # Do not leak the descriptor on a hard failure.
            raise
        return sock

    def _build_request(self) -> bytes:
        """Return the serialized GET request for the parsed URL."""
        # An empty path is not a valid request target; fall back to "/".
        target = self.split.path or '/'
        if self.split.query:
            target += '?' + self.split.query
        host = self.split.hostname
        if self.split.port:
            host = '%s:%s' % (host, self.split.port)
        request = (
            'GET %s HTTP/1.1\r\n' % target +
            'Host: %s\r\n' % host +
            'Connection: close\r\n' +
            '\r\n'
        )
        return request.encode('utf-8')

    def on_write(self):
        """Write as much of the request as the socket accepts right now.

        Does nothing once the request has been sent completely.
        """
        if self.request_sent:
            return
        if self._unsent is None:
            self._unsent = self._build_request()
        try:
            sent = self.socket.send(self._unsent)
        except BlockingIOError:
            return  # Not writable after all; retry on the next event.
        # send() may accept only part of the buffer; keep the remainder.
        self._unsent = self._unsent[sent:]
        if not self._unsent:
            self.request_sent = True

    def on_read(self):
        """Read available response bytes; finish when the peer closes.

        The request asks for ``Connection: close``, so end-of-stream marks
        the end of the response. At that point the task becomes ready and
        its socket is closed.
        """
        if self.ready or not self.request_sent:
            return
        try:
            chunk = self.socket.recv(2 ** 20)
        except BlockingIOError:
            return  # Nothing to read yet; wait for the next event.
        if chunk:
            self.result += chunk
            return
        # An empty read means the server closed the connection.
        self.ready = True
        self.socket.close()
class Loop:
    """A minimal kqueue-based event loop that drives a set of tasks.

    Each task must expose ``socket``, ``ready``, ``result`` and the
    callbacks ``on_read`` / ``on_write``. Requires a platform that provides
    ``select.kqueue``.

    Attributes:
        fno_to_task: Maps each socket's file descriptor to its task.
        kqueue: The kernel queue on which all sockets are registered.
    """

    def __init__(self, tasks: typing.Iterable['Task']):
        """Register every task's socket for read and write readiness."""
        self.fno_to_task = {task.socket.fileno(): task for task in tasks}
        self.kqueue = select.kqueue()
        flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        for fno in self.fno_to_task:
            revent = select.kevent(fno, select.KQ_FILTER_READ, flags)
            wevent = select.kevent(fno, select.KQ_FILTER_WRITE, flags)
            # max_events=0: only apply the changes, do not wait for events.
            self.kqueue.control([revent, wevent], 0)

    def _unwatch_write(self, fno):
        """Remove the write filter registered for descriptor *fno*."""
        change = select.kevent(
            fno, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
        self.kqueue.control([change], 0)

    def run(self) -> typing.List[bytes]:
        """Dispatch readiness events until every task reports ``ready``.

        Returns:
            The ``result`` of each task, in registration order.
        """
        tasks = self.fno_to_task
        while not all(task.ready for task in tasks.values()):
            # Wait up to one second for a batch of events.
            for event in self.kqueue.control(None, 1000000, 1):
                task = tasks[event.ident]
                if event.filter == select.KQ_FILTER_READ:
                    task.on_read()
                elif event.filter == select.KQ_FILTER_WRITE:
                    task.on_write()
                    if getattr(task, 'request_sent', False):
                        # A connected socket stays writable, so leaving the
                        # filter in place would wake the loop on every
                        # iteration and turn the wait into a busy spin.
                        self._unwatch_write(event.ident)
        return [task.result for task in tasks.values()]
# Demo: issue 1000 concurrent GET requests for the same URL and print the
# raw response collected by each task.
url = 'http://target.my.com/api/v2/user.json'
loop = Loop([Task(url) for _ in range(1000)])

# Blocks until every task has reported that it is ready.
results = loop.run()

for result in results:
    print(result)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment