Skip to content

Instantly share code, notes, and snippets.

@pavelpatrin
Created July 2, 2017 22:35
Show Gist options
  • Save pavelpatrin/788889e93acf0884a241da8b44df9ab6 to your computer and use it in GitHub Desktop.
Save pavelpatrin/788889e93acf0884a241da8b44df9ab6 to your computer and use it in GitHub Desktop.
Event loop written by hand
import select
import socket
import typing
from urllib.parse import urlsplit
class Task:
    """One non-blocking HTTP GET request, driven by an external event loop.

    The loop calls :meth:`on_write` whenever the socket is writable and
    :meth:`on_read` whenever it is readable, until ``ready`` becomes true.

    Because the request carries ``Connection: close``, the response is
    considered complete only when the peer closes the connection (``recv``
    returns ``b''``); completion therefore relies on the server honouring
    that header.

    Attributes:
        split: The parsed URL (result of ``urlsplit``).
        socket: The non-blocking TCP socket; closed once the task finishes.
        ready: True once the response is complete or the task has failed.
        result: Raw response bytes (status line, headers and body) so far.
        error: The ``OSError`` that ended the task, or None on success.
        request_sent: True once the whole request has been written.
    """

    def __init__(self, url):
        """Parse *url* and start a non-blocking connection to its host."""
        self.split = urlsplit(url)
        self.ready = False
        self.result = b''
        self.error = None
        self.request_sent = False
        # Bytes of the request not yet accepted by the kernel; built lazily.
        self._outgoing = None
        self.socket = self.connect()

    def connect(self) -> socket.socket:
        """Create a non-blocking socket and begin connecting (default port 80).

        Raises:
            OSError: If the connection attempt fails immediately; the socket
                is closed before the exception propagates.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        try:
            sock.connect((self.split.hostname, self.split.port or 80))
        except BlockingIOError:
            pass  # Expected: the non-blocking connect is now in progress.
        except Exception:
            sock.close()  # Do not leak the descriptor on a real failure.
            raise
        return sock

    def _build_request(self) -> bytes:
        """Return the serialized GET request for the parsed URL."""
        # An empty path must be sent as "/", and the query string is part of
        # the request target.
        target = self.split.path or '/'
        if self.split.query:
            target += '?' + self.split.query
        host = self.split.hostname
        if self.split.port is not None:
            host = '%s:%d' % (host, self.split.port)
        return (
            'GET %s HTTP/1.1\r\n'
            'Host: %s\r\n'
            'Connection: close\r\n'
            '\r\n' % (target, host)
        ).encode('utf-8')

    def _finish(self, error=None):
        """Mark the task as done, record *error* (if any), close the socket."""
        self.error = error
        self.ready = True
        self.socket.close()

    def on_write(self):
        """Send as much of the pending request as the socket accepts.

        ``send`` may accept only part of the buffer, so the remainder is kept
        and written on the next writable notification. ``request_sent`` flips
        to True only after the final byte has been handed to the kernel.
        """
        if self.request_sent or self.ready:
            return
        if self._outgoing is None:
            self._outgoing = self._build_request()
        try:
            sent = self.socket.send(self._outgoing)
        except BlockingIOError:
            return  # Spurious wake-up; wait for the next writable event.
        except OSError as exc:
            # E.g. connection refused: fail this task without raising, so the
            # caller can keep driving the others.
            self._finish(exc)
            return
        self._outgoing = self._outgoing[sent:]
        if not self._outgoing:
            self.request_sent = True

    def on_read(self):
        """Append available response bytes; finish when the peer closes."""
        if not self.request_sent or self.ready:
            return
        try:
            chunk = self.socket.recv(2 ** 20)
        except BlockingIOError:
            return  # Nothing to read after all; wait for the next event.
        except OSError as exc:
            self._finish(exc)
            return
        if chunk:
            self.result += chunk
        else:
            # Empty read means EOF: the response is complete.
            self._finish()
class Loop:
    """Minimal kqueue-based event loop that drives tasks until all are ready.

    Each task must expose ``socket``, ``ready``, ``result`` and the callbacks
    ``on_read()`` / ``on_write()``. Requires a platform that provides
    ``select.kqueue``.

    Attributes:
        fno_to_task: Maps each socket's file descriptor to its task.
        kqueue: The kqueue object the sockets are registered with.
    """

    def __init__(self, tasks: typing.Iterable[Task]):
        """Register read and write filters for every task's socket."""
        self.fno_to_task = {task.socket.fileno(): task for task in tasks}
        self.kqueue = select.kqueue()
        flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        changes = []
        for fno in self.fno_to_task:
            changes.append(select.kevent(fno, select.KQ_FILTER_READ, flags))
            changes.append(select.kevent(fno, select.KQ_FILTER_WRITE, flags))
        if changes:
            self.kqueue.control(changes, 0)

    def _unwatch_write(self, fno):
        """Remove the write filter for *fno*, tolerating an already-gone one."""
        import errno

        change = select.kevent(fno, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
        try:
            self.kqueue.control([change], 0)
        except OSError as exc:
            # The filter disappears by itself when the socket is closed.
            if exc.errno not in (errno.ENOENT, errno.EBADF):
                raise

    def close(self):
        """Release the kqueue descriptor; the loop is unusable afterwards."""
        self.kqueue.close()

    def run(self) -> typing.List[bytes]:
        """Dispatch events until every task is ready; return their results.

        Returns:
            The ``result`` of each task, in ``fno_to_task`` iteration order.
        """
        tasks = self.fno_to_task
        while not all(task.ready for task in tasks.values()):
            # At most two filters exist per descriptor, so this bound is
            # enough and avoids requesting a huge event buffer on every call.
            for event in self.kqueue.control(None, 2 * len(tasks), 1):
                task = tasks.get(event.ident)
                if task is None:
                    continue
                if event.filter == select.KQ_FILTER_READ:
                    task.on_read()
                elif event.filter == select.KQ_FILTER_WRITE:
                    task.on_write()
                    # A socket is writable almost all the time; once the
                    # request is out, keeping the filter would make control()
                    # return immediately and turn this loop into a busy spin.
                    if getattr(task, 'request_sent', False):
                        self._unwatch_write(event.ident)
        return [task.result for task in tasks.values()]
# Demo driver: request the same endpoint over 1000 concurrent connections,
# run the loop until every task reports ready, then dump each raw response.
url = 'http://target.my.com/api/v2/user.json'
tasks = [Task(url) for _ in range(1000)]
loop = Loop(tasks)
results = loop.run()
for result in results:
    print(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment