|
require "quartz" |
|
require "statistics" |
|
require "tablo" |
|
|
|
include Statistics::Distributions |
|
include Quartz |
|
|
|
# Atomic model with a single output port, `:user`, and no inputs.
#
# `time_advance` draws a value from `gen`; `output` posts a `nil` payload on
# `:user`. The model keeps no state, so both transition functions are no-ops.
class Arrival < AtomicModel
  output :user
  precision :milli

  # Distribution the delays returned by `time_advance` are drawn from.
  property gen : Distribution(Float64) = Uniform.new(0, 1)

  # Returns a freshly drawn delay: negative draws are clamped to zero and the
  # value is rounded to three decimals before being converted to a `Duration`.
  def time_advance : Duration
    drawn = gen.rand
    clamped = Math.max(0.0, drawn)
    Duration.from(clamped.round(3))
  end

  # Posts a payload-less event on `:user`.
  def output
    post nil, on: :user
  end

  # Nothing to update.
  def internal_transition
  end

  # The model declares no input port; *bag* is ignored.
  def external_transition(bag)
  end
end
|
|
|
# Waiting line that hands items to resources reachable through its output
# ports.
#
# Output ports are not declared here; callers add them, and each port name is
# used as the key of the resource it leads to in `free_resources`.
#
# Inputs:
# * `:in`   - every message received increases the number of waiting items.
# * `:next` - every message is read with `as_s` and the resource of that name
#   is marked free again.
class Queue < AtomicModel
  precision :milli
  input :in, :next

  state do
    # Availability per resource name; names never seen default to `true`.
    var free_resources = Hash(String, Bool).new(true)
    # Number of items currently waiting.
    var size = 0
    # Sum of the elapsed times observed at each external transition (not
    # weighted by the number of waiting items).
    var total_waiting_time = duration(0)
  end

  # Posts a `nil` payload on every output port whose resource is flagged free,
  # stopping as soon as as many events as waiting items have been posted.
  def output
    dispatched = 0
    each_output_port do |port|
      next unless state.free_resources[port.name]

      post nil, on: port.name
      dispatched += 1
      break if dispatched == state.size
    end
  end

  # Walks the output ports in the same order as `output`: each free resource
  # is flagged busy and one waiting item is removed, until the queue is empty.
  def internal_transition
    each_output_port do |port|
      resource = port.name.as(String)
      next unless state.free_resources[resource]

      state.free_resources[resource] = false
      state.size -= 1
      break if state.size == 0
    end
  end

  # Accounts for the elapsed time, then applies the arrivals (`:in`) and the
  # resource releases (`:next`) found in *bag*.
  def external_transition(bag)
    state.total_waiting_time += self.elapsed

    if arrivals = bag[input_port(:in)]?
      state.size += arrivals.size
    end

    if releases = bag[input_port(:next)]?
      releases.each do |sender|
        state.free_resources[sender.as_s] = true
      end
    end
  end

  # True when at least one item is waiting and at least one output port leads
  # to a resource flagged free.
  def ready?
    return false unless state.size > 0

    each_output_port.any? { |port| state.free_resources[port.name] }
  end

  # Zero when `ready?`, infinity otherwise.
  def time_advance : Duration
    if ready?
      0.time_units
    else
      Duration.infinity
    end
  end
end
|
|
|
# Resource that starts a task once enough inputs have arrived on `:process`.
#
# When a task ends, `output` posts the machine's own name on `:done` and a
# `nil` payload on `:processed`.
class Machine < AtomicModel
  precision :milli
  input :process
  output :processed, :done

  # Distribution task durations are drawn from.
  property gen : Distribution(Float64) = Uniform.new(0, 1)
  property inputs_needed = 1 # the number of inputs needed to start working

  state do
    # Accumulated time spent working.
    var total_processing_time = duration(0)
    # Inputs collected towards the next task.
    var inputs_received = 0
    # Remaining duration of the running task; `nil` while no task is running.
    var task_duration : Duration? = nil
    getter! task_duration
  end

  # Posts this machine's name on `:done`, then a `nil` payload on `:processed`.
  def output
    post name, on: :done
    post nil, on: :processed
  end

  # Adds what was left of the task to the processing total, then clears the
  # task and the input counter.
  def internal_transition
    remaining = state.task_duration
    state.total_processing_time += remaining
    state.task_duration = nil
    state.inputs_received = 0
  end

  # With a task running: moves the elapsed time from the remaining task
  # duration to the processing total (the content of *bag* is not read).
  # Otherwise: counts the messages received on `:process` and, once
  # `inputs_needed` is reached, draws a task duration (clamped at zero,
  # rounded to three decimals).
  def external_transition(bag)
    if remaining = state.task_duration?
      spent = self.elapsed
      state.total_processing_time += spent
      state.task_duration = remaining - spent
    else
      state.inputs_received += bag[input_port(:process)].size
      if state.inputs_received >= inputs_needed
        drawn = Math.max(0.0, gen.rand)
        state.task_duration = Duration.from(drawn.round(3))
      end
    end
  end

  # Remaining duration of the running task, or infinity when there is none.
  def time_advance : Duration
    if pending = state.task_duration?
      pending
    else
      Duration.infinity
    end
  end
end
|
|
|
# Coupled model assembling two `Arrival` sources ("x", "y"), five machines
# ("m1", "m2", "m3", "m4a", "m4b") and six queues ("q1".."q5" and "sink").
#
# Couplings, as established by the `attach` calls below:
#
#   x -> q1 -> m1 -> q3 \
#                        m3 -> q5 -> m4a / m4b -> sink
#   y -> q2 -> m2 -> q4 /
#
# Every machine also reports on its `:done` port to the `:next` port of the
# queue(s) feeding it.
class QueuingSystem < CoupledModel
  # Maps each arrival source or machine to the queue its output is attached to.
  getter queue_mapping

  def initialize(name)
    super(name)

    queues = Array.new(5) { |i| Queue.new("q#{i + 1}") }

    x = Arrival.new("x")
    x.gen = Uniform.new(0.4, 0.6)
    y = Arrival.new("y")
    y.gen = Normal.new(0.5, 0.2)

    m1 = Machine.new("m1")
    m1.gen = Normal.new(0.1, 0.03)
    m2 = Machine.new("m2")
    m2.gen = Normal.new(0.15, 0.04)
    # m3 waits for two inputs (it is fed by both q3 and q4) before starting.
    m3 = Machine.new("m3")
    m3.gen = Constant.new(0.3)
    m3.inputs_needed = 2
    m4a = Machine.new("m4a")
    m4a.gen = Normal.new(0.6, 0.13)
    m4b = Machine.new("m4b")
    m4b.gen = Normal.new(0.6, 0.13)

    queues << Queue.new("sink")

    @queue_mapping = {
      x   => queues[0],
      y   => queues[1],
      m1  => queues[2],
      m2  => queues[3],
      m3  => queues[4],
      m4a => queues[5],
      m4b => queues[5],
    } of Arrival | Machine => Queue

    # One output port per resource served, named after that resource.
    { {0, "m1"}, {1, "m2"}, {2, "m3"}, {3, "m3"}, {4, "m4a"}, {4, "m4b"} }.each do |(index, resource)|
      queues[index].add_output_port resource
    end

    queues.each { |queue| self << queue }
    [x, y, m1, m2, m3, m4a, m4b].each { |child| self << child }

    # First stage: source -> queue -> machine -> next queue.
    { {"x", "q1", "m1", "q3"}, {"y", "q2", "m2", "q4"} }.each do |(source, queue, machine, downstream)|
      attach :user, to: :in, between: source, and: queue
      attach machine, to: :process, between: queue, and: machine
      attach :done, to: :next, between: machine, and: queue
      attach :processed, to: :in, between: machine, and: downstream
    end

    # Second stage: q3 and q4 both feed m3.
    %w(q3 q4).each { |queue| attach "m3", to: :process, between: queue, and: "m3" }
    %w(q3 q4).each { |queue| attach :done, to: :next, between: "m3", and: queue }
    attach :processed, to: :in, between: "m3", and: "q5"

    # Third stage: q5 feeds m4a and m4b, which both end in the sink.
    %w(m4a m4b).each { |machine| attach machine, to: :process, between: "q5", and: machine }
    %w(m4a m4b).each { |machine| attach :done, to: :next, between: machine, and: "q5" }
    %w(m4a m4b).each { |machine| attach :processed, to: :in, between: machine, and: "sink" }
  end
end
|
|
|
# Prints a table with one row per non-queue child of *model*: its name, its
# utilisation and the size of the queue mapped to it in `queue_mapping`.
#
# For a `Machine`, utilisation is its total processing time divided by
# *iterations*, expressed as a percentage and capped at 100. Any other child
# is reported as 100%.
def report(model, iterations)
  rows = model.each_child.to_a.reject(Queue).map do |child|
    downstream = model.queue_mapping[child.as(Arrival | Machine)]

    utilisation = case child
                  when Machine
                    busy_time = child.state.total_processing_time.to_f
                    Math.min(busy_time / iterations * 100, 100)
                  else
                    100.0
                  end

    [child.name, "#{utilisation.round(2)}%", downstream.state.size]
  end

  table = Tablo::Table.new(rows) do |t|
    t.add_column("machine") { |row| row[0] }
    t.add_column("utilisation") { |row| row[1] }
    t.add_column("output queue size") { |row| row[2] }
  end

  puts table
end
|
|
|
# Script entry point: build the model, simulate it for STEPS time units and
# print the utilisation report.
Quartz.set_no_log_backend # or Quartz.set_debug_log_level for full trace

STEPS = 1000

model = QueuingSystem.new("queuing_system")
simulation = Simulation.new(model, duration: STEPS.time_units)
simulation.simulate
report(model, STEPS)