Skip to content

Instantly share code, notes, and snippets.

@zmej-serow
Created November 4, 2019 00:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zmej-serow/9818e9b224b693d50648cd622849db57 to your computer and use it in GitHub Desktop.
Save zmej-serow/9818e9b224b693d50648cd622849db57 to your computer and use it in GitHub Desktop.
Asynchronous updating server mock
import asyncio
import random
async def fetching(q):
while True:
print("i'm fetching...")
d = random.random()
print("done fetching, got", d)
await q.put(d)
await asyncio.sleep(1)
async def answering(q):
while True:
d = await q.get()
while q.empty():
print("answering with", d)
await asyncio.sleep(0.3)
q.task_done()
async def main():
q = asyncio.Queue()
await asyncio.gather(fetching(q), answering(q))
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment