Skip to content

Instantly share code, notes, and snippets.

@RomainFranceschini
Last active April 26, 2020 00:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RomainFranceschini/8f6701c5ee5d6c27fe3791661440c604 to your computer and use it in GitHub Desktop.
Save RomainFranceschini/8f6701c5ee5d6c27fe3791661440c604 to your computer and use it in GitHub Desktop.
Playing with queues and Discrete-Event Simulations in Crystal using Quartz

Based on the example provided by lbarasti in this other gist. However this version use the Quartz simulation framework.

Prerequisite

Your system has an installation of crystal 0.34.0 or higher.

How to run

Clone the files in this gist, then run the following once, to install the code dependencies.

shards install

You can now run a simulation with

crystal run queuing_system.cr

You can now tweak the model and run your own simulation!

require "quartz"
require "statistics"
require "tablo"
include Statistics::Distributions
include Quartz
class Arrival < AtomicModel
output :user
precision :milli
property gen : Distribution(Float64) = Uniform.new(0, 1)
def time_advance : Duration
Duration.from(Math.max(0.0, self.gen.rand).round(3))
end
def output
post nil, on: :user
end
def internal_transition
end
def external_transition(bag)
end
end
class Queue < AtomicModel
precision :milli
input :in, :next
state do
var free_resources = Hash(String, Bool).new(true)
var size = 0
var total_waiting_time = duration(0)
end
def output
sent = 0
each_output_port do |oport|
if state.free_resources[oport.name]
post nil, on: oport.name
sent += 1
break if (state.size - sent) == 0
end
end
end
def internal_transition
each_output_port do |oport|
name = oport.name.as(String)
if state.free_resources[name]
state.free_resources[name] = false
state.size -= 1
break if state.size == 0
end
end
end
def external_transition(bag)
state.total_waiting_time += self.elapsed
if bag.has_key?(input_port(:in))
state.size += bag[input_port(:in)].size
end
if bag.has_key?(input_port(:next))
bag[input_port(:next)].each do |sender|
state.free_resources[sender.as_s] = true
end
end
end
def ready?
state.size > 0 && each_output_port.any? { |p| state.free_resources[p.name] }
end
def time_advance : Duration
ready? ? 0.time_units : Duration.infinity
end
end
class Machine < AtomicModel
precision :milli
input :process
output :processed, :done
property gen : Distribution(Float64) = Uniform.new(0, 1)
property inputs_needed = 1 # the number of inputs needed to start working
state do
var total_processing_time = duration(0)
var inputs_received = 0
var task_duration : Duration? = nil
getter! task_duration
end
def output
post self.name, on: :done
post nil, on: :processed
end
def internal_transition
state.total_processing_time += state.task_duration
state.task_duration = nil
state.inputs_received = 0
end
def external_transition(bag)
if state.task_duration?.nil? # not busy
state.inputs_received += bag[input_port(:process)].size
if state.inputs_received >= self.inputs_needed
state.task_duration = Duration.from(Math.max(0.0, self.gen.rand).round(3))
end
else
state.total_processing_time += self.elapsed
state.task_duration -= self.elapsed
end
end
def time_advance : Duration
state.task_duration? || Duration.infinity
end
end
class QueuingSystem < CoupledModel
getter queue_mapping
def initialize(name)
super(name)
queues = 5.times.map { |i| Queue.new("q#{i + 1}") }.to_a
x = Arrival.new("x").tap { |m| m.gen = Uniform.new(0.4, 0.6) }
y = Arrival.new("y").tap { |m| m.gen = Normal.new(0.5, 0.2) }
m1 = Machine.new("m1").tap { |m| m.gen = Normal.new(0.1, 0.03) }
m2 = Machine.new("m2").tap { |m| m.gen = Normal.new(0.15, 0.04) }
m3 = Machine.new("m3").tap { |m| m.gen = Constant.new(0.3); m.inputs_needed = 2 }
m4a = Machine.new("m4a").tap { |m| m.gen = Normal.new(0.6, 0.13) }
m4b = Machine.new("m4b").tap { |m| m.gen = Normal.new(0.6, 0.13) }
queues << Queue.new("sink")
@queue_mapping = {x => queues[0], y => queues[1], m1 => queues[2], m2 => queues[3],
m3 => queues[4], m4a => queues[5], m4b => queues[5]} of Arrival | Machine => Queue
queues[0].add_output_port "m1"
queues[1].add_output_port "m2"
queues[2].add_output_port "m3"
queues[3].add_output_port "m3"
queues[4].add_output_port "m4a"
queues[4].add_output_port "m4b"
queues.each { |q| self << q }
self << x << y << m1 << m2 << m3 << m4a << m4b
attach :user, to: :in, between: "x", and: "q1"
attach "m1", to: :process, between: "q1", and: "m1"
attach :done, to: :next, between: "m1", and: "q1"
attach :processed, to: :in, between: "m1", and: "q3"
attach :user, to: :in, between: "y", and: "q2"
attach "m2", to: :process, between: "q2", and: "m2"
attach :done, to: :next, between: "m2", and: "q2"
attach :processed, to: :in, between: "m2", and: "q4"
attach "m3", to: :process, between: "q3", and: "m3"
attach "m3", to: :process, between: "q4", and: "m3"
attach :done, to: :next, between: "m3", and: "q3"
attach :done, to: :next, between: "m3", and: "q4"
attach :processed, to: :in, between: "m3", and: "q5"
attach "m4a", to: :process, between: "q5", and: "m4a"
attach "m4b", to: :process, between: "q5", and: "m4b"
attach :done, to: :next, between: "m4a", and: "q5"
attach :done, to: :next, between: "m4b", and: "q5"
attach :processed, to: :in, between: "m4a", and: "sink"
attach :processed, to: :in, between: "m4b", and: "sink"
end
end
def report(model, iterations)
machines = model.each_child.to_a.reject(Queue).map do |m|
queue = model.queue_mapping[m.as(Arrival | Machine)]
percentage = if m.is_a?(Machine)
busy_time = m.state.total_processing_time.to_f
Math.min(busy_time / iterations * 100, 100)
else
100.0
end
[m.name, "#{percentage.round(2)}%", queue.state.size]
end
table = Tablo::Table.new(machines) do |t|
t.add_column("machine") { |n| n[0] }
t.add_column("utilisation") { |n| n[1] }
t.add_column("output queue size") { |n| n[2] }
end
puts table
end
Quartz.set_no_log_backend # or Quartz.set_debug_log_level for full trace
STEPS = 1000
model = QueuingSystem.new("queuing_system")
Simulation.new(model, duration: STEPS.time_units).simulate
report(model, STEPS)
name: queuing
version: 0.1.0
authors:
- RomainFranceschini <RomainFranceschini@users.noreply.github.com>
dependencies:
quartz:
github: RomainFranceschini/quartz
commit: 7bbf87c464dc2ad9f33c83b7a49d5da917d2d0fa
tablo:
github: hutou/tablo
version: 0.9.3
statistics:
github: lbarasti/statistics
version: 0.2.0
crystal: 0.34.0
license: MIT
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment