Last active
September 25, 2019 20:58
-
-
Save mrzechonek/22975b9825d6ce0559089639358a301f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import contextlib | |
import functools | |
import json | |
from concurrent.futures import CancelledError | |
from datetime import datetime | |
import aiohttp | |
import pytest | |
pytestmark = pytest.mark.asyncio | |
class ThawGun: | |
def __init__(self, loop): | |
self.loop = loop | |
self.offset = 0 | |
async def advance(self, offset): | |
assert offset >= 0, "Can't go backwards" | |
real_time = self.loop.time | |
current_time = real_time() | |
new_time = current_time + offset | |
@functools.wraps(self.loop.time) | |
def thawed_time(): | |
return real_time() + self.offset | |
@functools.wraps(self.loop.time) | |
def frozen_time(): | |
return current_time | |
try: | |
self.loop.time = frozen_time | |
await asyncio.sleep(0) | |
while self.loop._scheduled: | |
handle = self.loop._scheduled[0] | |
if handle.when() > new_time: | |
break | |
current_time = handle.when() | |
handle._run() | |
handle._callback, handle._args = lambda: None, () | |
# spin the loop so that the handle can schedule more stuff | |
await asyncio.sleep(0) | |
finally: | |
self.offset += offset | |
self.loop.time = thawed_time | |
@pytest.fixture | |
def thawgun(event_loop): | |
return ThawGun(event_loop) | |
@pytest.fixture | |
async def webapp(event_loop): | |
from webapp import app | |
process = await asyncio.create_subprocess_exec('flask', 'run', | |
loop=event_loop, | |
env=dict(FLASK_APP='webapp.py')) | |
yield process | |
process.kill() | |
await process.communicate() | |
class App: | |
def __init__(self, loop): | |
self.loop = loop | |
self.next_boom = None | |
self.count = 0 | |
def boom(self): | |
print("Boom: {}".format(self.loop.time())) | |
async def run(self): | |
async with aiohttp.ClientSession() as session: | |
while True: | |
if self.next_boom is None: | |
try: | |
async with session.get('http://localhost:5000/schedule') as response: | |
schedule = json.loads(await response.text()) | |
next = datetime.strptime(schedule['next'], '%Y-%m-%d %H:%M:%S') | |
delay = next - datetime.now() | |
self.next_boom = self.loop.call_later(delay.seconds, | |
self.boom) | |
print('Scheduled:', self.next_boom.when(), 'now:', self.loop.time()) | |
except OSError: | |
pass | |
print("Sleeping", self.loop.time()) | |
await asyncio.sleep(1) | |
@pytest.fixture | |
async def app(event_loop): | |
task = event_loop.create_task(App(event_loop).run()) | |
yield task | |
task.cancel() | |
with contextlib.suppress(CancelledError): | |
await task | |
async def test_faketime(event_loop, thawgun, webapp, app): | |
await asyncio.sleep(2.0) | |
await thawgun.advance(20) | |
await asyncio.sleep(2.0) | |
# await asyncio.wait_for(t, timeout=1.0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime, timedelta | |
import json | |
from flask import Flask | |
app = Flask('WebApp') | |
@app.route('/schedule') | |
def schedule(): | |
next = datetime.now() + timedelta(seconds=15) | |
return json.dumps(dict(next=next.strftime('%Y-%m-%d %H:%M:%S'))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment