Skip to content

Instantly share code, notes, and snippets.

@mrzechonek
Last active September 25, 2019 20:58
Show Gist options
  • Save mrzechonek/22975b9825d6ce0559089639358a301f to your computer and use it in GitHub Desktop.
Save mrzechonek/22975b9825d6ce0559089639358a301f to your computer and use it in GitHub Desktop.
import asyncio
import contextlib
import functools
import json
from concurrent.futures import CancelledError
from datetime import datetime
import aiohttp
import pytest
# Module-level pytest mark: applies the ``asyncio`` marker to the tests below.
pytestmark = pytest.mark.asyncio
class ThawGun:
    """Fast-forward an asyncio event loop's clock without actually waiting.

    The loop's ``time`` attribute is replaced with a patched clock, and
    timers that fall due inside the skipped interval are fired in order.
    This relies on private loop/handle attributes (``_scheduled``,
    ``_run``, ``_callback``, ``_args``).
    """

    def __init__(self, loop):
        """Bind to *loop*; no patching happens until ``advance`` is called."""
        self.loop = loop
        # Total number of seconds the loop clock has been shifted so far.
        self.offset = 0
        # Remember the clock as it was before this object patched anything.
        # ``advance`` replaces ``loop.time``; re-reading it there as the
        # "real" clock would stack the accumulated offset once per call.
        self._real_time = loop.time

    async def advance(self, offset):
        """Move the loop clock forward by *offset* seconds.

        Timers scheduled at or before the new time are run one by one with
        the clock frozen at each timer's own deadline. Afterwards
        ``loop.time()`` reports the original clock plus the total offset
        accumulated over all calls.

        Raises AssertionError if *offset* is negative.
        """
        assert offset >= 0, "Can't go backwards"

        real_time = self._real_time
        # Start from what the loop currently reports (includes earlier shifts).
        current_time = self.loop.time()
        new_time = current_time + offset

        @functools.wraps(real_time)
        def thawed_time():
            return real_time() + self.offset

        @functools.wraps(real_time)
        def frozen_time():
            # Closure over ``current_time``: follows the updates made below.
            return current_time

        try:
            self.loop.time = frozen_time
            await asyncio.sleep(0)

            while self.loop._scheduled:
                handle = self.loop._scheduled[0]

                if handle.cancelled():
                    # A cancelled handle has no callback left to run; yield
                    # so the loop can drop it from the head of its queue.
                    await asyncio.sleep(0)
                    continue

                if handle.when() > new_time:
                    break

                current_time = handle.when()
                handle._run()
                # The handle is still queued and now due; neutralise it so
                # the loop does not invoke the callback a second time.
                handle._callback, handle._args = lambda: None, ()

                # spin the loop so that the handle can schedule more stuff
                await asyncio.sleep(0)
        finally:
            self.offset += offset
            self.loop.time = thawed_time
@pytest.fixture
def thawgun(event_loop):
    """Provide a ``ThawGun`` bound to the ``event_loop`` fixture."""
    gun = ThawGun(event_loop)
    return gun
@pytest.fixture
async def webapp(event_loop):
    """Run ``flask run`` as a subprocess for the duration of a test.

    Yields the subprocess object; on teardown the process is killed and
    reaped.
    """
    import os

    # Imported but not used in this fixture -- presumably a fail-fast check
    # that the Flask module is importable; TODO confirm and drop if not.
    from webapp import app  # noqa: F401

    # No ``loop=`` argument: it was removed from the asyncio subprocess API
    # in Python 3.10, and the running loop is used automatically.
    # Extend the current environment instead of replacing it, so PATH (and
    # with it the ``flask`` executable) is still visible to the child.
    process = await asyncio.create_subprocess_exec(
        'flask', 'run',
        env={**os.environ, 'FLASK_APP': 'webapp.py'},
    )
    yield process

    # The server may already have exited (e.g. it failed to start).
    with contextlib.suppress(ProcessLookupError):
        process.kill()
    await process.communicate()
class App:
    """Client that polls the schedule endpoint and arms a timer on the loop.

    Once a schedule has been fetched, ``boom`` is registered with
    ``loop.call_later`` and no further requests are made, because
    ``next_boom`` stays set.
    """

    def __init__(self, loop):
        """Store *loop*; nothing is scheduled until ``run`` is awaited."""
        self.loop = loop
        # Timer handle for the pending ``boom`` call, or None if not armed.
        self.next_boom = None
        # Not read or updated anywhere in this class.
        self.count = 0

    def boom(self):
        """Timer callback: print the loop time at which it fired."""
        print("Boom: {}".format(self.loop.time()))

    @staticmethod
    def _seconds_until(when_text, now=None):
        """Return seconds from *now* until *when_text*, never negative.

        *when_text* uses the ``%Y-%m-%d %H:%M:%S`` format. *now* defaults
        to ``datetime.now()``. ``total_seconds()`` is used rather than
        ``timedelta.seconds``, which ignores the ``days`` component and so
        turns a time in the past into a delay of almost 24 hours.
        """
        target = datetime.strptime(when_text, '%Y-%m-%d %H:%M:%S')
        if now is None:
            now = datetime.now()
        return max((target - now).total_seconds(), 0.0)

    async def run(self):
        """Poll once per second until a schedule is fetched; loops forever.

        Runs until the enclosing task is cancelled.
        """
        async with aiohttp.ClientSession() as session:
            while True:
                if self.next_boom is None:
                    try:
                        async with session.get('http://localhost:5000/schedule') as response:
                            schedule = json.loads(await response.text())
                    except OSError:
                        # Best effort: the request failed at the OS level
                        # (e.g. server not up yet); retry on the next pass.
                        pass
                    else:
                        delay = self._seconds_until(schedule['next'])
                        self.next_boom = self.loop.call_later(delay,
                                                              self.boom)
                        print('Scheduled:', self.next_boom.when(), 'now:', self.loop.time())

                print("Sleeping", self.loop.time())
                await asyncio.sleep(1)
@pytest.fixture
async def app(event_loop):
    """Run ``App.run`` as a background task for the duration of a test.

    Yields the task; on teardown the task is cancelled and awaited.
    """
    task = event_loop.create_task(App(event_loop).run())
    yield task

    task.cancel()
    # Awaiting a cancelled task raises asyncio.CancelledError. Since
    # Python 3.8 that is a different class from
    # concurrent.futures.CancelledError, so the asyncio one must be named.
    with contextlib.suppress(asyncio.CancelledError):
        await task
async def test_faketime(event_loop, thawgun, webapp, app):
    """Sleep, jump the loop clock 20 seconds ahead, then sleep again.

    Makes no assertions; the fixtures and ``App`` print their progress.
    """
    settle_seconds = 2.0
    jump_seconds = 20

    await asyncio.sleep(settle_seconds)
    await thawgun.advance(jump_seconds)
    await asyncio.sleep(settle_seconds)
from datetime import datetime, timedelta
import json
from flask import Flask
app = Flask('WebApp')


@app.route('/schedule')
def schedule():
    """Return a JSON object whose ``next`` field is 15 seconds from now."""
    boom_at = datetime.now() + timedelta(seconds=15)
    payload = {'next': boom_at.strftime('%Y-%m-%d %H:%M:%S')}
    return json.dumps(payload)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment