Skip to content

Instantly share code, notes, and snippets.

@takidog
Last active February 5, 2023 22:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save takidog/c53f73e24295d66c76b5e330940bcf73 to your computer and use it in GitHub Desktop.
Save takidog/c53f73e24295d66c76b5e330940bcf73 to your computer and use it in GitHub Desktop.
import asyncio
import threading
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from asyncio import AbstractEventLoop
async def foo(condition):
    """Print a numbered, timestamped line, then wait on ``condition``; repeat forever.

    Args:
        condition: An ``asyncio.Condition``-like object. It is entered with
            ``async with`` and its ``wait()`` is awaited once per iteration.

    This coroutine has no exit path of its own: it only stops when it is
    cancelled or when ``condition`` raises.
    """
    # Inside a coroutine the running loop is the one executing us.
    loop = asyncio.get_running_loop()
    print("Foo loop id:", id(loop))
    a = 1
    while True:
        print(f"{time.strftime('%X')}: do {a}")
        a += 1
        # Block here until some other task calls notify()/notify_all().
        async with condition:
            await condition.wait()
def non_async_func(loop: "AbstractEventLoop", condition) -> None:
    """Blocking helper that schedules ``release(condition)`` on ``loop``.

    ``main`` submits this function through ``loop.run_in_executor``, so it
    runs in a worker thread rather than in the event-loop thread.

    Args:
        loop: The event loop on which ``release`` should run.
        condition: The condition object handed on to ``release``.
    """
    print("non_async_func loop id:", id(loop))
    # Blocking sleep (this is not a coroutine); presumably gives the waiter
    # time to reach ``condition.wait()`` before the notification is sent.
    time.sleep(1)
    # ``loop.create_task`` is not thread-safe and must not be called from a
    # thread other than the loop's own. ``run_coroutine_threadsafe`` is the
    # supported way to submit a coroutine to a loop running in another thread.
    asyncio.run_coroutine_threadsafe(release(condition), loop)
async def release(condition):
    """Wake every task currently waiting on ``condition``.

    Args:
        condition: An ``asyncio.Condition``-like object. It is entered with
            ``async with`` before ``notify_all()`` is called on it.
    """
    # Inside a coroutine the running loop is the one executing us.
    loop = asyncio.get_running_loop()
    print("Release loop id:", id(loop))
    # notify_all() is called while the condition is held.
    async with condition:
        condition.notify_all()
    print("notify")
async def main():
    """Run ``non_async_func`` in the default executor alongside the ``foo`` task.

    Both jobs share one ``asyncio.Condition``. Because ``foo`` loops forever,
    the final ``gather`` does not complete on its own.
    """
    loop = asyncio.get_running_loop()
    print("main loop id:", id(loop))
    condition = asyncio.Condition()
    # job1 runs in a worker thread (executor=None selects the default
    # executor); job2 is an ordinary task on this loop.
    job1 = loop.run_in_executor(None, non_async_func, loop, condition)
    job2 = loop.create_task(foo(condition))
    await asyncio.gather(job1, job2)
if __name__ == "__main__":
    # Script entry point: asyncio.run() creates the event loop and runs main().
    asyncio.run(main())
@takidog
Copy link
Author

takidog commented Feb 5, 2023

Version 2

This version is even more wrong.

import asyncio
import threading
import time
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from asyncio import AbstractEventLoop

# Create the loop explicitly. Calling asyncio.get_event_loop() here, with no
# running loop, emits "DeprecationWarning: There is no current event loop" on
# recent Python versions and raises RuntimeError from Python 3.14.
main_loop = asyncio.new_event_loop()
# Keep the loop registered as this thread's current loop, as
# get_event_loop() did implicitly.
asyncio.set_event_loop(main_loop)


async def foo(condition):
    """Loop forever: print a timestamped counter line, then wait on ``condition``.

    Args:
        condition: An ``asyncio.Condition``-like object. It is entered with
            ``async with`` and its ``wait()`` is awaited once per iteration.
    """
    print("Foo loop id:", id(asyncio.get_event_loop()))
    iteration = 0
    while True:
        iteration += 1
        stamp = time.strftime('%X')
        print(f"{stamp}: do {iteration}")
        # Park until another task notifies the condition.
        async with condition:
            await condition.wait()


def non_async_func(condition) -> None:
    """Blocking helper that schedules ``release(condition)`` on ``main_loop``.

    ``main`` submits this function through ``loop.run_in_executor``, so it
    runs in a worker thread rather than in the event-loop thread.

    Args:
        condition: The condition object handed on to ``release``.
    """
    print("non_async_func loop id:", id(main_loop))
    # Blocking sleep (this is not a coroutine); presumably gives the waiter
    # time to reach ``condition.wait()`` before the notification is sent.
    time.sleep(1)
    # ``loop.create_task`` is not thread-safe and must not be called from a
    # thread other than the loop's own. ``run_coroutine_threadsafe`` is the
    # supported way to submit a coroutine to a loop running in another thread.
    asyncio.run_coroutine_threadsafe(release(condition), main_loop)


async def release(condition):
    """Notify all waiters of ``condition`` and report it on stdout.

    Args:
        condition: An ``asyncio.Condition``-like object. It is entered with
            ``async with`` before ``notify_all()`` is called on it.
    """
    current_loop_id = id(asyncio.get_event_loop())
    print("Release loop id:", current_loop_id)

    # The condition is held while the waiters are notified.
    async with condition as _held:
        condition.notify_all()

    print("notify")


async def main():
    """Start the executor job and the ``foo`` task, then wait for both.

    Both jobs share a single ``asyncio.Condition``. ``foo`` loops forever, so
    the ``gather`` does not complete on its own.
    """
    running_loop = asyncio.get_event_loop()
    print("main loop id:", id(running_loop))

    shared_condition = asyncio.Condition()

    # First entry runs in the default executor (a worker thread); the second
    # is a regular task on this loop.
    pending = [
        running_loop.run_in_executor(None, non_async_func, shared_condition),
        running_loop.create_task(foo(shared_condition)),
    ]
    await asyncio.gather(*pending)


if __name__ == "__main__":
    # Script entry point: report the module-level loop, then drive main() on it.
    print("main_loop", id(main_loop))
    entry_coro = main()
    main_loop.run_until_complete(entry_coro)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment