Created
October 25, 2022 08:04
-
-
Save sadraskol/6155ac3c91f705c425297cee90b58c3b to your computer and use it in GitHub Desktop.
Running a simulation of ATP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import heapq | |
from collections import deque | |
# mapping of (queue, arrival_rate) | |
queue_rate = {'050_normal': 1.0, '030_fast': 5.0} | |
total_rate = sum(dict.values(queue_rate)) | |
# mapping of (queue, mean service time) | |
queue_size = {'050_normal': 1.0, '030_fast': 0.1} | |
# list of workers and their respective queues | |
workers_mapping = [['050_normal'], ['050_normal'], ['030_fast']] | |
class Job(object): | |
def __init__(self, t): | |
self.queue_name = self.select_queue() | |
self.size = self.select_size() | |
self.created_t = t | |
def select_queue(self): | |
population = list(queue_rate) | |
# queue weight is queue rate / sum of every rates | |
weights = map(lambda x: x / total_rate, dict.values(queue_rate)) | |
return random.choices(population, weights = weights, k = 1)[0] | |
def select_size(self): | |
return random.weibullvariate(queue_size[self.queue_name], 1.5) | |
class Queue(object): | |
def __init__(self, name): | |
self.deque = deque() | |
self.name = name | |
def append(self, job): | |
self.deque.append(job) | |
def pop(self): | |
return self.deque.popleft() | |
def len(self): | |
return len(self.deque) | |
class Worker(object): | |
# queues are passed by priority, in a resque fashion | |
def __init__(self, sim_name, queues): | |
self.busy = False | |
self.queues = queues | |
self.in_flight = None | |
self.sim_name = sim_name | |
def pop(self): | |
for q in self.queues: | |
if q.len() > 0: | |
return q.pop() | |
return None | |
def job_done(self, t): | |
assert(self.busy) | |
print("%f,%f,%f,%s,%s"%(t, t - self.in_flight.created_t, t - self.in_flight.created_t - self.in_flight.size, self.sim_name, self.in_flight.queue_name)) | |
next_job = self.pop() | |
if next_job != None: | |
self.in_flight = next_job | |
return [(t + next_job.size, self.job_done)] | |
else: | |
self.in_flight = None | |
self.busy = False | |
return None | |
def start(self, job): | |
assert(not self.busy) | |
self.busy = True | |
self.in_flight = job | |
def can_host(self, job): | |
queue_names = list(map(lambda q: q.name, self.queues)) | |
return job.queue_name in queue_names | |
def find_idle_worker(workers): | |
for w in workers: | |
if not w.busy: | |
return w | |
return None | |
def find_queue(queues, name): | |
for q in queues: | |
if name == q.name: | |
return q | |
print("could not find name '%s' in queues:%a"%(name, list(queues))) | |
assert(False) | |
class Client(object): | |
def __init__(self, queues, workers): | |
self.queues = queues | |
self.workers = workers | |
def generate(self, t): | |
job = Job(t) | |
next_t = t + random.weibullvariate(1.0 / total_rate, 1.5) | |
elligible_workers = list(filter(lambda w: w.can_host(job), self.workers)) | |
queue = find_queue(self.queues, job.queue_name) | |
idle_worker = find_idle_worker(elligible_workers) | |
if idle_worker == None: | |
queue.append(job) | |
return [(next_t, self.generate)] | |
else: | |
idle_worker.start(job) | |
return [(next_t, self.generate), (t + job.size, idle_worker.job_done)] | |
def sim_loop(max_t, client): | |
t = 0.0 | |
q = [(t, client.generate)] | |
while len(q) > 0 and t < max_t: | |
(t, call) = heapq.heappop(q) | |
new_events = call(t) | |
if new_events is not None: | |
q.extend(new_events) | |
heapq.heapify(q) | |
def make_worker(mapping, queues): | |
name = "+".join(mapping) | |
worker_queues = list(filter(lambda q: q.name in mapping, queues)) | |
return Worker(name, worker_queues) | |
def run_sims(max_t): | |
print("t,service_time,q_time,worker,queue") | |
queues = list(map(lambda q: Queue(q), dict.keys(queue_rate))) | |
workers = list(map(lambda m: make_worker(m, queues), workers_mapping)) | |
client = Client(queues, workers) | |
sim_loop(max_t, client) | |
# higher max_t are better for precision, but they're slower to run | |
run_sims(100000.0) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
library(ggplot2) | |
d = read.table("results.csv", sep=",", header=TRUE) | |
gg = ggplot(d, aes(q_time, colour=worker)) + | |
geom_freqpoly() + | |
scale_y_log10() + | |
ylab("Count") + | |
xlab("Queue Time (s)") | |
ggsave("poly.png", dpi=100, width=8, height=5) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/bash | |
echo "runing the simulation" | |
python3 atp.py > results.csv | |
echo "plotting the graph" | |
Rscript plot.R 2> /dev/null |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment