Created
April 17, 2022 01:47
-
-
Save TheSithPadawan/6e01e99af3e5956a1293a5b2d7a70936 to your computer and use it in GitHub Desktop.
CFLT onsite prep -- task scheduler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# delayed scheduler | |
""" | |
write a function schedule(task, delay) to schedule a task that will be run after some delay | |
(1) first normal version | |
(2) thread safe version | |
""" | |
import heapq | |
import time | |
class Task: | |
def __init__(self, name): | |
self.name = name | |
self._start = 0 | |
@property | |
def start_time(self): | |
return self._start | |
@start_time.setter | |
def start_time(self, t): | |
self._start = t | |
def __lt__(self, other): | |
if self._start != other._start: | |
return self._start < other._start | |
return self.name < other.name | |
def __str__(self): | |
return self.name | |
def run(self): | |
print ('running task', str(self)) | |
# single thread | |
class Scheduler: | |
def __init__(self): | |
self.minheap = [] | |
def schedule(self, task_name, delay): | |
newtask = Task(task_name) | |
newtask.start_time = time.time() + delay | |
heapq.heappush(self.minheap, newtask) | |
def run(self): | |
while True: | |
if not self.minheap: | |
break | |
if self.minheap and time.time() >= self.minheap[0].start_time: | |
task = heapq.heappop(self.minheap) | |
task.run() | |
# thread-safe version | |
class Scheduler: | |
def __init__(self): | |
self.minheap = [] | |
self._cv = threading.Condition(threading.Lock()) | |
self._start() | |
def schedule(self, task_name, delay): | |
newtask = Task(task_name) | |
newtask.start_time = time.time() + delay | |
with self._cv: | |
heapq.heappush(self.minheap, newtask) | |
self._cv.notify() | |
def run(self): | |
while True: | |
with self._cv: | |
while True: | |
timeout = None | |
if self.minheap: | |
if time.time() >= self.minheap[0].start_time: | |
task = heapq.heappop(self.minheap) | |
break | |
timeout = self.minheap[0].start_time - time.time() | |
print ('waiting for timweout for task', self.minheap[0].name, 'timeout', timeout) | |
self._cv.wait(timeout) | |
threading.Thread(target=task.run).start() | |
def _start(self): | |
threading.Thread(target=self.run).start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment