Skip to content

Instantly share code, notes, and snippets.

@Kiollpt
Last active September 2, 2020 09:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Kiollpt/8f77a0169dcdffd556c795b9e235d7f5 to your computer and use it in GitHub Desktop.
Save Kiollpt/8f77a0169dcdffd556c795b9e235d7f5 to your computer and use it in GitHub Desktop.
async sleep #asyncio
from threading import Thread
from threading import current_thread
from asyncio import Future
import asyncio
import time
c = (
"\033[0m", # End of color
"\033[36m", # Cyan
"\033[91m", # Red
"\033[32m", # Green
)
async def harry_sleep(sec):
future = Future()
loop = asyncio.get_running_loop()
Thread(target=blocking_sleep,args=(sec,future,loop)).start()
await future
def blocking_sleep(sec,future,loop):
time.sleep(sec)
async def sleep_future_resolver():
# resolve the future
future.set_result(None)
asyncio.run_coroutine_threadsafe(sleep_future_resolver(), loop)
print("{0} Sleeping completed in {1}".format(c[1],current_thread().getName()), flush=True)
await asyncio.sleep(1.0)
if __name__ == "__main__":
start = time.time()
tasks = asyncio.gather(harry_sleep(5),
harry_sleep(5),
harry_sleep(5),
harry_sleep(5),
harry_sleep(5),
)
loop = asyncio.get_event_loop()
loop.run_until_complete(tasks)
print("{0} main program exiting after running for {1}".format(c[2],time.time() - start))
print(c[0]+" ")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment