Skip to content

Instantly share code, notes, and snippets.

@kcarnold
Created April 9, 2022 15:22
Show Gist options
  • Save kcarnold/1a5c96d1f4a66e2cf4e03cea4cef22aa to your computer and use it in GitHub Desktop.
Save kcarnold/1a5c96d1f4a66e2cf4e03cea4cef22aa to your computer and use it in GitHub Desktop.
Integrating Python's sched module with asyncio
import sched
import time
import asyncio
import datetime
# Wall-clock timestamp captured when this line runs; tick() prints the time
# elapsed relative to it.
start_time = time.time()
def tick(a='default'):
    """Print the seconds elapsed since ``start_time`` followed by ``a``.

    Args:
        a: Value echoed verbatim at the end of the printed line.
    """
    elapsed = time.time() - start_time
    print("From tick", elapsed, a)
# Alternative left for reference: a scheduler whose delay function sleeps for
# at most one second per call.
# s = sched.scheduler(delayfunc=lambda time_to_sleep: time.sleep(min(1, time_to_sleep)))

# Scheduler that run_queue() drives via s.run(blocking=False).
s = sched.scheduler()

# Two demo events with equal priority, due 3 s and 4 s after being entered.
# The returned handles are kept so an event can be cancelled later.
timer1 = s.enter(delay=3.0, priority=1, action=tick)
timer2 = s.enter(delay=4.0, priority=1, action=tick)

# Task that run_queue() is currently sleeping on; None until its first wait.
cur_sleeper = None
async def run_queue():
    """Drive the ``sched`` scheduler ``s`` from inside the asyncio event loop.

    Repeatedly runs every event that is already due (non-blocking), then
    sleeps asynchronously until the next event is due.  The sleep is wrapped
    in a task published through the module-level ``cur_sleeper`` so other
    coroutines can cancel it after changing the queue, which makes this loop
    recompute the delay immediately.  Returns once the queue is empty.

    Raises:
        asyncio.CancelledError: If this coroutine itself is cancelled while
            waiting.  The pending sleeper task is cancelled first so it does
            not outlive the loop.
    """
    global cur_sleeper
    while True:
        # Runs all due events; returns the delay until the next one, or None
        # when nothing is left in the queue.
        next_delay = s.run(blocking=False)
        if next_delay is None:
            break
        print("Waiting for", next_delay)
        cur_sleeper = asyncio.create_task(asyncio.sleep(next_delay))
        try:
            # asyncio.wait() returns normally when the sleeper finishes *or*
            # is cancelled, and raises CancelledError only when this
            # coroutine is the one being cancelled.  Awaiting the task
            # directly cannot tell those two cases apart and would swallow
            # our own cancellation.
            await asyncio.wait([cur_sleeper])
        except asyncio.CancelledError:
            cur_sleeper.cancel()
            raise
        if cur_sleeper.cancelled():
            # Someone changed the queue and woke us up early; loop around to
            # recompute the delay.
            print("Cancelled wait")
    print("All done.")
async def requeue():
    """Modify the scheduler queue while ``run_queue`` is sleeping.

    After one second, cancels ``timer1``, schedules a new ``tick`` one second
    out, and cancels the current sleeper task so that ``run_queue`` wakes up
    and recomputes how long to wait.

    Raises:
        ValueError: Propagated from ``s.cancel`` if ``timer1`` is no longer
            in the queue.
    """
    await asyncio.sleep(1.0)
    s.cancel(timer1)
    s.enter(1., 1, tick)
    # cur_sleeper is still None if run_queue has not started a wait yet (or
    # never had anything to wait for); there is nothing to wake in that case.
    if cur_sleeper is not None:
        cur_sleeper.cancel()
async def main():
    """Run the queue driver and the requeue demo concurrently to completion."""
    queue_runner = run_queue()
    rescheduler = requeue()
    await asyncio.gather(queue_runner, rescheduler)
# Script entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment