Created
August 22, 2016 10:40
-
-
Save mcchae/066d353685505fdb79bac2fa6574393c to your computer and use it in GitHub Desktop.
APScheduler 3.2.0 example with Python 3.5
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from datetime import datetime | |
from time import sleep | |
from apscheduler.schedulers.background import BackgroundScheduler as Scheduler | |
##########################################################################################
class APS(object):
    """Collect interval and cron jobs and run them on a background scheduler.

    ``add_interval_job`` / ``add_cron_job`` only record jobs; the recorded
    jobs are handed to a newly created scheduler by ``start_scheduler``.
    Jobs are keyed by ``job.__name__`` (which is also used as the scheduler
    job id), so registering another callable with the same name in the same
    category replaces the earlier registration.

    NOTE(review): an interval job and a cron job sharing one ``__name__``
    would be added with the same scheduler id -- presumably the scheduler
    rejects that; confirm before relying on it.
    """
    #=====================================================================================
    def __init__(self):
        # name -> (callable, trigger keyword arguments)
        self._interval_jobs = {}
        self._cron_jobs = {}
        # Running scheduler, or None when nothing has been started / after stop.
        self._scheduler = None
    #=====================================================================================
    def add_interval_job(self, job, **kwargs):
        """Record *job* to be added with the ``'interval'`` trigger.

        :param job: callable to schedule; stored under ``job.__name__``.
        :param kwargs: trigger arguments forwarded to ``add_job`` at start.
        :return: number of interval jobs recorded so far.
        """
        self._interval_jobs[job.__name__] = (job, kwargs)
        return len(self._interval_jobs)
    #=====================================================================================
    def add_cron_job(self, job, **kwargs):
        """Record *job* to be added with the ``'cron'`` trigger.

        :param job: callable to schedule; stored under ``job.__name__``.
        :param kwargs: trigger arguments forwarded to ``add_job`` at start.
        :return: number of cron jobs recorded so far.
        """
        self._cron_jobs[job.__name__] = (job, kwargs)
        return len(self._cron_jobs)
    #=====================================================================================
    def start_scheduler(self):
        """Create a scheduler, add every recorded job to it and start it.

        A scheduler left over from a previous ``start_scheduler`` call is
        stopped first so it is not orphaned while still running.

        :return: ``False`` when no job has been recorded (nothing is
            started), ``True`` once the scheduler has been started.
        """
        if not (self._interval_jobs or self._cron_jobs):
            return False
        # Do not leak an already running scheduler when started twice.
        self.stop_scheduler()
        scheduler = Scheduler()
        registered = (
            ('interval', self._interval_jobs),
            ('cron', self._cron_jobs),
        )
        for trigger, jobs in registered:
            for job, kwargs in jobs.values():
                scheduler.add_job(job, trigger, id=job.__name__, **kwargs)
        scheduler.start()
        # Publish the handle only after a successful start, so a failure above
        # never leaves a never-started scheduler for stop_scheduler to hit.
        self._scheduler = scheduler
        return True
    #=====================================================================================
    def stop_scheduler(self):
        """Shut down the running scheduler, if any.

        Safe to call repeatedly, or before ``start_scheduler``: the stored
        handle is cleared first, so ``shutdown()`` is called at most once per
        started scheduler.
        """
        scheduler, self._scheduler = self._scheduler, None
        if scheduler is not None:
            scheduler.shutdown()
##########################################################################################
class mySchedule(APS):
    """Example schedule with one interval job and one cron job that print timestamps."""
    #=====================================================================================
    def __init__(self):
        super().__init__()
    #=====================================================================================
    def my_interval(self):
        """Print the current ``datetime.now()`` value tagged ``my_interval``."""
        stamp = datetime.now()
        print('[%s] my_interval' % stamp)
    #=====================================================================================
    def my_cron(self):
        """Print the current ``datetime.now()`` value tagged ``my_cron``."""
        stamp = datetime.now()
        print('[%s] my_cron' % stamp)
    #=====================================================================================
    def add_schedule(self):
        """Register both demo jobs: interval (``seconds=3``) and cron (``second='*/10'``)."""
        interval_job = self.my_interval
        cron_job = self.my_cron
        self.add_interval_job(interval_job, seconds=3)
        self.add_cron_job(cron_job, second='*/10')
##########################################################################################
if __name__ == '__main__':
    # Demo run: start the schedule, let it fire for 60 s, stop it, then keep
    # the main thread alive for 11 more seconds of once-a-second output.
    schedule = mySchedule()
    schedule.add_schedule()
    schedule.start_scheduler()

    for elapsed in range(60):
        sleep(1)
        print('[%s] in main sleep after start scheduler' % elapsed)

    schedule.stop_scheduler()

    for elapsed in range(11):
        sleep(1)
        print('[%s] in main sleep after stop scheduler' % elapsed)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment