Skip to content

Instantly share code, notes, and snippets.

@TheSithPadawan
Created April 17, 2022 01:47
Show Gist options
  • Save TheSithPadawan/6e01e99af3e5956a1293a5b2d7a70936 to your computer and use it in GitHub Desktop.
Save TheSithPadawan/6e01e99af3e5956a1293a5b2d7a70936 to your computer and use it in GitHub Desktop.
CFLT onsite prep -- task scheduler
# delayed scheduler
"""
Write a function schedule(task, delay) that schedules a task to run after the given delay.
(1) First, a plain single-threaded version.
(2) Then, a thread-safe version.
"""
import heapq
import threading
import time
class Task:
    """A named unit of work that is ordered by its start time.

    Instances compare with ``<`` so they can be stored directly in a
    ``heapq`` heap: the earlier start time wins, and the name breaks ties.
    """

    def __init__(self, name):
        self.name = name
        # Start timestamp; stays 0 until a caller assigns ``start_time``.
        self._start = 0

    @property
    def start_time(self):
        """Timestamp at which the task becomes due."""
        return self._start

    @start_time.setter
    def start_time(self, t):
        self._start = t

    def __str__(self):
        return self.name

    def __lt__(self, other):
        # Equal start times fall back to comparing names so that ordering
        # between two tasks is always decided.
        return (
            self.name < other.name
            if self._start == other._start
            else self._start < other._start
        )

    def run(self):
        """Execute the task; here that just announces it on stdout."""
        print('running task', self)
# single thread
class Scheduler:
    """Single-threaded delayed scheduler backed by a min-heap of tasks.

    Tasks are queued with ``schedule`` and executed, earliest first, by a
    blocking call to ``run``.

    NOTE: this file defines a second class named ``Scheduler`` further down
    (the thread-safe version), which rebinds the name at module level.
    """

    def __init__(self):
        # Min-heap of pending tasks; the earliest start time sits at index 0.
        self.minheap = []

    def schedule(self, task_name, delay):
        """Queue a task called ``task_name`` to start ``delay`` seconds from now.

        The start time is computed from ``time.time()`` at the moment of the
        call. Nothing is executed until ``run`` is invoked.
        """
        newtask = Task(task_name)
        newtask.start_time = time.time() + delay
        heapq.heappush(self.minheap, newtask)

    def run(self):
        """Run every queued task in start-time order, then return.

        Blocks until the heap is empty. While the next task is not yet due
        the thread sleeps for the remaining time instead of spinning, so
        waiting does not burn CPU.
        """
        while self.minheap:
            remaining = self.minheap[0].start_time - time.time()
            if remaining > 0:
                time.sleep(remaining)
                # Re-read the clock rather than assuming the full interval
                # elapsed.
                continue
            heapq.heappop(self.minheap).run()
# thread-safe version
class Scheduler:
    """Thread-safe delayed scheduler.

    Pending tasks live in a min-heap guarded by a condition variable. A
    dispatcher thread, started on construction, sleeps on that condition
    until the earliest task is due (or a new task is scheduled) and then runs
    each due task in its own thread.

    Call ``stop`` to shut the dispatcher down; otherwise it runs forever and
    keeps the interpreter alive.
    """

    def __init__(self):
        # Min-heap of pending tasks; only touch it while holding ``_cv``.
        self.minheap = []
        self._cv = threading.Condition(threading.Lock())
        # Set by ``stop``; read by the dispatcher under ``_cv``.
        self._stopped = False
        # Handle to the dispatcher thread so ``stop`` can join it.
        self._worker = None
        self._start()

    def schedule(self, task_name, delay):
        """Queue a task called ``task_name`` to start ``delay`` seconds from now.

        Safe to call from any thread. The dispatcher is woken so it can
        re-evaluate which task is next. Tasks scheduled after ``stop`` are
        queued but never dispatched.
        """
        newtask = Task(task_name)
        newtask.start_time = time.time() + delay
        with self._cv:
            heapq.heappush(self.minheap, newtask)
            self._cv.notify()

    def run(self):
        """Dispatcher loop: launch each task in a new thread once it is due.

        Returns only after ``stop`` has been called.
        """
        while True:
            task = self._next_due_task()
            if task is None:
                return
            # Run outside the lock, in a separate thread, so a slow task
            # cannot delay scheduling or dispatch of other tasks.
            threading.Thread(target=task.run).start()

    def _next_due_task(self):
        """Block until the earliest task is due and pop it; None once stopped."""
        with self._cv:
            while not self._stopped:
                timeout = None  # empty heap: wait until notified
                if self.minheap:
                    head = self.minheap[0]
                    # Read the clock once so the due check and the wait
                    # duration agree.
                    timeout = head.start_time - time.time()
                    if timeout <= 0:
                        return heapq.heappop(self.minheap)
                    print('waiting for timeout for task', head.name, 'timeout', timeout)
                # Wakes on timeout, on a new ``schedule`` call, or on ``stop``.
                self._cv.wait(timeout)
        return None

    def stop(self):
        """Stop the dispatcher thread and wait for it to exit.

        Tasks still in the heap are left there and are not run. Task threads
        that were already launched are not interrupted. Calling ``stop`` more
        than once is harmless.
        """
        with self._cv:
            self._stopped = True
            self._cv.notify_all()
        worker = self._worker
        # Never join the current thread: that would raise RuntimeError.
        if worker is not None and worker is not threading.current_thread():
            worker.join()

    def _start(self):
        """Launch the dispatcher thread."""
        self._worker = threading.Thread(target=self.run)
        self._worker.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment