Skip to content

Instantly share code, notes, and snippets.

@y8

y8/locking.cr Secret

Last active February 4, 2022 21:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save y8/1cd372e58d53c9eb7ae19089aa80624c to your computer and use it in GitHub Desktop.
Save y8/1cd372e58d53c9eb7ae19089aa80624c to your computer and use it in GitHub Desktop.
require "log"
# Request/response dispatcher used to stress fiber scheduling and channels.
#
# `#initialize` spawns `WORKER_COUNT` worker fibers that all receive from one
# shared unbuffered "bus" channel. Each `#call` creates a private response
# channel, sends it over the bus and blocks until a worker sends a reply
# string back on it.
class Lockinger
  alias ResponseChannel = Channel(String)
  alias RequestChannel = Channel(ResponseChannel)

  # Number of worker fibers spawned by `#initialize`.
  WORKER_COUNT = 4

  # A worker explicitly yields to the scheduler after this many responses.
  YIELD_EVERY = 100

  # Total number of `#call` invocations started so far.
  @request_count : Atomic(Int64) = Atomic(Int64).new(0)

  # Requests announced by `#call` that no worker has picked up yet.
  @queue_size = Atomic(Int64).new(0)

  # Total number of requests picked up across all workers. Kept as an
  # instance variable (not passed as an argument) because `Atomic` is a
  # struct and would be copied when passed by value.
  @response_total_count = Atomic(Int64).new(0)

  # Creates the bus and starts the worker fibers.
  def initialize
    @bus = RequestChannel.new
    WORKER_COUNT.times do |worker_id|
      spawn name: "Worker#{worker_id}" do
        run_worker(worker_id)
      end
    end
  end

  # Performs one round trip through the bus: announces the request, hands a
  # fresh response channel to a worker, blocks until the reply arrives, and
  # logs the reply together with the elapsed time.
  def call
    @queue_size.add 1
    # `Atomic#add` returns the previous value, so the id is obtained in the
    # same atomic step; a separate `get` could observe another fiber's
    # increment and hand out a duplicate id.
    call_id = @request_count.add(1) + 1
    start = Time.monotonic
    Log.info &.emit(request: call_id, q: @queue_size.get, fiber: Fiber.current.to_s, thread: Thread.current.to_s)
    loopback = ResponseChannel.new
    send_to_bus loopback
    body = loopback.receive
    Log.info &.emit(response: call_id, body: body, fiber: Fiber.current.to_s, thread: Thread.current.to_s, time: elapsed(start))
  end

  # Sends the response channel *what* to the workers; blocks until one of
  # them receives it (the bus is unbuffered).
  def send_to_bus(what : ResponseChannel)
    @bus.send what
  end

  # Returns the time elapsed since *start* (a `Time.monotonic` reading) as a
  # humanized number of seconds.
  def elapsed(start)
    runtime = Time.monotonic - start
    runtime.total_seconds.humanize(precision: 2, significant: false)
  end

  # Worker loop: receives response channels from the bus forever and answers
  # each one with a diagnostic string.
  private def run_worker(worker_id : Int32)
    response_count = 0
    Log.info &.emit(worker: worker_id, fiber: Fiber.current.to_s, thread: Thread.current.to_s)
    loop do
      response_channel = @bus.receive
      start = Time.monotonic
      @queue_size.sub 1
      # Count only after a request has actually been received, so an idle
      # worker does not reserve a response id while it is blocked.
      response_count += 1
      response_id = @response_total_count.add(1) + 1
      # something happening here
      response_channel.send "! w=#{worker_id}, r=#{response_count}, rt=#{response_id}, q=#{@queue_size.get}, tr=#{Thread.current}, f=#{Fiber.current}"
      Log.info &.emit(w: worker_id, rt: response_id, e: elapsed(start))
      Fiber.yield if response_count % YIELD_EVERY == 0
    end
  end
end
# Load generator: arms `pool_size` producer fibers, releases them one by one,
# and has each of them spawn `work_size` fibers that call `Lockinger#call`.
instance = Lockinger.new

# Start gate: every producer blocks on it until the main fiber sends a value.
launcher = Channel(Nil).new

query_per_second = 200
pool_size = 64
work_size = 1024

# Pause between two consecutive requests issued by one producer.
# NOTE(review): with `pool_size` producers each pausing this long, the
# aggregate rate is far above `query_per_second`; if that value is meant as a
# global target the delay should probably be
# `1.second * pool_size / query_per_second` -- confirm the intent.
throttle = 1.second / query_per_second / pool_size

pool_size.times do |pool_id|
  spawn name: "pool=#{pool_id}" do
    ::Log.info { "Arming #{pool_id}, delay: #{throttle}" }
    launcher.receive?
    work_size.times do |work_id|
      spawn(name: "pool=#{pool_id}, w=#{work_id}") { instance.call }
      # `sleep` suspends this fiber for the throttle interval. The previous
      # `Fiber.timeout` only armed the fiber's timeout event without
      # suspending it, so it did not throttle and could re-enqueue the fiber
      # at an arbitrary later point.
      sleep throttle
      Fiber.yield if work_id % 100 == 0
    end
  end
end

# Periodic status report of the number of requests not yet picked up.
spawn do
  loop do
    ::Log.info { "!!!! Status: q=#{instance.@queue_size.get}" }
    sleep(1.seconds)
  end
end

# Release the producers one at a time.
pool_size.times do |i|
  ::Log.info { "Disarming #{i}" }
  launcher.send nil
end

# Park the main fiber forever so the spawned fibers keep running.
Channel(Nil).new.receive
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment