Skip to content

Instantly share code, notes, and snippets.

@sorcio
Created February 22, 2018 18:02
Show Gist options
  • Save sorcio/7c0a2d685e39acda82ae56626ac58969 to your computer and use it in GitHub Desktop.
Save sorcio/7c0a2d685e39acda82ae56626ac58969 to your computer and use it in GitHub Desktop.
@pytest.mark.trio
async def test_worker_thread_autojump(nursery, autojump_clock):
def slowish_task():
for i in range(1000000):
pass
return 'foo'
async def async_run_task(queue):
result = await trio.run_sync_in_worker_thread(slowish_task)
await queue.put(result)
queue = trio.Queue(1)
nursery.start_soon(async_run_task, queue)
with trio.fail_after(300):
assert await queue.get() == 'foo'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment