Skip to content

Instantly share code, notes, and snippets.

@FND
Last active December 9, 2016 07:47
Show Gist options
  • Save FND/15491dd4b4b7e7933715 to your computer and use it in GitHub Desktop.
Save FND/15491dd4b4b7e7933715 to your computer and use it in GitHub Desktop.
Python asyncio
import time
import random
URIS = ("http://example.org/foo", "http://example.org/bar",
"http://example.org/baz")
def main():
for uri in URIS:
res = fetch(uri)
print("---- %s (%s)" % (res, uri))
def fetch(uri):
print("GET %s" % uri)
response = _http()
print("%s kB" % response)
return response
def _http():
time.sleep(1.0)
return random.randint(0, 9)
main()
import asyncio
import random
URIS = ("http://example.org/foo", "http://example.org/bar",
"http://example.org/baz")
@asyncio.coroutine
def main():
for uri in URIS:
res = yield from fetch(uri)
print("---- %s (%s)" % (res, uri))
@asyncio.coroutine
def fetch(uri):
print("GET %s" % uri)
response = yield from _http()
print("%s kB" % response)
return response
@asyncio.coroutine
def _http():
yield from asyncio.sleep(1.0)
return random.randint(0, 9)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
import asyncio
import random
URIS = ("http://example.org/foo", "http://example.org/bar",
"http://example.org/baz")
def main():
for uri in URIS:
task = asyncio.Task(fetch(uri))
print("---- registered %s" % task)
@asyncio.coroutine
def fetch(uri):
print("retrieving %s..." % uri)
response = yield from _http()
print("downloaded %s kB" % response)
return response
@asyncio.coroutine
def _http():
yield from asyncio.sleep(1.0)
return random.randint(0, 9)
main()
loop = asyncio.get_event_loop()
try:
loop.run_forever()
finally:
loop.close()
import asyncio
import random
URIS = ("http://example.org/foo", "http://example.org/bar",
"http://example.org/baz")
def main(loop):
req_count = 0
def shutdown():
if req_count == len(URIS):
loop.stop()
for uri in URIS:
task = asyncio.Task(fetch(uri))
task.add_done_callback(lambda task: shutdown())
req_count += 1
print("---- registered #%s %s" % (req_count, task))
@asyncio.coroutine
def fetch(uri):
print("retrieving %s..." % uri)
response = yield from _http()
print("downloaded %s kB" % response)
return response
@asyncio.coroutine
def _http():
yield from asyncio.sleep(1.0)
return random.randint(0, 9)
loop = asyncio.get_event_loop()
main(loop)
try:
loop.run_forever()
finally:
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment