Skip to content

Instantly share code, notes, and snippets.

@srleyva
Last active May 8, 2022 22:30
Show Gist options
  • Save srleyva/948dcd3dc609981662fc303f5ef1d263 to your computer and use it in GitHub Desktop.
Save srleyva/948dcd3dc609981662fc303f5ef1d263 to your computer and use it in GitHub Desktop.
from abc import ABC, abstractmethod
from copy import deepcopy
from dataclasses import dataclass, field
from enum import Enum
from itertools import cycle
from random import randint
from typing import ClassVar, Iterator, List, Optional

from tabulate import tabulate
@dataclass
class Job:
    """A simulated process that needs a fixed number of ticks of CPU work.

    A scheduler drives the job by calling :meth:`run` once per time slice.
    The job records when it first ran, every slice it ran in, and the slice
    in which it finished.
    """

    # Process identifier; read through the ``pid`` property.
    _pid: int
    # Remaining ticks of work; read through the ``ticks`` property.
    _ticks: int
    # Time slices in which this job ran, in execution order.
    _time_slices: List[int] = field(default_factory=list)
    # Time slice of the first run; None until the job has run once.
    response_time: Optional[int] = field(default=None)
    # Time slice in which the job finished; stays 0 until then.
    turn_around_time: int = field(default=0)

    def __str__(self) -> str:
        return f"Job {self.pid}: ticks=[{self.ticks}] cpu_time={self.cpu_time}"

    @property
    def pid(self) -> int:
        """Process identifier."""
        return self._pid

    @property
    def time_slices(self) -> List[int]:
        """Time slices in which this job has run so far."""
        return self._time_slices

    @property
    def cpu_time(self) -> int:
        """Number of time slices this job has been given."""
        return len(self.time_slices)

    @property
    def ticks(self) -> int:
        """Ticks of work still outstanding."""
        return self._ticks

    @property
    def complete(self) -> bool:
        """True once no work is left.

        Uses ``<=`` rather than ``==`` so a job created with a non-positive
        tick count is also reported as finished.
        """
        return self.ticks <= 0

    def run(self, time_slice: int, quantum_timeslice: int = 1):
        """Execute the job for one time slice.

        Args:
            time_slice: Index of the scheduler time slice being executed.
            quantum_timeslice: Ticks of work performed during this slice.
        """
        if self.response_time is None:
            self.response_time = time_slice
        self.time_slices.append(time_slice)
        # Clamp at zero: a quantum larger than the remaining work must finish
        # the job, not push the counter negative (where it would never be
        # seen as complete).
        self._ticks = max(0, self._ticks - quantum_timeslice)
        if self.complete:
            self.turn_around_time = time_slice
@dataclass
class Scheduler(ABC):
    """Base class for the simulated CPU schedulers.

    Subclasses provide a ``name`` and a :meth:`choose_job` generator that
    decides which job gets the CPU next. :meth:`run` consumes that generator,
    executes one time slice per runnable job it yields, and stops once every
    job has finished.
    """

    name: ClassVar[str]
    # Jobs to schedule; they are mutated in place as they run.
    jobs: List[Job]
    # Ticks of work a job performs in a single time slice.
    quantum_timeslice: int = field(default=1)
    # Index of the next time slice to execute.
    current_timeslice: int = field(default=0)
    # Job most recently given the CPU; None until the first slice runs.
    current_job: Optional[Job] = field(default=None)
    # Number of jobs that still have work left.
    _job_count: int = field(default=0)

    def __post_init__(self):
        # Count only jobs that can still finish. A job that is already
        # complete never runs, so counting it would keep `complete` False
        # forever and `run` would never terminate.
        self._job_count = sum(1 for job in self.jobs if not job.complete)

    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable scheduler name (subclasses set a class attribute)."""

    @property
    def avg_turnaround_time(self) -> float:
        """Mean ``turn_around_time`` over all jobs (0.0 when there are none)."""
        if not self.jobs:
            return 0.0
        return sum(job.turn_around_time for job in self.jobs) / len(self.jobs)

    def get_fairness(self, job: Job) -> float:
        """Fraction of the elapsed time slices that ``job`` was running.

        Returns 0.0 when no time slice has elapsed yet.
        """
        if self.current_timeslice == 0:
            return 0.0
        return len(job.time_slices) / self.current_timeslice

    @property
    def avg_fairness(self) -> float:
        """Mean of :meth:`get_fairness` over all jobs (0.0 when there are none)."""
        if not self.jobs:
            return 0.0
        return sum(map(self.get_fairness, self.jobs)) / len(self.jobs)

    @property
    def avg_response_time(self) -> float:
        """Mean ``response_time`` over the jobs that have run at least once.

        Jobs that never ran have no response time (None) and are left out;
        the result is 0.0 when no job has run.
        """
        started = [
            job.response_time
            for job in self.jobs
            if job.response_time is not None
        ]
        if not started:
            return 0.0
        return sum(started) / len(started)

    @abstractmethod
    def choose_job(self) -> Iterator[Job]:
        """Return an iterator over the jobs in the order they get the CPU.

        ``run`` skips any yielded job that is already complete.
        """

    def __str__(self) -> str:
        return f"[Time {self.current_timeslice}]: Name=[{self.name}] {self.current_job}"

    @property
    def complete(self) -> bool:
        """True when no job has work left."""
        return self._job_count == 0

    def run(self):
        """Run the simulation until every job has finished."""
        if self.complete:
            return
        for job in self.choose_job():
            if not job.complete:
                self.current_job = job
                self.run_cycle()
            # Checked on every iteration so an endless generator cannot spin
            # forever once the last job is done.
            if self.complete:
                break

    def run_cycle(self):
        """Execute one time slice of ``current_job`` and advance the clock."""
        self.current_job.run(self.current_timeslice, self.quantum_timeslice)
        if self.current_job.complete:
            self._job_count -= 1
        self.current_timeslice += 1
class RRScheduler(Scheduler):
    """Round-robin policy: every job gets one time slice in turn."""

    name = "Round Robin Scheduler"

    def choose_job(self) -> Iterator[Job]:
        """Yield the jobs in list order, over and over, without end.

        Finished jobs are still yielded; the consumer is expected to skip
        them and to stop iterating once all jobs are done.
        """
        yield from cycle(self.jobs)
class RunToCompletionScheduler(Scheduler):
    """Non-preemptive policy: each job runs until done, in list order."""

    name = "Run To Completion Scheduler"

    def choose_job(self) -> Iterator[Job]:
        """Yield each job repeatedly until it is complete, then the next one."""
        for job in self.jobs:
            # The job is advanced by the consumer between yields, so this
            # condition changes while the generator is suspended.
            while not job.complete:
                yield job
class ShortestJobFirst(RunToCompletionScheduler):
    """Run-to-completion scheduling with the jobs ordered by ascending ticks."""

    name = "Shortest Job First"

    def __post_init__(self):
        """Order the job list by tick count, then run the base set-up."""

        def remaining_ticks(job):
            return job.ticks

        # Sorts the list held in ``self.jobs`` in place.
        self.jobs.sort(key=remaining_ticks)
        super().__post_init__()
@dataclass
class MLFQScheduler(Scheduler):
    """Multi-level feedback queue.

    All jobs start in queue 0. A job that uses up its whole allotment without
    finishing is moved one queue down (or stays in the last queue). Within a
    queue, jobs are served first-in first-out.
    """

    name = "Multi Level Feedback Queue"
    # Consecutive time slices a job may use before it is demoted.
    allotment: int = field(default=5)
    # Number of priority levels.
    queue_count: int = field(default=3)
    # Index of the queue currently being served (0 is the first level).
    current_priority: int = field(default=0)

    def __str__(self) -> str:
        return f"{super().__str__()} current_queue={self.current_priority}"

    def __post_init__(self):
        """Validate the configuration and place every job in queue 0.

        Raises:
            ValueError: If ``queue_count`` or ``allotment`` is less than 1.
        """
        from collections import deque

        super().__post_init__()
        if self.queue_count < 1:
            raise ValueError("queue_count must be at least 1")
        if self.allotment < 1:
            # A zero allotment would demote jobs forever without running them.
            raise ValueError("allotment must be at least 1")
        self.queues = [deque() for _ in range(self.queue_count)]
        self.queues[0] = deque(self.jobs)

    @property
    def current_queue(self):
        """Queue being served, moving down past empty levels.

        Never advances beyond the last queue; the returned queue is empty
        only when every level from the current one down is empty.
        """
        last_priority = self.queue_count - 1
        while (
            not self.queues[self.current_priority]
            and self.current_priority < last_priority
        ):
            self.current_priority += 1
        return self.queues[self.current_priority]

    @property
    def next_priority(self) -> int:
        """Level a demoted job moves to (capped at the last queue)."""
        return min(self.current_priority + 1, self.queue_count - 1)

    def choose_job(self) -> Iterator[Job]:
        """Yield the head of the current queue for up to ``allotment`` slices.

        A job that is still unfinished after its allotment is appended to the
        next lower queue.
        """
        while not self.complete:
            queue = self.current_queue
            if not queue:
                # Nothing left to schedule anywhere.
                return
            # Take from the head so jobs sharing a level rotate fairly;
            # taking from the tail would keep re-selecting the job that was
            # just re-queued in the last level.
            current_job = queue.popleft()
            for _ in range(self.allotment):
                yield current_job
                if current_job.complete:
                    break
            else:
                # Allotment used up without finishing: demote.
                if not current_job.complete:
                    self.queues[self.next_priority].append(current_job)
class CompletelyFairScheduler(Scheduler):
    """Scheduler that always runs the job with the smallest virtual runtime.

    Jobs are kept in a binary search tree keyed by virtual runtime. After
    each slice, an unfinished job's virtual runtime grows by its niceness
    and the job is re-inserted.
    """

    name = "Completely Fair Scheduler"

    def __post_init__(self):
        """Build the run queue, giving each job a random niceness in 1..20."""
        super().__post_init__()
        self._queue = self.RedBlackTree()
        for job in self.jobs:
            self._queue.insert(self.RedBlackTree.Node(job, niceness=randint(1, 20)))

    @dataclass
    class RedBlackTree:
        """Binary search tree of nodes ordered by ``virtual_runtime``.

        Balancing is not implemented yet (see the TODO stubs), so the tree
        can be as deep as the number of nodes; the traversals below are
        iterative so depth cannot exhaust the recursion limit.
        """

        root: Optional["Node"] = field(default=None)

        @property
        def list(self) -> List[int]:
            """Job pids in ascending virtual-runtime order."""
            return self.inorder_list([])

        def inorder_list(self, in_order_list: List, current_node: Optional["Node"] = None):
            """Append the pids of the subtree, in order, to ``in_order_list``.

            Args:
                in_order_list: List that receives the pids.
                current_node: Subtree root; defaults to the tree root.

            Returns:
                ``in_order_list`` (unchanged when the tree is empty).
            """
            node = current_node if current_node is not None else self.root
            pending = []
            while pending or node is not None:
                # Descend to the leftmost unvisited node.
                while node is not None:
                    pending.append(node)
                    node = node.left
                node = pending.pop()
                in_order_list.append(node.job.pid)
                node = node.right
            return in_order_list

        def insert(self, node: "Node", current_node: Optional["Node"] = None):
            """Insert ``node`` below ``current_node`` (default: the root).

            Nodes with an equal virtual runtime go to the right, so among
            ties the earlier-inserted node is popped first.
            """
            if self.root is None:
                self.root = node
                return
            parent = current_node if current_node is not None else self.root
            while True:
                if node.virtual_runtime >= parent.virtual_runtime:
                    if parent.right is None:
                        parent.right = node
                        break
                    parent = parent.right
                else:
                    if parent.left is None:
                        parent.left = node
                        break
                    parent = parent.left
            self.fix_tree()

        def fix_tree(self):
            # TODO: RB Tree Balancing
            pass

        def rotate_left(self, node: "Node"):
            # TODO
            pass

        def rotate_right(self, node: "Node"):
            # TODO
            pass

        def pop_left_node(self, current_node: Optional["Node"] = None) -> "Node":
            """Detach and return the leftmost node of the (sub)tree.

            The removed node's right child takes its place.

            Args:
                current_node: Subtree to search; defaults to the tree root.

            Raises:
                IndexError: If the tree is empty.
            """
            start = current_node if current_node is not None else self.root
            if start is None:
                raise IndexError("pop from an empty tree")
            parent = None
            leftmost = start
            while leftmost.left is not None:
                parent = leftmost
                leftmost = leftmost.left
            if parent is not None:
                parent.left = leftmost.right
                leftmost.right = None
            elif leftmost is self.root:
                # Identity, not ``==``: the dataclass ``__eq__`` compares
                # whole subtrees field by field.
                self.root = leftmost.right
                leftmost.right = None
            # Otherwise the start node is a non-root with no left child; its
            # parent is unknown here, so it is returned without relinking.
            return leftmost

        @dataclass
        class Node:
            """Tree node wrapping one job and its scheduling weight."""

            class NodeColor(Enum):
                Red = "red"
                Black = "black"

            job: Job
            # Sort key of the tree; the smallest value runs next.
            virtual_runtime: int = field(default=0)
            color: NodeColor = field(default=NodeColor.Red)
            # Amount added to virtual_runtime for each slice the job runs.
            niceness: int = field(default=1)
            left: Optional["Node"] = field(default=None)
            right: Optional["Node"] = field(default=None)

    def choose_job(self) -> Iterator[Job]:
        """Yield the job with the smallest virtual runtime, one slice at a time."""
        while not self.complete and self._queue.root is not None:
            node = self._queue.pop_left_node()
            yield node.job
            if not node.job.complete:
                # Charge the slice just used, weighted by niceness, and
                # re-queue the job.
                node.virtual_runtime += node.niceness
                self._queue.insert(node)
if __name__ == '__main__':
    # One random workload; every scheduler gets its own deep copy so the
    # runs do not affect each other.
    jobs = [Job(pid, randint(1, 1000)) for pid in range(100)]

    scheduler_types = (
        RRScheduler,
        RunToCompletionScheduler,
        ShortestJobFirst,
        MLFQScheduler,
        CompletelyFairScheduler,
    )

    # Build and run each scheduler in turn, in the order listed above.
    finished = []
    for scheduler_type in scheduler_types:
        scheduler = scheduler_type(deepcopy(jobs))
        scheduler.run()
        finished.append(scheduler)

    data = [
        [
            scheduler.name,
            scheduler.avg_response_time,
            scheduler.avg_turnaround_time,
            scheduler.avg_fairness,
        ]
        for scheduler in finished
    ]

    print()
    print(tabulate(data, tablefmt="github", headers=["name", "avg response time", "avg turnaround time", "avg fairness"]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment