Skip to content

Instantly share code, notes, and snippets.

@neriberto
Last active November 27, 2020 20:42
Show Gist options
  • Save neriberto/0415a4ae50d5d8a09ede7dc273121354 to your computer and use it in GitHub Desktop.
Save neriberto/0415a4ae50d5d8a09ede7dc273121354 to your computer and use it in GitHub Desktop.
testing apscheduler
import logging
import os
import time
import pytz
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
def tick1():
print('Tick 1! The time is: %s' % datetime.now())
time.sleep(6)
def tick2():
print('Tick 2! The time is: %s' % datetime.now())
def main():
# logging.basicConfig()
# logging.getLogger('apscheduler').setLevel(logging.DEBUG)
jobstores = {
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(max_workers=1)
}
job_defaults = {
'coalesce': False,
'max_instances': 1
}
scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=pytz.utc
)
# Aqui estou aceitando o job tick1 sobreEscrever o max_instances
scheduler.add_job(tick1, 'interval', seconds=3, max_instances=2)
scheduler.add_job(tick2, 'interval', seconds=3)
# Com base na quantidade de jobs reconfiguro a ThreadPool
max_workers = len(scheduler.get_jobs())
executors = {
'default': ThreadPoolExecutor(max_workers=max_workers)
}
scheduler.configure()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.start()
while True:
time.sleep(3)
except (KeyboardInterrupt, SystemExit):
pass
if __name__=="__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment