Skip to content

Instantly share code, notes, and snippets.

@mrjohannchang
Last active August 29, 2015 14:21
Show Gist options
  • Save mrjohannchang/58e2565f358811a314fe to your computer and use it in GitHub Desktop.
Save mrjohannchang/58e2565f358811a314fe to your computer and use it in GitHub Desktop.
Infinite event loop
import asyncio
@asyncio.coroutine
def hello_world():
while True:
yield from asyncio.sleep(1)
print('Hello World')
@asyncio.coroutine
def good_evening():
while True:
yield from asyncio.sleep(1)
print('Good Evening')
print('step: asyncio.get_event_loop()')
loop = asyncio.get_event_loop()
try:
print('step: loop.run_until_complete()')
loop.run_until_complete(asyncio.wait([
hello_world(),
good_evening()
]))
except KeyboardInterrupt:
pass
finally:
print('step: loop.close()')
loop.close()
import asyncio
@asyncio.coroutine
def hello_world_coroutine():
yield from asyncio.sleep(1)
print('Hello World Coroutine')
asyncio.async(hello_world_coroutine())
@asyncio.coroutine
def good_evening_coroutine():
yield from asyncio.sleep(1)
print('Good Evening Coroutine')
asyncio.async(good_evening_coroutine())
print('step: asyncio.get_event_loop()')
loop = asyncio.get_event_loop()
asyncio.async(hello_world_coroutine())
asyncio.async(good_evening_coroutine())
try:
print('step: loop.run_forever()')
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print('step: loop.close()')
loop.close()
import asyncio
import time
def hello_world(loop):
print('Hello World')
loop.call_later(1, hello_world, loop)
def good_evening(loop):
print('Good Evening')
loop.call_later(1, good_evening, loop)
@asyncio.coroutine
def hello_world_coroutine():
while True:
yield from asyncio.sleep(1)
print('Hello World Coroutine')
@asyncio.coroutine
def good_evening_coroutine():
while True:
yield from asyncio.sleep(1)
print('Good Evening Coroutine')
print('step: asyncio.get_event_loop()')
loop = asyncio.get_event_loop()
print('step: loop.call_soon(hello_world, loop)')
loop.call_soon(hello_world, loop)
print('step: loop.call_soon(good_evening, loop)')
loop.call_soon(good_evening, loop)
print('step: asyncio.async(hello_world_coroutine)')
asyncio.async(hello_world_coroutine())
print('step: asyncio.async(good_evening_coroutine)')
asyncio.async(good_evening_coroutine())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print('step: loop.close()')
loop.close()
import asyncio
import time
def hello_world(loop):
print('Hello World')
loop.call_later(1, hello_world, loop)
def good_evening(loop):
print('Good Evening')
loop.call_later(1, good_evening, loop)
print('step: asyncio.get_event_loop()')
loop = asyncio.get_event_loop()
print('step: loop.call_soon(hello_world, loop)')
loop.call_soon(hello_world, loop)
print('step: loop.call_soon(good_evening, loop)')
loop.call_soon(good_evening, loop)
try:
# Blocking call interrupted by loop.stop()
print('step: loop.run_forever()')
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print('step: loop.close()')
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment