Skip to content

Instantly share code, notes, and snippets.

@miguelgrinberg
Last active February 20, 2024 13:07
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miguelgrinberg/829a20792d7283ae27b1f6a390c378b9 to your computer and use it in GitHub Desktop.
Save miguelgrinberg/829a20792d7283ae27b1f6a390c378b9 to your computer and use it in GitHub Desktop.
Eventlet running on top of asyncio proof-of-concept
# MIT License
#
# Copyright (c) 2023 Miguel Grinberg
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import atexit
import functools
import os
import random
import eventlet
from eventlet.greenpool import GreenPool
from eventlet.hubs import hub, use_hub, get_hub
def is_available() -> bool:
    """Report whether this hub module can be used.

    Returns:
        Always ``True``; no availability check is performed.
    """
    return True
class Hub(hub.BaseHub):
    """An Eventlet hub implementation on top of an asyncio event loop."""

    @staticmethod
    def is_available():
        """Report that this hub can always be selected."""
        return True

    def __init__(self):
        super().__init__()
        # Set by add_timer() and by file callbacks so that the run loop in
        # run() stops waiting before its computed sleep time has elapsed.
        self.sleep_event = asyncio.Event()

    def add_timer(self, timer):
        """Register ``timer`` with the base hub and wake up the run loop."""
        super().add_timer(timer)
        self.sleep_event.set()

    def _file_cb(self, evtype, cb, fileno):
        """Handle an asyncio readiness notification for ``fileno``.

        The descriptor is unregistered from the asyncio loop right away and
        the listener callback is scheduled to run from the hub's timers.

        Args:
            evtype: ``hub.READ`` for a reader; anything else is handled as a
                writer.
            cb: Listener callback; it is invoked as ``cb(fileno)``.
            fileno: The file descriptor that became ready.
        """
        def _cb():
            cb(fileno)
            self.sleep_event.set()

        loop = asyncio.get_event_loop()
        if evtype == hub.READ:
            loop.remove_reader(fileno)
        else:
            loop.remove_writer(fileno)
        # Schedule the wrapper (not the raw ``cb``) so the callback receives
        # ``fileno`` and the run loop is woken up once it has run.
        self.schedule_call_global(0, _cb)

    def add(self, evtype, fileno, cb, tb, mark_as_closed):
        """Add a listener and mirror it on the asyncio event loop.

        All arguments are forwarded unchanged to ``BaseHub.add``. The asyncio
        reader/writer is only installed when no listener was registered yet
        for this ``evtype``/``fileno`` pair.

        Returns:
            The listener object returned by ``BaseHub.add``.

        Raises:
            ValueError: If ``os.fstat(fileno)`` fails, i.e. the descriptor is
                not valid.
        """
        try:
            os.fstat(fileno)
        except OSError as exc:
            raise ValueError('Invalid file descriptor') from exc

        # Must be evaluated before super().add() registers the new listener.
        already_listening = self.listeners[evtype].get(fileno) is not None
        listener = super().add(evtype, fileno, cb, tb, mark_as_closed)

        loop = asyncio.get_event_loop()
        if not already_listening:
            if evtype == hub.READ:
                loop.add_reader(fileno, self._file_cb, evtype, cb, fileno)
            else:
                loop.add_writer(fileno, self._file_cb, evtype, cb, fileno)
        return listener

    def remove(self, listener):
        """Remove ``listener``; drop the asyncio registration if it was the last.

        Args:
            listener: Object exposing ``evtype`` and ``fileno`` attributes.
        """
        super().remove(listener)
        evtype = listener.evtype
        fileno = listener.fileno
        if not self.listeners[evtype].get(fileno):
            loop = asyncio.get_event_loop()
            if evtype == hub.READ:
                loop.remove_reader(fileno)
            else:
                loop.remove_writer(fileno)

    def remove_descriptor(self, fileno):
        """Remove every listener for ``fileno`` from the hub and the loop."""
        # Look these up first: the lookups are repeated nowhere after the
        # base class has processed the descriptor.
        have_read = self.listeners[hub.READ].get(fileno)
        have_write = self.listeners[hub.WRITE].get(fileno)
        super().remove_descriptor(fileno)

        loop = asyncio.get_event_loop()
        if have_read:
            loop.remove_reader(fileno)
        if have_write:
            loop.remove_writer(fileno)

    def run(self, *a, **kw):
        """Run the hub loop inside an asyncio event loop until it is stopped.

        Positional and keyword arguments are accepted and ignored.

        Raises:
            RuntimeError: If the hub is already running.
        """
        async def async_run():
            if self.running:
                raise RuntimeError("Already running!")
            try:
                self.running = True
                self.stopping = False
                while not self.stopping:
                    while self.closed:
                        # We ditch all of these first.
                        self.close_one()
                    self.prepare_timers()
                    if self.debug_blocking:
                        self.block_detect_pre()
                    self.fire_timers(self.clock())
                    if self.debug_blocking:
                        self.block_detect_post()
                    self.prepare_timers()
                    wakeup_when = self.sleep_until()
                    if wakeup_when is None:
                        sleep_time = self.default_sleep()
                    else:
                        sleep_time = wakeup_when - self.clock()
                    if sleep_time > 0:
                        # Wait until the next timer is due, or earlier if
                        # sleep_event gets set in the meantime.
                        try:
                            await asyncio.wait_for(self.sleep_event.wait(),
                                                   sleep_time)
                        except asyncio.TimeoutError:
                            pass
                        self.sleep_event.clear()
                    else:
                        # A timer is already due: yield to asyncio once and
                        # iterate again.
                        await asyncio.sleep(0)
                else:
                    # Reached when ``self.stopping`` became true: discard all
                    # pending timers.
                    self.timers_canceled = 0
                    del self.timers[:]
                    del self.next_timers[:]
            finally:
                self.running = False
                self.stopping = False

        # Fall back to a fresh event loop when none can be obtained.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        loop.run_until_complete(async_run())
def awaitable(fn):
    """Decorator that makes a regular function awaitable from asyncio code.

    Calling the decorated function returns a coroutine. When awaited, ``fn``
    is run in a greenlet started with ``spawn`` and the coroutine resolves to
    its return value. An exception raised by ``fn`` is re-raised in the
    awaiting coroutine instead of leaving it waiting forever.

    Args:
        fn: The function to run in a greenlet.

    Returns:
        A wrapper with the metadata of ``fn`` that returns a coroutine.
    """
    current_hub = get_hub()
    if not current_hub.running:
        # Presumably lets the hub start running before the wrapper is used.
        eventlet.sleep(0)

    @functools.wraps(fn)
    def decorator(*args, **kwargs):
        async def coro(fn, *args, **kwargs):
            future = asyncio.Future()

            def greenlet():
                try:
                    result = fn(*args, **kwargs)
                except Exception as exc:
                    # Hand the failure to the awaiting coroutine.
                    future.set_exception(exc)
                else:
                    future.set_result(result)

            spawn(greenlet)
            return await future

        return coro(fn, *args, **kwargs)

    return decorator
def await_(coro_or_fn):
    """Wait for a coroutine from non-async (greenlet) code.

    Two usages are supported:

    * ``await_(coroutine)`` schedules the coroutine as an asyncio task and
      blocks the calling greenlet until it finishes, returning its result.
      An exception raised by the coroutine is re-raised in the caller.
    * ``@await_`` applied to a coroutine function returns a regular function
      that calls it and waits for the resulting coroutine.

    Args:
        coro_or_fn: A coroutine object, or a callable returning one.

    Returns:
        The coroutine's result, or the wrapping function in decorator usage.
    """
    current_hub = get_hub()
    if not current_hub.running:
        # Presumably lets the hub start running before a task is created.
        eventlet.sleep(0)

    if asyncio.iscoroutine(coro_or_fn):
        # we were given a coroutine --> await it
        event = eventlet.Event()

        async def run_in_aio():
            try:
                result = await coro_or_fn
            except Exception as exc:
                # Wake the waiting greenlet with the failure instead of
                # leaving it blocked in event.wait().
                event.send_exception(exc)
            else:
                event.send(result)

        # Keep a reference for as long as we wait: the event loop only holds
        # weak references to tasks.
        task = asyncio.create_task(run_in_aio())  # noqa: F841
        return event.wait()

    # assume decorator usage
    @functools.wraps(coro_or_fn)
    def decorator(*args, **kwargs):
        return await_(coro_or_fn(*args, **kwargs))

    return decorator
# Module-level alias for eventlet.spawn; awaitable() uses it to start greenlets.
spawn = eventlet.spawn
def use() -> None:
    """Install :class:`Hub` as the Eventlet hub by passing it to ``use_hub``."""
    use_hub(Hub)
def abort(wait=True):
    """Abort the current hub if it is running; otherwise do nothing.

    Args:
        wait: Forwarded unchanged to the hub's ``abort`` method.
    """
    active_hub = get_hub()
    if not active_hub.running:
        return
    active_hub.abort(wait)
# Abort the hub (if it is still running) when the interpreter exits.
atexit.register(abort)
# example greenlet
def main(name):
    """Example greenlet body: ten random sub-second sleeps with progress output.

    Args:
        name: Label included in every progress line printed.
    """
    for i in range(10):
        delay = random.random()
        # await from non-async function!
        await_(asyncio.sleep(delay))
        print(f'task {name}: iter {i+1}/10')
# Demo: select the custom hub first, then run three example greenlets
# concurrently and block until all of them have finished.
use()  # set custom eventlet hub

pool = GreenPool()
for i in range(3):
    pool.spawn(main, f'task #{i+1}')
pool.waitall()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment