Created
August 7, 2023 14:15
-
-
Save ismasan/5a4e415a90a4661a0a87c13fbcaa9042 to your computer and use it in GitHub Desktop.
Host and run concurrent workers as ruby Fibers, using Async library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'pg' | |
require 'connection_pool' | |
require 'async' | |
DB = ConnectionPool.new(size: 5, timeout: 5) do | |
PG.connect(dbname: 'test_db') | |
end | |
class DBClient | |
def initialize(db) | |
@db = db | |
@closed = false | |
end | |
def close | |
@closed = true | |
end | |
def get | |
@db.with do |conn| | |
conn.exec("select NOW() AS foo, pg_sleep(2)").getvalue(0,0) | |
end | |
end | |
def listen(channel, &block) | |
@db.with do |conn| | |
conn.exec("LISTEN #{channel}") | |
loop do | |
break if @closed | |
conn.wait_for_notify(&block) | |
end | |
end | |
end | |
end | |
class Client | |
def initialize(*_) | |
end | |
def get | |
sleep 2 | |
Time.now | |
end | |
end | |
class Host | |
def initialize | |
@workers = [] | |
@runner = nil | |
end | |
def register(worker = nil, &block) | |
@workers << (worker || block) | |
end | |
def run | |
@runner = Sync do |task| | |
@workers.map do |worker| | |
task.async do |subtask| | |
worker.call | |
end | |
end.map(&:wait) | |
puts 'All done' | |
end | |
end | |
def stop | |
@runner.stop if @runner | |
end | |
end | |
host = Host.new | |
host.register do | |
client = DBClient.new(DB) | |
while true | |
puts "w1: #{client.get}" | |
end | |
end | |
host.register do | |
client = DBClient.new(DB) | |
while true | |
puts "w2: #{client.get}" | |
end | |
end | |
host.register do | |
client = DBClient.new(DB) | |
client.listen('test') do |_, pid, payload| | |
puts "w3: #{pid} #{payload}" | |
end | |
end | |
def shutdown(host) | |
host.stop | |
DB.shutdown(&:close) | |
puts 'Terminated' | |
exit | |
end | |
Signal.trap("INT") { | |
Thread.new { shutdown(host) } | |
} | |
host.run |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment