Skip to content

Instantly share code, notes, and snippets.

@pavelpatrin
Created July 2, 2017 22:34
Show Gist options
  • Save pavelpatrin/07a5d03c10d953b507814082c18c11ea to your computer and use it in GitHub Desktop.
Save pavelpatrin/07a5d03c10d953b507814082c18c11ea to your computer and use it in GitHub Desktop.
Event loop with hands
import select
import socket
from urllib.parse import urlsplit
class Loop:
ACTION_READ = 'read'
ACTION_WRITE = 'write'
ACTION_READY = 'ready'
ACTIONS_TO_FILTERS = {
ACTION_READ: select.KQ_FILTER_READ,
ACTION_WRITE: select.KQ_FILTER_WRITE,
}
FILTERS_TO_ACTIONS = {
select.KQ_FILTER_READ: ACTION_READ,
select.KQ_FILTER_WRITE: ACTION_WRITE,
}
def __init__(self, tasks):
self.tasks = tasks
self.index = {}
self.kqueue = select.kqueue()
def events(self):
while True:
events = self.kqueue.control(None, 1000000, 1)
for event in events:
yield event
def watch(self, sock, action):
filter = self.ACTIONS_TO_FILTERS[action]
flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
event = select.kevent(sock.fileno(), filter, flags)
self.kqueue.control([event], 0)
def unwatch(self, sock, action):
filter = self.ACTIONS_TO_FILTERS[action]
flags = select.KQ_EV_DELETE
event = select.kevent(sock.fileno(), filter, flags)
self.kqueue.control([event], 0)
def run(self):
pendings = set(self.tasks)
results = {}
# Init generators.
for task in self.tasks:
# Try to get next async action.
sock, action = task.send(None)
self.watch(sock, action)
# And save it to index.
key = (sock.fileno(), action)
self.index[key] = task
# Handle events by tasks.
for event in self.events():
action = self.FILTERS_TO_ACTIONS[event.filter]
key = (event.ident, action)
if key not in self.index:
continue
# Perform nonblock action.
task = self.index[key]
sock = task.send(None)
self.unwatch(sock, action)
del self.index[event.ident, action]
try:
# Try to get next async action.
sock, action = task.send(None)
self.watch(sock, action)
# And save it to index.
key = (sock.fileno(), action)
self.index[key] = task
except StopIteration as e:
# Next action was return / raise.
pendings.remove(task)
results[task] = e.value
if not pendings:
return results
def async_read(sock, response):
yield sock, Loop.ACTION_READ
response[:] = [sock.recv(2 ** 20)]
yield sock
def async_write(sock, request):
yield sock, Loop.ACTION_WRITE
size = sock.send(request)
yield sock
def task(url):
split = urlsplit(url)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
try:
sock.connect((split.hostname, split.port or 80))
except BlockingIOError:
pass # This is ok.
request = bytes(
'GET %s HTTP/1.1\r\n' % split.path +
'Host: %s\r\n' % split.hostname +
'Connection: close\r\n' +
'\r\n', 'utf-8'
)
response = []
yield from async_write(sock, request)
yield from async_read(sock, response)
return response[0]
url = 'http://target.my.com/api/v2/user.json'
loop = Loop([task(url) for num in range(1000)])
results = loop.run()
for result in results.values():
print(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment