Skip to content

Instantly share code, notes, and snippets.

@roadsideseb
Last active August 11, 2022 09:49
Show Gist options
  • Save roadsideseb/f54ab060c62103ad3747712a3e45680a to your computer and use it in GitHub Desktop.
Save roadsideseb/f54ab060c62103ad3747712a3e45680a to your computer and use it in GitHub Desktop.
A slightly modified version of the 'schedule' Python package that works with asyncio 😎
import emoji
import schedule
import asyncio
import inspect
from datetime import datetime
class Job(schedule.Job):
    """A ``schedule.Job`` whose ``run`` is a coroutine.

    Coroutine job functions are awaited; plain callables are invoked
    synchronously, so both kinds can be mixed on one scheduler.
    """

    async def run(self):
        """Execute the job once and reschedule it.

        Returns:
            Whatever the job function returned, so the scheduler can react
            to ``schedule.CancelJob``.
        """
        # job_func is the partial built by do(); .func is the user's callable.
        target = self.job_func.func
        if inspect.iscoroutine(target) or inspect.iscoroutinefunction(target):
            ret = await self.job_func()
        else:
            ret = self.job_func()

        # Bookkeeping that an overriding run() has to do itself: record the
        # run time and compute the next one, otherwise the job is never
        # rescheduled.
        self.last_run = datetime.now()
        self._schedule_next_run()
        return ret
class Scheduler(schedule.Scheduler):
    """A ``schedule.Scheduler`` that drives the asyncio-aware ``Job``."""

    async def run_pending(self):
        """Await every job that is currently due.

        Due jobs are collected first and run one after another in sorted
        order (using the jobs' own comparison). A job whose result is
        ``schedule.CancelJob`` (the class itself or an instance) is removed
        from the scheduler.
        """
        runnable_jobs = sorted(job for job in self.jobs if job.should_run)
        for job in runnable_jobs:
            ret = await job.run()
            if isinstance(ret, schedule.CancelJob) or ret is schedule.CancelJob:
                self.cancel_job(job)

    def every(self, interval=1):
        """Start defining a new periodic job.

        Args:
            interval: Number of time units between runs; the unit is chosen
                by the attribute accessed on the returned job.

        Returns:
            A new ``Job`` bound to this scheduler.
        """
        # Hand the scheduler to the job instead of appending it here: the
        # job registers itself with its scheduler once do() is called, so a
        # manual append would leave it unbound or register it twice.
        return Job(interval, self)
async def show_me_emoji(emoji, count=10):
    """Print ``emoji`` repeated ``count`` times, one space between characters.

    Args:
        emoji: String to repeat. The parameter name shadows the imported
            ``emoji`` module inside this function only.
        count: How many times the string is repeated before joining.
    """
    # Repeating first and joining second puts a space between every single
    # character of the repeated string, not between the repetitions.
    print(' '.join(emoji * count))
def print_sync(message='this is a sync function'):
    """Print ``message``; a plain synchronous job used by the demo.

    Args:
        message: Text written to standard output.
    """
    print(message)
async def run_scheduler(scheduler, interval=1):
    """Poll ``scheduler`` forever, running whatever is due on each pass.

    Args:
        scheduler: Object exposing an awaitable ``run_pending()``.
        interval: Seconds to sleep between polls. Defaults to 1.

    This coroutine never returns on its own; it ends only when cancelled or
    when ``run_pending()`` raises.
    """
    while True:
        await scheduler.run_pending()
        # Yield to the event loop between polls so other tasks can run.
        await asyncio.sleep(interval)
if __name__ == '__main__':
    # Demo: mix coroutine jobs and plain functions on one scheduler.
    sch = Scheduler()
    sch.every(5).seconds.do(show_me_emoji, '🐍')
    sch.every(5).seconds.do(print_sync)
    sch.every(5).seconds.do(show_me_emoji, '🍺🍹🍷🍸', 3)
    sch.every(5).seconds.do(print_sync, 'can you believe it')
    sch.every(5).seconds.do(show_me_emoji, '💥')

    # Create the loop explicitly rather than relying on get_event_loop()
    # to create one implicitly, and always close it on the way out.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run_scheduler(sch))
    finally:
        loop.close()
@miroslavbucek
Copy link

miroslavbucek commented Sep 6, 2017

fix for you :-)

class Job(schedule.Job):
    """A ``schedule.Job`` whose ``run`` is a coroutine.

    Coroutine job functions are awaited; plain callables are invoked
    synchronously.
    """

    async def run(self):
        """Execute the job once, reschedule it and return its result.

        Returns:
            The job function's return value, so that a caller can detect
            ``schedule.CancelJob``.
        """
        if (inspect.iscoroutine(self.job_func.func)
            or inspect.iscoroutinefunction(self.job_func.func)):
            ret = await self.job_func()
        else:
            ret = self.job_func()

        # Record this run and compute the next run time.
        self.last_run = datetime.now()
        self._schedule_next_run()
        # Without this the result is dropped and CancelJob is never seen.
        return ret

@nmante
Copy link

nmante commented Feb 13, 2018

Another fix, for when you instantiate the job in the every method: pass the scheduler to the Job so it knows which scheduler to register with (the custom one you've made). I also removed the self.jobs.append line from the every method. When you instantiate the job with the Scheduler you've just implemented, it will automatically add the job to its list of jobs to run. Here's the line in the scheduler library that does that.

class Scheduler(schedule.Scheduler):
    """A ``schedule.Scheduler`` whose ``run_pending`` is a coroutine."""

    async def run_pending(self):
        """Await each due job in sorted order, dropping cancelled ones.

        A job is cancelled when its result is ``schedule.CancelJob`` itself
        or an instance of it.
        """
        due = sorted(candidate for candidate in self.jobs if candidate.should_run)

        for current in due:
            outcome = await current.run()
            wants_cancel = (
                isinstance(outcome, schedule.CancelJob)
                or outcome is schedule.CancelJob
            )
            if wants_cancel:
                self.cancel_job(current)

    def every(self, interval=1):
        """Return a new ``Job`` with the given interval, bound to this scheduler."""
        return Job(interval, self)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment