Skip to content

Instantly share code, notes, and snippets.

@baybatu
Last active October 5, 2022 21:19
Show Gist options
  • Save baybatu/a04c2296b8a9cf3983a0d13689f60eb1 to your computer and use it in GitHub Desktop.
Save baybatu/a04c2296b8a9cf3983a0d13689f60eb1 to your computer and use it in GitHub Desktop.
I'm using the schedule library (https://github.com/dbader/schedule) for simple scheduled-job needs. However, it has no built-in asyncio support and cannot run coroutines as jobs. This scheduler example shows how to use the schedule library with asyncio coroutines.
import asyncio
import functools
from schedule import Scheduler
class AsyncScheduler:
    """
    schedule (https://github.com/dbader/schedule) and asyncio integration example.

    Jobs are coroutines that are executed as tasks in the asyncio event loop,
    so a slow job does not block the others. A job that is still running when
    it becomes due again is skipped; its running state is tracked with marker
    tags on the job, which are added to (never substituted for) any tags the
    job already carries.
    """

    # Marker tags used to record whether a job currently has a task in flight.
    RUNNING_TAG = 'RUNNING'
    NOT_RUNNING_TAG = 'NOT_RUNNING'

    def __init__(self):
        self.scheduler = Scheduler()
        # Strong references to in-flight job tasks. asyncio only keeps weak
        # references to tasks, so a task nobody else references may be
        # garbage collected before it finishes.
        self._running_tasks = set()
        # Reference to the polling loop task, kept for the same reason.
        self._loop_task = None

    async def init_scheduler(self):
        """Register the example jobs and start the polling loop as a task.

        Must be awaited from inside a running event loop, because it calls
        ``asyncio.create_task``.
        """
        print('Job scheduler will be started')
        self.scheduler.every(3).seconds.do(self.async_job1)
        self.scheduler.every(3).seconds.do(self.async_job2)
        self.scheduler.every(3).seconds.do(self.async_job3)
        self._loop_task = asyncio.create_task(self._start_scheduler_loop())
        print('Job scheduler started successfully')

    async def _start_scheduler_loop(self):
        """Poll the scheduler once per second, forever."""
        while True:
            self._run_pending_jobs()
            await asyncio.sleep(1)

    def _run_pending_jobs(self):
        """Start a task for every due job that is not already running."""
        for job in self.scheduler.get_jobs():
            if not job.should_run:
                continue
            if self.RUNNING_TAG in job.tags:
                print(f"Cannot run job parallel:{job}")
                continue
            # job.run() is expected to hand back the coroutine object created
            # by calling the async job function.
            outcome = job.run()
            if not asyncio.iscoroutine(outcome):
                # Nothing to await (e.g. a plain function was registered).
                # Passing this to create_task would raise TypeError and kill
                # the polling loop, so just move on to the next job.
                continue
            job.tags = (set(job.tags) - {self.NOT_RUNNING_TAG}) | {self.RUNNING_TAG}
            task = asyncio.create_task(outcome)
            self._running_tasks.add(task)
            task.add_done_callback(
                functools.partial(self.mark_job_as_not_running, job)
            )

    def mark_job_as_not_running(self, job, task):
        """Done-callback: flag ``job`` as idle once its ``task`` has finished.

        Args:
            job: the schedule job whose task just completed.
            task: the finished task (supplied by ``add_done_callback``).
        """
        self._running_tasks.discard(task)
        # Swap only the marker tags; every other tag on the job is preserved.
        job.tags = (set(job.tags) - {self.RUNNING_TAG}) | {self.NOT_RUNNING_TAG}
        # Retrieve the exception so a failure is reported right away rather
        # than as a "Task exception was never retrieved" message later.
        if not task.cancelled() and task.exception() is not None:
            print(f"Job failed:{job}: {task.exception()!r}")

    async def async_job1(self):
        """Example job that outlives its 3-second interval's polling ticks."""
        print("async job 1 started")
        await asyncio.sleep(3)
        print("async job 1 ended")

    async def async_job2(self):
        """Example short job (1 second)."""
        print("async job 2 started")
        await asyncio.sleep(1)
        print("async job 2 ended")

    async def async_job3(self):
        """Example short job (1 second)."""
        print("async job 3 started")
        await asyncio.sleep(1)
        print("async job 3 ended")
async def main():
    """Start the job scheduler, then keep the application alive indefinitely."""
    scheduler_app = AsyncScheduler()
    await scheduler_app.init_scheduler()
    # Stand-in for the real application loop: it only needs to keep yielding
    # to the event loop so the scheduled jobs get a chance to run.
    while True:
        await asyncio.sleep(1)
# Start the event loop only when executed as a script, so that importing this
# module does not block on the infinite application loop.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment