Skip to content

Instantly share code, notes, and snippets.

@chadrik
Last active March 13, 2020 03:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save chadrik/a7b7d8abce891978a7dbfd0d086828b3 to your computer and use it in GitHub Desktop.
Save chadrik/a7b7d8abce891978a7dbfd0d086828b3 to your computer and use it in GitHub Desktop.
"""
Demo of a bunch of asyncio + gevent scenarios using aiogevent.
These are not tests in the sense that there are no assertions, but pytest
is a nice way to run them in bulk or choose a specific "test"
Run like this:
python3 -m pytest test_coexist.py -s -v
"""
import random
import aiogevent
import asyncio
asyncio.set_event_loop_policy(aiogevent.EventLoopPolicy())
import gevent
import gevent.queue
def looper_sync():
print()
for i in range(10):
print("loop {}".format(i))
gevent.sleep(0.4)
async def looper_async():
print()
for i in range(10):
print("loop {}".format(i))
gevent.sleep(0.4)
def g_produce(queue, n):
print()
for x in range(1, n + 1):
# produce an item
print('GEVENT: producing {}/{}'.format(x, n))
# simulate i/o operation using sleep
gevent.sleep(random.random())
item = str(x)
# put the item in the queue
queue.put(item)
# indicate the producer is done
queue.put(None)
def g_consume(queue):
while True:
# wait for an item from the producer
item = queue.get()
if item is None:
# the producer emits None to indicate that it is done
break
# process the item
print('GEVENT: consuming item {}...'.format(item))
# simulate i/o operation using sleep
gevent.sleep(random.random())
def run_gevent_sync():
g_queue = gevent.queue.Queue()
gevent.joinall([
gevent.spawn(g_produce, g_queue, 10),
gevent.spawn(g_consume, g_queue),
])
async def run_gevent_async():
g_queue = gevent.queue.Queue()
gevent.joinall([
gevent.spawn(g_produce, g_queue, 10),
gevent.spawn(g_consume, g_queue),
])
# ---
async def a_produce(queue, n):
print()
for x in range(1, n + 1):
# produce an item
print('AIO: producing {}/{}'.format(x, n))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
item = str(x)
# put the item in the queue
await queue.put(item)
# indicate the producer is done
await queue.put(None)
async def a_consume(queue):
while True:
# wait for an item from the producer
item = await queue.get()
if item is None:
# the producer emits None to indicate that it is done
break
# process the item
print('AIO: consuming item {}...'.format(item))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
def run_aio():
loop = asyncio.get_event_loop()
a_queue = asyncio.Queue(loop=loop)
producer_coro = a_produce(a_queue, 10)
consumer_coro = a_consume(a_queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
# loop.close()
def test_aio_queue_and_gevent_queue():
"""
2 sets of producers and consumers
gevent: call gevent.joinall inside a coroutine, and gather it
no work: async coroutines block while gevent greenlets run
"""
loop = asyncio.get_event_loop()
# asyncio
a_queue = asyncio.Queue(loop=loop)
producer_coro = a_produce(a_queue, 10)
consumer_coro = a_consume(a_queue)
# gevent: call gevent.joinall inside a coroutine, and gather it
gevent_coro = run_gevent_async()
loop.run_until_complete(
asyncio.gather(producer_coro, consumer_coro, gevent_coro))
# loop.close()
def test_aio_queue_and_gevent_wrapped_queue():
"""
2 sets of producers and consumers
gevent: create futures from greenlets and gather them
works
"""
loop = asyncio.get_event_loop()
# asyncio
a_queue = asyncio.Queue(loop=loop)
producer_coro = a_produce(a_queue, 10)
consumer_coro = a_consume(a_queue)
# gevent: create futures from greenlets and gather them
g_queue = gevent.queue.Queue()
producer_greenlet = gevent.spawn(g_produce, g_queue, 10)
producer_future = aiogevent.wrap_greenlet(producer_greenlet, loop)
consumer_greenlet = gevent.spawn(g_consume, g_queue)
consumer_future = aiogevent.wrap_greenlet(consumer_greenlet, loop)
# run
loop.run_until_complete(
asyncio.gather(producer_coro,
consumer_coro,
producer_future,
consumer_future))
# loop.close()
async def gather_gevent():
loop = asyncio.get_event_loop()
g_queue = gevent.queue.Queue()
producer_greenlet = gevent.spawn(g_produce, g_queue, 10)
producer_future = aiogevent.wrap_greenlet(producer_greenlet, loop)
consumer_greenlet = gevent.spawn(g_consume, g_queue)
consumer_future = aiogevent.wrap_greenlet(consumer_greenlet, loop)
await asyncio.gather(producer_future, consumer_future)
def test_aio_queue_and_gevent_nested_wrapped_queue():
"""
2 sets of producers and consumers
gevent: call coroutine that gathers futures from greenlets
works
"""
loop = asyncio.get_event_loop()
# asyncio
a_queue = asyncio.Queue(loop=loop)
producer_coro = a_produce(a_queue, 10)
consumer_coro = a_consume(a_queue)
# gevent: call coroutine that gathers futures from greenlets
gevent_coro = gather_gevent()
# run
loop.run_until_complete(
asyncio.gather(producer_coro,
consumer_coro,
gevent_coro))
# loop.close()
def test_aio_queue_and_gevent_nested_wrapped_spawned_queue():
"""
2 sets of producers and consumers
gevent: create a future from a greenlet that spawns more greenlets and
joins them
works:
this is somewhat surprising since this is so similar to
`test_aio_queue_and_gevent_queue`. this indicates that gevent functions
do not behave as expected *directly* inside a coroutine, but do when
nested anywhere below a greenlet.
"""
loop = asyncio.get_event_loop()
# asyncio
a_queue = asyncio.Queue(loop=loop)
producer_coro = a_produce(a_queue, 10)
consumer_coro = a_consume(a_queue)
# gevent: create a future from a greenlet that spawns more greenlets and
# joins them
gevent_future = aiogevent.wrap_greenlet(gevent.spawn(run_gevent_sync))
# run
loop.run_until_complete(
asyncio.gather(producer_coro,
consumer_coro,
gevent_future))
# loop.close()
def test_aio_queue_and_gevent_looper():
"""
one asyncio producer and consumer, and a loop with gevent.sleep
gevent: looper is asyncio coroutine
no work
"""
loop = asyncio.get_event_loop()
# asyncio
a_queue = asyncio.Queue(loop=loop)
producer_coro = a_produce(a_queue, 10)
consumer_coro = a_consume(a_queue)
gevent_coro = looper_async()
loop.run_until_complete(
asyncio.gather(producer_coro, consumer_coro, gevent_coro))
# loop.close()
def test_aio_queue_and_gevent_wrapped_looper():
"""
one asyncio producer and consumer, and a loop with gevent.sleep
gevent: looper is greenlet wrapped as future
works
"""
loop = asyncio.get_event_loop()
a_queue = asyncio.Queue(loop=loop)
producer_coro = a_produce(a_queue, 10)
consumer_coro = a_consume(a_queue)
looper_future = aiogevent.wrap_greenlet(gevent.spawn(looper_sync))
loop.run_until_complete(
asyncio.gather(producer_coro, consumer_coro, looper_future))
# loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment