Skip to content

Instantly share code, notes, and snippets.

@unkcpz
Created May 7, 2020 08:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save unkcpz/c2221120d8b7dca748325ce1391d4b57 to your computer and use it in GitHub Desktop.
Save unkcpz/c2221120d8b7dca748325ce1391d4b57 to your computer and use it in GitHub Desktop.
In the context of asyncio, which way is better in running the process and interacting with it.
@pytest.mark.asyncio
async def test_wait_pause_play_resume():
"""
Test that if you pause a process that and its awaitable finishes that it
completes correctly when played again.
"""
proc = utils.WaitForSignalProcess()
asyncio.ensure_future(proc.step_until_terminated())
await utils.run_until_waiting(proc)
assert proc.state == ProcessState.WAITING
result = await proc.pause()
assert result
assert proc.paused
result = proc.play()
assert result
assert not proc.paused
proc.resume()
# Wait until the process is terminated
await proc.future()
# Check it's done
assert proc.done()
assert proc.state == ProcessState.FINISHED
def test_wait_pause_play_resume_old():
"""
Test that if you pause a process that and its awaitable finishes that it
completes correctly when played again.
"""
proc = utils.WaitForSignalProcess()
loop = asyncio.get_event_loop()
loop.create_task(proc.step_until_terminated())
async def async_test():
await utils.run_until_waiting(proc)
assert proc.state == ProcessState.WAITING
result = await proc.pause()
assert result
assert proc.paused
result = proc.play()
assert result
assert not proc.paused
proc.resume()
# Wait until the process is terminated
await proc.future()
# Check it's done
assert proc.done()
assert proc.state == ProcessState.FINISHED
loop.run_until_complete(async_test())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment