Skip to content

Instantly share code, notes, and snippets.

@ismasan
Created August 7, 2023 14:15
Show Gist options
  • Save ismasan/5a4e415a90a4661a0a87c13fbcaa9042 to your computer and use it in GitHub Desktop.
Save ismasan/5a4e415a90a4661a0a87c13fbcaa9042 to your computer and use it in GitHub Desktop.
Host and run concurrent workers as ruby Fibers, using Async library
require 'pg'
require 'connection_pool'
require 'async'
DB = ConnectionPool.new(size: 5, timeout: 5) do
PG.connect(dbname: 'test_db')
end
class DBClient
def initialize(db)
@db = db
@closed = false
end
def close
@closed = true
end
def get
@db.with do |conn|
conn.exec("select NOW() AS foo, pg_sleep(2)").getvalue(0,0)
end
end
def listen(channel, &block)
@db.with do |conn|
conn.exec("LISTEN #{channel}")
loop do
break if @closed
conn.wait_for_notify(&block)
end
end
end
end
class Client
def initialize(*_)
end
def get
sleep 2
Time.now
end
end
class Host
def initialize
@workers = []
@runner = nil
end
def register(worker = nil, &block)
@workers << (worker || block)
end
def run
@runner = Sync do |task|
@workers.map do |worker|
task.async do |subtask|
worker.call
end
end.map(&:wait)
puts 'All done'
end
end
def stop
@runner.stop if @runner
end
end
host = Host.new
host.register do
client = DBClient.new(DB)
while true
puts "w1: #{client.get}"
end
end
host.register do
client = DBClient.new(DB)
while true
puts "w2: #{client.get}"
end
end
host.register do
client = DBClient.new(DB)
client.listen('test') do |_, pid, payload|
puts "w3: #{pid} #{payload}"
end
end
def shutdown(host)
host.stop
DB.shutdown(&:close)
puts 'Terminated'
exit
end
Signal.trap("INT") {
Thread.new { shutdown(host) }
}
host.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment