Created
July 25, 2022 14:19
-
-
Save HanSooloo/e7367e06df10ca596b4922bf3f9f9d6e to your computer and use it in GitHub Desktop.
Testing with apscheduler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from apscheduler.schedulers.blocking import BlockingScheduler | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from apscheduler.jobstores.redis import RedisJobStore | |
from apscheduler.executors.pool import ThreadPoolExecutor | |
import apscheduler.events as events | |
# This file has a dictionary that maps the apscheduler event codes to actual text values | |
# e.g., {512: 'EVENT_JOB_ADDED'} | |
from apscheduler_event_types import event_code | |
import redis | |
from urllib.parse import urlparse | |
import time | |
import logging | |
import sys | |
import os | |
from datetime import datetime, timedelta | |
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, | |
format='%(asctime)s: %(levelname)8s %(module)8s - %(funcName)s: %(message)s', | |
) | |
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379') | |
r = redis.from_url(redis_url) | |
url = urlparse(redis_url) | |
REDIS_HOST = url.hostname | |
REDIS_PORT = url.port | |
jobstores = { | |
'default': RedisJobStore(host=REDIS_HOST, port=REDIS_PORT) | |
} | |
executors = { | |
'default': ThreadPoolExecutor(20) | |
} | |
def log_info(message): | |
logging.log(logging.INFO, message) | |
def log_warn(message): | |
logging.log(logging.WARN, message) | |
def gather_threads(): | |
work_duration = 3 | |
log_info(f'This is a long running job: {work_duration} seconds') | |
time.sleep(work_duration) | |
return f'_SOME_RESULT_' | |
def job_execution_listener(event): | |
my_name = inspect.currentframe().f_code.co_name | |
job = sched.get_job(event.job_id) | |
log_info(f'Event code: {event.code}') | |
if job is None: | |
job_name = "NO JOB NAME" | |
else: | |
job_name = job.name | |
if not hasattr(event, 'scheduled_run_time'): | |
scheduled_run_time = "NO SCHEDULED RUN TIME YET" | |
else: | |
scheduled_run_time = event.scheduled_run_time | |
if event.exception: | |
log_info(f'Event type: {event_code[event.code]}') | |
log_info(f'The job has crashed: {event.job_id}') | |
log_info(f'Relevant CRASH return info: {event.retval}') | |
else: | |
log_info(f'=================================================') | |
log_info(f'JOB EXECUTION EVENT') | |
log_info(f'Event type: {event_code[event.code]}') | |
log_info(f'Job ID: {event.job_id}') | |
log_info(f'Job Name: {job_name}') | |
log_info(f'Scheduled Run Time: {scheduled_run_time}') | |
if event.retval: | |
log_info(f'Relevant COMPLETION return info: {event.retval}') | |
log_info(f'=================================================') | |
def job_management_listener(event): | |
my_name = inspect.currentframe().f_code.co_name | |
log_info(f'Event code: {event.code}') | |
job = sched.get_job(event.job_id) | |
if job is None: | |
job_name = "NO JOB NAME" | |
else: | |
job_name = job.name | |
if not hasattr(event, 'scheduled_run_time'): | |
scheduled_run_time = "NO SCHEDULED RUN TIME YET" | |
else: | |
scheduled_run_time = event.scheduled_run_time | |
log_info(f'=================================================') | |
log_info(f'JOB MANAGEMENT EVENT') | |
log_info(f'Event type: {event_code[event.code]}') | |
log_info(f'Job ID: {event.job_id}') | |
log_info(f'Job Name: {job_name}') | |
log_info(f'Scheduled Run Time: {scheduled_run_time}') | |
log_info(f'=================================================') | |
def get_datefmt(date_time): | |
return date_time.strftime('%Y-%m-%d %H:%M:%S') | |
sched = BlockingScheduler(jobstores=jobstores, executors=executors) | |
log_info(f'Adding listeners') | |
sched.add_listener(job_execution_listener, | |
events.EVENT_JOB_EXECUTED | | |
events.EVENT_JOB_ERROR | |
) | |
sched.add_listener(job_management_listener, | |
events.EVENT_JOB_ADDED | | |
events.EVENT_JOB_MODIFIED | | |
events.EVENT_JOB_SUBMITTED | | |
events.EVENT_JOB_REMOVED | |
) | |
now = datetime.now() | |
job_run_start = now + timedelta(seconds=5) | |
job_run_stop = now + timedelta(seconds=35) | |
log_info(f'job_run_start: {get_datefmt(job_run_start)}') | |
log_info(f'job_run_stop: {get_datefmt(job_run_stop)}') | |
log_info(f'Scheduling jobs at: {now}') | |
scheduled_job = sched.add_job(gather_threads, 'date', | |
run_date=datetime.now() + timedelta(seconds=15) | |
) | |
# scheduled_job = sched.add_job(gather_threads, 'interval', seconds=5, | |
# start_date=get_datefmt(job_run_start), | |
# end_date=get_datefmt(job_run_stop) | |
# ) | |
log_info(f'scheduled_job info:') | |
log_info(f'job.id: {scheduled_job.id}') | |
log_info(f'job.name: {scheduled_job.name}') | |
log_info(f'job.func: {scheduled_job.func}') | |
sched.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment