Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
In the context of asyncio, which way is better in running the process and interacting with it.
@pytest.mark.asyncio
async def test_wait_pause_play_resume():
"""
Test that if you pause a process that and its awaitable finishes that it
completes correctly when played again.
"""
proc = utils.WaitForSignalProcess()
asyncio.ensure_future(proc.step_until_terminated())
await utils.run_until_waiting(proc)
assert proc.state == ProcessState.WAITING
result = await proc.pause()
assert result
assert proc.paused
result = proc.play()
assert result
assert not proc.paused
proc.resume()
# Wait until the process is terminated
await proc.future()
# Check it's done
assert proc.done()
assert proc.state == ProcessState.FINISHED
def test_wait_pause_play_resume_old():
"""
Test that if you pause a process that and its awaitable finishes that it
completes correctly when played again.
"""
proc = utils.WaitForSignalProcess()
loop = asyncio.get_event_loop()
loop.create_task(proc.step_until_terminated())
async def async_test():
await utils.run_until_waiting(proc)
assert proc.state == ProcessState.WAITING
result = await proc.pause()
assert result
assert proc.paused
result = proc.play()
assert result
assert not proc.paused
proc.resume()
# Wait until the process is terminated
await proc.future()
# Check it's done
assert proc.done()
assert proc.state == ProcessState.FINISHED
loop.run_until_complete(async_test())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.