Skip to content

Instantly share code, notes, and snippets.

@mcchae
Created August 22, 2016 10:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mcchae/066d353685505fdb79bac2fa6574393c to your computer and use it in GitHub Desktop.
Save mcchae/066d353685505fdb79bac2fa6574393c to your computer and use it in GitHub Desktop.
APScheduler 3.2.0 example with Python 3.5
#!/usr/bin/env python3
from datetime import datetime
from time import sleep
from apscheduler.schedulers.background import BackgroundScheduler as Scheduler
##########################################################################################
class APS(object):
    """Collect interval and cron jobs and run them on a single scheduler.

    ``add_interval_job`` / ``add_cron_job`` only record a job; nothing is
    scheduled until ``start_scheduler`` builds a ``Scheduler`` and hands
    every recorded job to it.
    """
    #=====================================================================================
    def __init__(self):
        # Both registries map job.__name__ -> (callable, trigger kwargs).
        self._interval_jobs = {}
        self._cron_jobs = {}
        # Scheduler created by start_scheduler(); None while nothing is running.
        self._scheduler = None
    #=====================================================================================
    def add_interval_job(self, job, **kwargs):
        """Record *job* to be run with the 'interval' trigger.

        Args:
            job: Callable to schedule. Its ``__name__`` is the registry key,
                so recording another callable with the same name replaces
                the earlier entry.
            **kwargs: Trigger options forwarded to ``add_job`` when the
                scheduler is started (e.g. ``seconds=3``).

        Returns:
            The number of interval jobs recorded so far.
        """
        self._interval_jobs[job.__name__] = (job, kwargs)
        return len(self._interval_jobs)
    #=====================================================================================
    def add_cron_job(self, job, **kwargs):
        """Record *job* to be run with the 'cron' trigger.

        Args:
            job: Callable to schedule. Its ``__name__`` is the registry key,
                so recording another callable with the same name replaces
                the earlier entry.
            **kwargs: Trigger options forwarded to ``add_job`` when the
                scheduler is started (e.g. ``second='*/10'``).

        Returns:
            The number of cron jobs recorded so far.
        """
        self._cron_jobs[job.__name__] = (job, kwargs)
        return len(self._cron_jobs)
    #=====================================================================================
    def start_scheduler(self):
        """Create a scheduler, add every recorded job to it, and start it.

        A scheduler left over from an earlier call is shut down first, so a
        repeated start never leaves a previous scheduler running unattended.

        Returns:
            False when no job has been recorded (nothing is started),
            True once the scheduler has been started.
        """
        if not (self._interval_jobs or self._cron_jobs):
            return False
        # Restarting must not orphan a scheduler that is still held.
        self.stop_scheduler()
        scheduler = Scheduler()
        # NOTE(review): the job id is the callable's __name__ for both
        # triggers, so one name recorded as both an interval and a cron job
        # yields two jobs with the same id -- confirm how the scheduler
        # treats that.
        for trigger, registry in (('interval', self._interval_jobs),
                                  ('cron', self._cron_jobs)):
            for job, kwargs in registry.values():
                scheduler.add_job(job, trigger, id=job.__name__, **kwargs)
        scheduler.start()
        # Only remember the scheduler once it is actually running, so a
        # failure above never leaves a half-configured instance behind.
        self._scheduler = scheduler
        return True
    #=====================================================================================
    def stop_scheduler(self):
        """Shut down the current scheduler, if there is one.

        Safe to call repeatedly, or before ``start_scheduler``: the stored
        reference is cleared, so ``shutdown()`` is invoked at most once per
        started scheduler.
        """
        scheduler, self._scheduler = self._scheduler, None
        if scheduler is not None:
            scheduler.shutdown()
##########################################################################################
class mySchedule(APS):
    """Example schedule that registers one interval job and one cron job."""
    #=====================================================================================
    def __init__(self):
        super().__init__()
    #=====================================================================================
    def my_interval(self):
        """Print a line tagged 'my_interval', prefixed with the current time."""
        stamp = datetime.now()
        print('[%s] my_interval' % stamp)
    #=====================================================================================
    def my_cron(self):
        """Print a line tagged 'my_cron', prefixed with the current time."""
        stamp = datetime.now()
        print('[%s] my_cron' % stamp)
    #=====================================================================================
    def add_schedule(self):
        """Register ``my_interval`` (seconds=3) and ``my_cron`` (second='*/10')."""
        interval_options = {'seconds': 3}
        cron_options = {'second': '*/10'}
        self.add_interval_job(self.my_interval, **interval_options)
        self.add_cron_job(self.my_cron, **cron_options)
##########################################################################################
if __name__ == '__main__':
    # Demo driver: register the example jobs, start the scheduler, and keep
    # the main thread busy while the jobs print their own lines.
    demo = mySchedule()
    demo.add_schedule()
    demo.start_scheduler()
    # Phase 1: one status line per second for 60 seconds, scheduler started.
    for elapsed in range(60):
        sleep(1)
        print('[%s] in main sleep after start scheduler' % elapsed)
    demo.stop_scheduler()
    # Phase 2: keep printing for 11 more seconds after the scheduler has
    # been stopped, so the two phases can be compared in the output.
    for elapsed in range(11):
        sleep(1)
        print('[%s] in main sleep after stop scheduler' % elapsed)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment