Skip to content

Instantly share code, notes, and snippets.

@pingzh
Created September 1, 2022 21:07
Show Gist options
  • Save pingzh/6717ff99b4ca31d5b02161f7999a9dd8 to your computer and use it in GitHub Desktop.
Save pingzh/6717ff99b4ca31d5b02161f7999a9dd8 to your computer and use it in GitHub Desktop.
class SchedulerFramework:
    """
    Drive a pluggable scheduler through its main scheduling loop.

    General Rules:
    1. SchedulerFramework does not assume scheduler implementation
    2. SchedulerFramework does not parse dag files

    Each loop iteration calls, in order:
    ``scheduler.make_scheduling_decisions`` ->
    ``scheduler.send_queuable_tasks_to_executor`` ->
    heartbeat (executor, then scheduler) ->
    ``scheduler.process_system_events``,
    and the loop ends once ``scheduler.should_exit_loop()`` is truthy.
    """

    def __init__(self, executor, scheduler_cls, num_runs=-1):
        """
        :param executor: executor handed to the scheduler and heartbeated
            on every loop iteration.
        :param scheduler_cls: scheduler class, instantiated as
            ``scheduler_cls(executor=executor, num_runs=num_runs)``.
        :param num_runs: forwarded unchanged to the scheduler; defaults to
            -1, the value that used to be hard-coded here. Its meaning is
            defined by the scheduler implementation.
        """
        self.executor = executor
        self.scheduler = scheduler_cls(executor=self.executor, num_runs=num_runs)

    def run(self):
        """
        Run the scheduling loop until the scheduler asks to stop.

        ``before_exit`` runs in a ``finally`` clause, so scheduler clean-up
        also happens when an iteration raises. The exception still
        propagates to the caller.
        """
        self.before_run_setup()
        try:
            while not self.loop_stop_criteria():
                # Mark schedulable task instances' state as `scheduled`.
                self.scheduler.make_scheduling_decisions()
                # Mark them `queued` and send them to the executor.
                self.scheduler.send_queuable_tasks_to_executor()
                self.heartbeat()
                self.scheduler.process_system_events()
        finally:
            self.before_exit()

    def before_run_setup(self):
        """Delegate pre-loop setup to the scheduler."""
        self.scheduler.before_run_setup()

    def loop_stop_criteria(self):
        """Return the scheduler's verdict on whether to leave the loop."""
        return self.scheduler.should_exit_loop()

    def heartbeat(self):
        """Heartbeat the executor, then the scheduler, then record the time."""
        self.executor.heartbeat()
        self.scheduler.heartbeat()
        self.update_heartbeat_time()

    def update_heartbeat_time(self):
        """
        Hook for recording when the last heartbeat happened.

        No-op here. Presumably intended to be overridden by subclasses.
        """
        pass

    def before_exit(self):
        """Delegate post-loop clean-up to the scheduler."""
        self.scheduler.before_exit()
class BaseScheduler:
    """
    No-op base implementation of the scheduler hooks called by a
    scheduling-loop driver (setup, decisions, queueing, events, heartbeat,
    exit). Subclasses override the hooks they need.
    """

    def __init__(self, executor, num_runs=-1):
        """
        :param executor: executor that queued task instances are sent to.
        :param num_runs: accepted because schedulers are constructed as
            ``scheduler_cls(executor=..., num_runs=...)``. Without this
            parameter that call raises TypeError. The base class only
            stores it. Presumably a cap on loop iterations with -1 meaning
            unbounded; subclasses define the actual meaning.
        """
        self.executor = executor
        self.num_runs = num_runs

    def before_exit(self):
        """Before exit clean up"""
        pass

    def before_run_setup(self):
        """Set up before the scheduling loop"""
        pass

    def make_scheduling_decisions(self):
        """
        scheduling decision means marking state of schedulable task instances as `scheduled`, it includes:
        1. Create new dag runs if needed
        2. Collect schedulable task instances
        3. Manage task instance states, e.g. marking upstream_failed, skipped
        4. Mark their state as `scheduled`
        """
        pass

    def send_queuable_tasks_to_executor(self):
        """
        This method does:
        1. Find queuable task instances by applying dag/task concurrency limit, pool slot quota etc
        2. Update state of queuable task instances as `queued`
        3. Send them to the executor
        """
        pass

    def process_system_events(self):
        """
        This method processes system events including 1) zombie tasks 2) executor events
        """
        pass

    def should_exit_loop(self):
        """
        When to exit the scheduling loop: return True to stop.

        The base implementation never stops (returns False, which has the
        same truthiness as the previous implicit None). A subclass must
        override this for the loop to terminate.
        """
        return False

    def heartbeat(self):
        """Heartbeat the scheduler"""
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment