Last active
October 5, 2022 21:19
-
-
Save baybatu/a04c2296b8a9cf3983a0d13689f60eb1 to your computer and use it in GitHub Desktop.
I'm using the schedule (https://github.com/dbader/schedule) library for simple scheduled-job needs. However, it does not have built-in asyncio support and cannot run coroutines as jobs. So, this scheduler example shows how to use the schedule library with asyncio coroutines.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import functools | |
from schedule import Scheduler | |
class AsyncScheduler:
    """
    schedule (https://github.com/dbader/schedule) and asyncio integration example.

    Jobs are executed in the asyncio event loop without blocking other tasks.
    The example also shows how to prevent parallel execution of a long-running
    job by using the built-in 'tags' mechanism of the schedule library: a job
    carries the 'RUNNING' tag while its coroutine is in flight and the
    'NOT_RUNNING' tag once it has finished.
    """

    def __init__(self):
        self.scheduler = Scheduler()
        # The event loop keeps only weak references to tasks, so strong
        # references are held here to keep the polling loop and in-flight
        # jobs from being garbage-collected before they finish.
        self._loop_task = None
        self._job_tasks = set()

    async def init_scheduler(self):
        """Register the three example jobs and start the polling loop.

        Must be awaited from inside a running event loop because the polling
        loop is spawned with ``asyncio.create_task``. Returns as soon as the
        loop task has been created; it does not wait for any job to run.
        """
        print('Job scheduler will be started')
        self.scheduler.every(3).seconds.do(self.async_job1)
        self.scheduler.every(3).seconds.do(self.async_job2)
        self.scheduler.every(3).seconds.do(self.async_job3)
        self._loop_task = asyncio.create_task(self._start_scheduler_loop())
        print('Job scheduler started successfully')

    async def _start_scheduler_loop(self):
        """Poll the scheduler once per second and launch every due job as a task.

        A due job that still carries the 'RUNNING' tag is skipped with a
        message, so a job never overlaps with itself. This coroutine never
        returns on its own.
        """
        while True:
            runnable_jobs = (job for job in self.scheduler.get_jobs() if job.should_run)
            for job in runnable_jobs:
                if 'RUNNING' in job.tags:
                    print(f"Cannot run job parallel:{job}")
                    continue
                # Flip only the state markers; any other tags on the job
                # (e.g. ones supplied by the caller) are left untouched.
                job.tags.discard('NOT_RUNNING')
                job.tags.add('RUNNING')
                # job.run() is expected to hand back the coroutine object
                # produced by the registered async job function.
                task = asyncio.create_task(job.run())
                self._job_tasks.add(task)
                task.add_done_callback(self._job_tasks.discard)
                task.add_done_callback(functools.partial(self.mark_job_as_not_running, job))
            await asyncio.sleep(1)

    def mark_job_as_not_running(self, job, task):
        """Done-callback that flips *job* from 'RUNNING' to 'NOT_RUNNING'.

        ``task`` is the finished asyncio task. It is not inspected, but the
        ``add_done_callback`` protocol passes it as the last argument.
        """
        job.tags.discard('RUNNING')
        job.tags.add('NOT_RUNNING')

    async def async_job1(self):
        """Example job that takes 3 seconds, as long as its schedule interval."""
        print("async job 1 started")
        await asyncio.sleep(3)
        print("async job 1 ended")

    async def async_job2(self):
        """Example job that takes 1 second."""
        print("async job 2 started")
        await asyncio.sleep(1)
        print("async job 2 ended")

    async def async_job3(self):
        """Example job that takes 1 second."""
        print("async job 3 started")
        await asyncio.sleep(1)
        print("async job 3 ended")
async def main():
    """Start the example scheduler, then idle forever as the application loop."""
    scheduler_app = AsyncScheduler()
    await scheduler_app.init_scheduler()

    # Infinite application loop: each one-second sleep yields control to the
    # event loop so the scheduler's tasks get a chance to run.
    tick_seconds = 1
    while True:
        await asyncio.sleep(tick_seconds)
# Run the example only when executed as a script, so that importing this
# module does not start the never-ending application loop.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment