@HanSooloo
Created July 27, 2022 08:44
apscheduler test program
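"""Exercises APScheduler with a Redis-backed job store.

Schedules a one-shot job and logs APScheduler job-management and
job-execution events as they fire.
"""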
import random
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
import apscheduler.events as events
from apscheduler_event_types import event_code, EventCode
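# NOTE: `apscheduler_event_types` is a small companion module that is not shown in
# this gist. Based on how `event_code` and `EventCode` are used below, a minimal
# sketch of what it is assumed to contain (an IntFlag of APScheduler event
# constants plus a code-to-name lookup) could be:
#
#     from enum import IntFlag
#     import apscheduler.events as events
#
#     class EventCode(IntFlag):
#         EVENT_JOB_ADDED = events.EVENT_JOB_ADDED
#         EVENT_JOB_REMOVED = events.EVENT_JOB_REMOVED
#         EVENT_JOB_MODIFIED = events.EVENT_JOB_MODIFIED
#         EVENT_JOB_SUBMITTED = events.EVENT_JOB_SUBMITTED
#         EVENT_JOB_EXECUTED = events.EVENT_JOB_EXECUTED
#         EVENT_JOB_ERROR = events.EVENT_JOB_ERROR
#
#     event_code = {member.value: member.name for member in EventCode}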
import redis
from urllib.parse import urlparse
import time
import logging
import sys
import os
from datetime import datetime, timedelta
import inspect
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                    format='%(asctime)s: %(levelname)8s %(module)8s - %(funcName)s: %(message)s',
                    )
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
def log_info(message):
    logging.log(logging.INFO, message)

def log_warn(message):
    logging.log(logging.WARN, message)

def log_debug(message):
    logging.log(logging.DEBUG, message)
# Set the following env variables for proper operation:
# UPSTASH_REDIS: full URL including password, host, and port
# REDIS_SSL_CERT_VERIFY: `required` for UPSTASH, `none` for local
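# Example shell setup (placeholder values, not real credentials):
#   export UPSTASH_REDIS='rediss://:<password>@<host>.upstash.io:<port>'
#   export REDIS_SSL_CERT_VERIFY='required'    # use 'none' against a local, non-TLS Redis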
redis_url = os.getenv('UPSTASH_REDIS', 'redis://localhost:6379')
redis_ssl_verify = os.getenv('REDIS_SSL_CERT_VERIFY', 'required')
r = redis.from_url(redis_url, ssl_cert_reqs=redis_ssl_verify)
url = urlparse(redis_url)
REDIS_HOST = url.hostname
REDIS_PORT = url.port
REDIS_PASS = url.password
REDIS_PROTO = url.scheme
log_debug(f'REDIS URL: {redis_url}')
log_debug(f'redis_ssl_verify: {redis_ssl_verify}')
def job_delay_wrapper(func):
    """
    Works around an APScheduler issue: https://github.com/agronholm/apscheduler/issues/445
    """
    def delayed_job(*args, **kwargs):
        sleep_duration = 0.1
        log_debug(f'Sleeping for {sleep_duration} seconds ...')
        time.sleep(sleep_duration)
        # Pass the wrapped function's return value through so listeners see it
        # in event.retval.
        return func(*args, **kwargs)
    return delayed_job
def find_race_round_number():
    round_number = random.randint(1000, 9999)
    return round_number
@job_delay_wrapper
def gather_threads():
    my_name = inspect.currentframe().f_code.co_name
    log_info(f'Doing work in ... {my_name}')
    # work_duration = 3
    # log_info(f'This is a long running job: {work_duration} seconds')
    log_info('Recording job info.')
    # r.hset('f1_bot:2022:11:anouncements:rawe_ceek', '99-99-99', datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"))
    round_number = find_race_round_number()
    log_info(f'Find race week Round number ... {round_number}')
    r.hset(f'f1_bot:2022:{round_number}:announcements:rawe_ceek', '99-99-99', datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))
    # time.sleep(work_duration)
    return '_SOME_RESULT_'
def job_execution_listener(event):
    log_info(f'Event code: {event.code}, type: {event_code[event.code]}')
    job = sched.get_job(event.job_id)
    if job is None:
        job_name = "NO JOB NAME"
    else:
        job_name = job.name
    if not hasattr(event, 'scheduled_run_time'):
        scheduled_run_time = "NO SCHEDULED RUN TIME YET"
    else:
        scheduled_run_time = event.scheduled_run_time
    if event.exception:
        log_debug(f'Event type: {event_code[event.code]}')
        log_debug(f'The job has crashed: {event.job_id}')
        log_debug(f'Relevant CRASH info: {event.exception}')
    else:
        log_debug('=================================================')
        log_debug('JOB EXECUTION EVENT')
        log_debug(f'Event type: {event_code[event.code]}')
        log_debug(f'Job ID: {event.job_id}')
        log_debug(f'Job Name: {job_name}')
        log_debug(f'Scheduled Run Time: {scheduled_run_time}')
        if event.retval:
            log_debug(f'Relevant COMPLETION return info: {event.retval}')
        log_debug('=================================================')
def job_management_listener(event):
    log_info(f'Event code: {event.code}, type: {event_code[event.code]}')
    job = sched.get_job(event.job_id)
    if job is None:
        job_name = "NO JOB NAME"
    else:
        job_name = job.name
    if not hasattr(event, 'scheduled_run_time'):
        scheduled_run_time = "NO SCHEDULED RUN TIME YET"
    else:
        scheduled_run_time = event.scheduled_run_time
    log_debug('=================================================')
    log_debug('JOB MANAGEMENT EVENT')
    log_debug(f'Event type: {event_code[event.code]}')
    log_debug(f'Job ID: {event.job_id}')
    log_debug(f'Job Name: {job_name}')
    log_debug(f'Scheduled Run Time: {scheduled_run_time}')
    log_debug('=================================================')
def get_datefmt(date_time):
    return date_time.strftime('%Y-%m-%d %H:%M:%S')

if REDIS_PROTO == 'rediss':
    ssl = True
else:
    ssl = False
jobstores = {
    'default': RedisJobStore(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASS,
                             ssl_cert_reqs=redis_ssl_verify,
                             ssl=ssl,
                             jobs_key='apscheduler.jobs',
                             run_times_key='apscheduler.run_times')
}
executors = {
    'default': ThreadPoolExecutor(20)
}
scheduler_timezone = 'Asia/Hong_Kong'
# scheduler_timezone = 'UTC'
sched = BlockingScheduler(jobstores=jobstores, executors=executors, timezone=scheduler_timezone)
# background_scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors)
# sched = background_scheduler
def flags_to_text(n):
    flags = ', '.join(
        [flag.name for flag in EventCode if n & flag.value]
    )
    return flags
now = datetime.now()
job_run_start = now + timedelta(seconds=5)
# job_run_stop = now + timedelta(seconds=35)
# log_info(f'job_run_start: {get_datefmt(job_run_start)}')
# log_info(f'job_run_stop: {get_datefmt(job_run_stop)}')
if __name__ == '__main__':
    log_info('Starting __main__ program loop.')

    log_info('Adding listeners')
    execution_flags = events.EVENT_JOB_EXECUTED | \
        events.EVENT_JOB_ERROR
    log_debug(f'Execution Listener flag names: {flags_to_text(execution_flags)}')
    sched.add_listener(job_execution_listener,
                       execution_flags
                       )

    management_flags = events.EVENT_JOB_ADDED | \
        events.EVENT_JOB_MODIFIED | \
        events.EVENT_JOB_SUBMITTED | \
        events.EVENT_JOB_REMOVED
    log_debug(f'Management Listener flag names: {flags_to_text(management_flags)}')
    sched.add_listener(job_management_listener,
                       management_flags
                       )

    log_info('Adding scheduled jobs')
    # The job target is given as a textual 'module:function' reference, which
    # APScheduler accepts in place of a callable; with a persistent Redis job
    # store the target must stay importable by whichever process runs it.
    job_function = f'{os.path.splitext(os.path.basename(__file__))[0]}:gather_threads'
    # scheduled_job = sched.add_job(func=job_function,
    #                               trigger='cron',
    #                               second='*/15',
    #                               start_date=get_datefmt(job_run_start),
    #                               name=job_function
    #                               )
    scheduled_job = sched.add_job(func=job_function,
                                  trigger='date',
                                  run_date=datetime.now() + timedelta(seconds=15),
                                  name='NEWLY CREATED'
                                  )
    log_info('scheduled_job info:')
    log_info(f'job.id: {scheduled_job.id}')
    log_info(f'job.name: {scheduled_job.name}')
    log_info(f'job.func: {scheduled_job.func}')

    try:
        log_info('Starting main scheduling loop')
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        log_warn('Interrupted, shutting down')
        sched.shutdown()
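# Rough usage sketch (assumptions: APScheduler 3.x, redis-py, a reachable Redis
# instance, and `apscheduler_event_types.py` importable next to this script; the
# script name below is a placeholder):
#
#     python apscheduler_test.py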