Skip to content

Instantly share code, notes, and snippets.

@HanSooloo
Created July 25, 2022 14:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save HanSooloo/e7367e06df10ca596b4922bf3f9f9d6e to your computer and use it in GitHub Desktop.
Save HanSooloo/e7367e06df10ca596b4922bf3f9f9d6e to your computer and use it in GitHub Desktop.
Testing with apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
import apscheduler.events as events
# This file has a dictionary that maps the apscheduler event codes to actual text values
# e.g., {512: 'EVENT_JOB_ADDED'}
from apscheduler_event_types import event_code
import redis
from urllib.parse import urlparse
import time
import logging
import sys
import os
from datetime import datetime, timedelta
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
format='%(asctime)s: %(levelname)8s %(module)8s - %(funcName)s: %(message)s',
)
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
r = redis.from_url(redis_url)
url = urlparse(redis_url)
REDIS_HOST = url.hostname
REDIS_PORT = url.port
jobstores = {
'default': RedisJobStore(host=REDIS_HOST, port=REDIS_PORT)
}
executors = {
'default': ThreadPoolExecutor(20)
}
def log_info(message):
logging.log(logging.INFO, message)
def log_warn(message):
logging.log(logging.WARN, message)
def gather_threads():
work_duration = 3
log_info(f'This is a long running job: {work_duration} seconds')
time.sleep(work_duration)
return f'_SOME_RESULT_'
def job_execution_listener(event):
my_name = inspect.currentframe().f_code.co_name
job = sched.get_job(event.job_id)
log_info(f'Event code: {event.code}')
if job is None:
job_name = "NO JOB NAME"
else:
job_name = job.name
if not hasattr(event, 'scheduled_run_time'):
scheduled_run_time = "NO SCHEDULED RUN TIME YET"
else:
scheduled_run_time = event.scheduled_run_time
if event.exception:
log_info(f'Event type: {event_code[event.code]}')
log_info(f'The job has crashed: {event.job_id}')
log_info(f'Relevant CRASH return info: {event.retval}')
else:
log_info(f'=================================================')
log_info(f'JOB EXECUTION EVENT')
log_info(f'Event type: {event_code[event.code]}')
log_info(f'Job ID: {event.job_id}')
log_info(f'Job Name: {job_name}')
log_info(f'Scheduled Run Time: {scheduled_run_time}')
if event.retval:
log_info(f'Relevant COMPLETION return info: {event.retval}')
log_info(f'=================================================')
def job_management_listener(event):
my_name = inspect.currentframe().f_code.co_name
log_info(f'Event code: {event.code}')
job = sched.get_job(event.job_id)
if job is None:
job_name = "NO JOB NAME"
else:
job_name = job.name
if not hasattr(event, 'scheduled_run_time'):
scheduled_run_time = "NO SCHEDULED RUN TIME YET"
else:
scheduled_run_time = event.scheduled_run_time
log_info(f'=================================================')
log_info(f'JOB MANAGEMENT EVENT')
log_info(f'Event type: {event_code[event.code]}')
log_info(f'Job ID: {event.job_id}')
log_info(f'Job Name: {job_name}')
log_info(f'Scheduled Run Time: {scheduled_run_time}')
log_info(f'=================================================')
def get_datefmt(date_time):
return date_time.strftime('%Y-%m-%d %H:%M:%S')
sched = BlockingScheduler(jobstores=jobstores, executors=executors)
log_info(f'Adding listeners')
sched.add_listener(job_execution_listener,
events.EVENT_JOB_EXECUTED |
events.EVENT_JOB_ERROR
)
sched.add_listener(job_management_listener,
events.EVENT_JOB_ADDED |
events.EVENT_JOB_MODIFIED |
events.EVENT_JOB_SUBMITTED |
events.EVENT_JOB_REMOVED
)
now = datetime.now()
job_run_start = now + timedelta(seconds=5)
job_run_stop = now + timedelta(seconds=35)
log_info(f'job_run_start: {get_datefmt(job_run_start)}')
log_info(f'job_run_stop: {get_datefmt(job_run_stop)}')
log_info(f'Scheduling jobs at: {now}')
scheduled_job = sched.add_job(gather_threads, 'date',
run_date=datetime.now() + timedelta(seconds=15)
)
# scheduled_job = sched.add_job(gather_threads, 'interval', seconds=5,
# start_date=get_datefmt(job_run_start),
# end_date=get_datefmt(job_run_stop)
# )
log_info(f'scheduled_job info:')
log_info(f'job.id: {scheduled_job.id}')
log_info(f'job.name: {scheduled_job.name}')
log_info(f'job.func: {scheduled_job.func}')
sched.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment