Created
July 27, 2022 08:44
-
-
Save HanSooloo/371c1b2d4ab4b065bb66aba4f6902c59 to your computer and use it in GitHub Desktop.
apscheduler test program
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools
import inspect
import logging
import os
import random
import sys
import time
from datetime import datetime, timedelta, timezone
from urllib.parse import urlparse

import redis
import apscheduler.events as events
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler

from apscheduler_event_types import event_code, EventCode
# Root logger: emit DEBUG and above to stdout, tagging each record with the
# timestamp, level, module and function name.
logging.basicConfig(
    format='%(asctime)s: %(levelname)8s %(module)8s - %(funcName)s: %(message)s',
    level=logging.DEBUG,
    stream=sys.stdout,
)
# Make the 'apscheduler' logger hierarchy as verbose as the root logger.
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
def log_info(message):
    """Log *message* at INFO level on the root logger.

    ``stacklevel=2`` attributes the record to the function that called this
    helper, so the ``%(module)s`` / ``%(funcName)s`` fields of the configured
    log format name the real call site instead of ``log_info`` itself.

    The root logger's ``log`` method is used (rather than the module-level
    ``logging.log``) because it applies ``stacklevel`` consistently across
    Python 3.8+. It does not implicitly call ``logging.basicConfig()``; this
    file configures logging explicitly at import time.
    """
    logging.getLogger().log(logging.INFO, message, stacklevel=2)
def log_warn(message):
    """Log *message* at WARNING level on the root logger.

    Uses ``logging.WARNING`` (``logging.WARN`` is only a legacy alias for the
    same level) and ``stacklevel=2`` so the ``%(module)s`` / ``%(funcName)s``
    fields of the log format name the caller of this helper rather than
    ``log_warn`` itself.

    The root logger's ``log`` method is used (rather than the module-level
    ``logging.log``) because it applies ``stacklevel`` consistently across
    Python 3.8+. It does not implicitly call ``logging.basicConfig()``; this
    file configures logging explicitly at import time.
    """
    logging.getLogger().log(logging.WARNING, message, stacklevel=2)
def log_debug(message):
    """Log *message* at DEBUG level on the root logger.

    ``stacklevel=2`` attributes the record to the function that called this
    helper, so the ``%(module)s`` / ``%(funcName)s`` fields of the configured
    log format name the real call site instead of ``log_debug`` itself.

    The root logger's ``log`` method is used (rather than the module-level
    ``logging.log``) because it applies ``stacklevel`` consistently across
    Python 3.8+. It does not implicitly call ``logging.basicConfig()``; this
    file configures logging explicitly at import time.
    """
    logging.getLogger().log(logging.DEBUG, message, stacklevel=2)
# Set the following env variables for proper operation:
# UPSTASH_REDIS: full URL including password, host, and port
# REDIS_SSL_CERT_VERIFY: `required` for UPSTASH, `none` for local
redis_url = os.getenv('UPSTASH_REDIS', 'redis://localhost:6379')
redis_ssl_verify = os.getenv('REDIS_SSL_CERT_VERIFY', 'required')

# Split the URL into the pieces the APScheduler Redis job store needs.
url = urlparse(redis_url)
REDIS_HOST = url.hostname
REDIS_PORT = url.port
REDIS_PASS = url.password
REDIS_PROTO = url.scheme

# Only hand the certificate-verification mode to redis-py for TLS URLs.
# NOTE(review): redis-py forwards extra from_url() keyword arguments to the
# connection class, and the plain (non-TLS) connection is expected to reject
# `ssl_cert_reqs` on first use -- confirm against the installed redis-py.
if REDIS_PROTO == 'rediss':
    r = redis.from_url(redis_url, ssl_cert_reqs=redis_ssl_verify)
else:
    r = redis.from_url(redis_url)

# The URL carries the Redis password, so mask it before it reaches the log.
if url.password:
    _host_part = url.netloc.rpartition('@')[2]
    _safe_redis_url = url._replace(
        netloc=f"{url.username or ''}:***@{_host_part}"
    ).geturl()
else:
    _safe_redis_url = redis_url
log_debug(f'REDIS URL: {_safe_redis_url}')
log_debug(f'redis_ssl_verify: {redis_ssl_verify}')
def job_delay_wrapper(func):
    """Decorate a job function so it sleeps briefly before running.

    This fixes an issue with APScheduler:
    https://github.com/agronholm/apscheduler/issues/445

    The wrapper forwards all positional and keyword arguments to *func* and
    returns whatever *func* returns, so the job's result is not lost on its
    way back to the caller. ``functools.wraps`` keeps the wrapped function's
    name, docstring and signature visible on the wrapper.

    :param func: the job callable to wrap
    :return: a callable with the same signature as *func*
    """
    @functools.wraps(func)
    def delayed_job(*args, **kwargs):
        sleep_duration = 0.1
        log_debug(f'Sleeping for {sleep_duration} seconds ...')
        time.sleep(sleep_duration)
        # Propagate the job's result instead of silently discarding it.
        return func(*args, **kwargs)
    return delayed_job
def find_race_round_number():
    """Return a pseudo-random integer between 1000 and 9999 (inclusive)."""
    return random.randint(1000, 9999)
@job_delay_wrapper
def gather_threads():
    """Sample job: record a UTC timestamp in a Redis hash.

    Picks a round number via ``find_race_round_number()`` and stores the
    current UTC time (``YYYY-MM-DDTHH:MM:SSZ``) under field ``'99-99-99'`` of
    the hash ``f1_bot:2022:<round_number>:announcements:rawe_ceek``.

    :return: the placeholder string ``'_SOME_RESULT_'``
    """
    my_name = inspect.currentframe().f_code.co_name
    log_info(f'Doing work in ... {my_name}')
    log_info('Recording job info.')
    round_number = find_race_round_number()
    log_info(f'Find race week Round number ... {round_number}')
    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # formatted text is identical and the trailing "Z" stays truthful.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    r.hset(f'f1_bot:2022:{round_number}:announcements:rawe_ceek', '99-99-99', timestamp)
    # To simulate a long-running job, add a time.sleep(...) here.
    return '_SOME_RESULT_'
def job_execution_listener(event):
    """Log the outcome of a job run.

    Registered in this file for ``EVENT_JOB_EXECUTED`` and
    ``EVENT_JOB_ERROR``. On failure it logs the exception (and the formatted
    traceback when the event carries one); on success it logs the job's
    identity, its scheduled run time and its return value.

    :param event: the APScheduler event object; read for ``code``,
        ``job_id``, ``exception``, ``retval`` and, when present,
        ``scheduled_run_time`` and ``traceback``
    """
    event_type = event_code[event.code]
    log_info(f'Event code: {event.code}, type: {event_type}')

    if event.exception:
        log_debug(f'Event type: {event_type}')
        log_debug(f'The job has crashed: {event.job_id}')
        # A failed run has no meaningful return value; the exception and its
        # traceback are the useful diagnostics.
        log_debug(f'Relevant CRASH exception info: {event.exception!r}')
        crash_traceback = getattr(event, 'traceback', None)
        if crash_traceback:
            log_debug(f'Relevant CRASH traceback:\n{crash_traceback}')
        return

    # The job may no longer be in the store by the time the event arrives,
    # in which case get_job() returns None.
    job = sched.get_job(event.job_id)
    job_name = "NO JOB NAME" if job is None else job.name
    scheduled_run_time = getattr(event, 'scheduled_run_time',
                                 "NO SCHEDULED RUN TIME YET")

    log_debug('=================================================')
    log_debug('JOB EXECUTION EVENT')
    log_debug(f'Event type: {event_type}')
    log_debug(f'Job ID: {event.job_id}')
    log_debug(f'Job Name: {job_name}')
    log_debug(f'Scheduled Run Time: {scheduled_run_time}')
    # Compare against None so falsy results (0, '', False) are still shown.
    if event.retval is not None:
        log_debug(f'Relevant COMPLETION return info: {event.retval}')
    log_debug('=================================================')
def job_management_listener(event):
    """Log a job lifecycle event.

    Registered in this file for ``EVENT_JOB_ADDED``, ``EVENT_JOB_MODIFIED``,
    ``EVENT_JOB_SUBMITTED`` and ``EVENT_JOB_REMOVED``.

    :param event: the APScheduler event object; read for ``code``,
        ``job_id`` and, when present, ``scheduled_run_time`` or
        ``scheduled_run_times``
    """
    event_type = event_code[event.code]
    log_info(f'Event code: {event.code}, type: {event_type}')

    # The job may not be in the store (e.g. after removal), in which case
    # get_job() returns None.
    job = sched.get_job(event.job_id)
    job_name = "NO JOB NAME" if job is None else job.name

    # Submission events expose a *list* named `scheduled_run_times` rather
    # than a single `scheduled_run_time`; report whichever is available.
    if hasattr(event, 'scheduled_run_time'):
        scheduled_run_time = event.scheduled_run_time
    elif getattr(event, 'scheduled_run_times', None):
        scheduled_run_time = ', '.join(str(t) for t in event.scheduled_run_times)
    else:
        scheduled_run_time = "NO SCHEDULED RUN TIME YET"

    log_debug('=================================================')
    log_debug('JOB MANAGEMENT EVENT')
    log_debug(f'Event type: {event_type}')
    log_debug(f'Job ID: {event.job_id}')
    log_debug(f'Job Name: {job_name}')
    log_debug(f'Scheduled Run Time: {scheduled_run_time}')
    log_debug('=================================================')
def get_datefmt(date_time):
    """Format *date_time* as ``YYYY-MM-DD HH:MM:SS`` using its ``strftime``."""
    pattern = '%Y-%m-%d %H:%M:%S'
    return date_time.strftime(pattern)
# TLS is enabled only when the Redis URL uses the `rediss` scheme.
ssl = REDIS_PROTO == 'rediss'

# Jobs are persisted in Redis under the keys below.
jobstores = dict(
    default=RedisJobStore(
        jobs_key='apscheduler.jobs',
        run_times_key='apscheduler.run_times',
        host=REDIS_HOST,
        port=REDIS_PORT,
        password=REDIS_PASS,
        ssl=ssl,
        ssl_cert_reqs=redis_ssl_verify,
    ),
)

# Jobs run on a thread pool created with an argument of 20.
executors = dict(default=ThreadPoolExecutor(20))

scheduler_timezone = 'Asia/Hong_Kong'
# scheduler_timezone = 'UTC'

sched = BlockingScheduler(
    timezone=scheduler_timezone,
    executors=executors,
    jobstores=jobstores,
)
# background_scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors)
# sched = background_scheduler
def flags_to_text(n):
    """Return the names of the ``EventCode`` members whose value shares a set
    bit with *n*, joined with ``', '`` in ``EventCode`` iteration order."""
    matching_names = (member.name for member in EventCode if n & member.value)
    return ', '.join(matching_names)
# Naive local timestamp captured at import time, plus a start time five
# seconds after it (referenced only by the commented-out cron example below).
now = datetime.now()
job_run_start = timedelta(seconds=5) + now
# job_run_stop = now + timedelta(seconds=35)
# log_info(f'job_run_start: {get_datefmt(job_run_start)}')
# log_info(f'job_run_stop: {get_datefmt(job_run_stop)}')
if __name__ == '__main__':
    # Script entry point: register both event listeners, schedule one
    # one-shot job 15 seconds from now, then block in the scheduler loop
    # until interrupted.
    log_info('Starting __main__ program loop.')

    log_info('Adding listeners')
    execution_flags = events.EVENT_JOB_EXECUTED | events.EVENT_JOB_ERROR
    log_debug(f'Execution Listener flag names: {flags_to_text(execution_flags)}')
    sched.add_listener(job_execution_listener, execution_flags)

    management_flags = (
        events.EVENT_JOB_ADDED
        | events.EVENT_JOB_MODIFIED
        | events.EVENT_JOB_SUBMITTED
        | events.EVENT_JOB_REMOVED
    )
    log_debug(f'Management Listener flag names: {flags_to_text(management_flags)}')
    sched.add_listener(job_management_listener, management_flags)

    log_info('Adding scheduled jobs')
    # Reference the job as "<module>:<function>" text, built from this file's
    # name, rather than passing the function object defined in __main__.
    job_function = f'{os.path.splitext(os.path.basename(__file__))[0]}:gather_threads'
    # scheduled_job = sched.add_job(func=job_function,
    #                               trigger='cron',
    #                               second='*/15',
    #                               start_date=get_datefmt(job_run_start),
    #                               name=job_function
    #                               )
    # Use an aware "now" in the scheduler's own timezone: a naive local time
    # would be interpreted as scheduler time (Asia/Hong_Kong) and fire at the
    # wrong moment on hosts in any other timezone.
    run_date = datetime.now(sched.timezone) + timedelta(seconds=15)
    scheduled_job = sched.add_job(func=job_function,
                                  trigger='date',
                                  run_date=run_date,
                                  name='NEWLY CREATED'
                                  )
    log_info('scheduled_job info:')
    log_info(f'job.id: {scheduled_job.id}')
    log_info(f'job.name: {scheduled_job.name}')
    log_info(f'job.func: {scheduled_job.func}')

    try:
        log_info('Starting main scheduling loop')
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        log_warn('Interrupted, shutting down')
        # Only shut down a scheduler that actually started; otherwise
        # shutdown() itself would raise and mask the interruption.
        if sched.running:
            sched.shutdown()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment