Skip to content

Instantly share code, notes, and snippets.

@pzelnip
Last active March 6, 2019 10:50
Show Gist options
  • Save pzelnip/7230b32dc9a27f6e78d9cd78b619245a to your computer and use it in GitHub Desktop.
Save pzelnip/7230b32dc9a27f6e78d9cd78b619245a to your computer and use it in GitHub Desktop.
Trying to figure out run_forever & asyncio
#Discussion from https://stackoverflow.com/a/29314606/808804
import asyncio
from threading import Thread
from concurrent.futures import Future
import functools
async def simple_coro():
await asyncio.sleep(3)
return 42
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def _add_task(self, future, coro):
task = self.loop.create_task(coro)
future.set_result(task)
def add_task(self, coro):
future = Future()
p = functools.partial(self._add_task, future, coro)
self.loop.call_soon_threadsafe(p)
return future.result() # block until result is available
def cancel(self, task):
self.loop.call_soon_threadsafe(task.cancel)
def main():
b = B()
b.start()
print(f"Before", flush=True)
result = b.add_task(simple_coro())
print(f"dslafkjdslajdsf: {result}", flush=True)
b.stop()
# this never seems to terminate?
if __name__ == "__main__":
main()
@RaD
Copy link

RaD commented Mar 6, 2019

"""
Thread 4347881120 started.
Task: <Task pending coro=<simple_coro() running at threadsafe.py:8> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10693ac48>()]>>
Thread 4347881120 stopped.
"""

import asyncio
from threading import Thread
from concurrent.futures import Future
import functools


async def simple_coro():
    await asyncio.sleep(3)
    return 42


class B(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def run(self):
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def _add_task(self, future, coro):
        task = self.loop.create_task(coro)
        future.set_result(task)

    def add_task(self, coro):
        future = Future()
        p = functools.partial(self._add_task, future, coro)
        self.loop.call_soon_threadsafe(p)
        return future.result()  # block until result is available

    def cancel(self, task):
        self.loop.call_soon_threadsafe(task.cancel)


def main():
    b = B()
    b.start()
    print(f'Thread {id(b)} started.', flush=True)
    task = b.add_task(simple_coro())
    print(f'Task: {task}', flush=True)
    b.stop()
    print(f'Thread {id(b)} stopped.', flush=True)

if __name__ == "__main__":
    main()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment