Skip to content

Instantly share code, notes, and snippets.

@shalvah
Last active February 18, 2023 12:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shalvah/ef750bc1bb1f01f6392709f071a670e2 to your computer and use it in GitHub Desktop.
Save shalvah/ef750bc1bb1f01f6392709f071a670e2 to your computer and use it in GitHub Desktop.
Ruby event loop + web server (single thread) https://blog.shalvah.me/posts/experiments-in-concurrency-3-event-loops
require 'algorithms'
require 'fiber'
# A minimal single-threaded event loop.
#
# Work is kept in a TaskQueue; #run executes the entrypoint once and then
# keeps pulling tasks off the queue until the queue reports it is empty.
class EventLoop
  def initialize
    @queue = TaskQueue.new
  end

  # Calls +entrypoint+, then drains the queue.
  # The queue may hand back nil (nothing runnable right now); such turns are
  # simply skipped and the queue is asked again.
  def run(&entrypoint)
    entrypoint.call

    until @queue.empty?
      task = @queue.next
      next unless task

      task.call
    end
  end

  # Schedules +callback+ to run once +timeout+ milliseconds have elapsed,
  # measured from the current wall-clock time.
  def set_timeout(timeout, &callback)
    due_at = (Time.now.to_f * 1000) + timeout
    @queue.add_timer(due_at, callback)
  end

  # Wraps +block+ in a Fiber and starts it with +args+.
  # +continuation+, when given, later receives the fiber's final return value.
  def run_coroutine(block, *args, &continuation)
    continue_fiber(Fiber.new(&block), *args, &continuation)
  end

  # Resumes +fiber+ once. If it paused again (Fiber.yield) another resume is
  # queued; if it finished, the continuation (if any) is queued with the
  # value the fiber returned.
  def continue_fiber(fiber, *args, &continuation)
    result = fiber.resume(*args)

    return @queue.add { continue_fiber(fiber, &continuation) } if fiber.alive?
    return unless continuation

    @queue.add { continuation.call(result) }
  end

  # Queues +callback+ to run on a later turn of the loop.
  def add_callback(&callback)
    @queue.add(&callback)
  end
end
# Holds the pending work for an EventLoop: plain callbacks (served in the
# order they were added) and timers (served once their scheduled time has
# passed).
#
# Timers live in a Containers::PriorityQueue from the `algorithms` gem.
class TaskQueue
  def initialize
    @callbacks = []
    @timers = Containers::PriorityQueue.new
  end

  # True only when there are neither callbacks nor timers left, due or not.
  def empty?
    @timers.empty? && @callbacks.empty?
  end

  # Returns the next thing to run, or nil when nothing is runnable right now.
  #
  # A due timer always wins over a plain callback. A nil return does not mean
  # the queue is empty: timers that are not yet due may still be pending.
  def next
    next_timer = @timers.next
    # Must use the same clock and unit (wall-clock milliseconds) as whoever
    # computed scheduled_time, see EventLoop#set_timeout.
    current_time = Time.now.to_f * 1000
    if next_timer && current_time >= next_timer[:scheduled_time]
      @timers.pop
      return next_timer[:callback]
    end

    # Array#shift already returns nil on an empty array, so no guard is
    # needed (a `length` check would be always-truthy in Ruby anyway).
    @callbacks.shift
  end

  # Registers +callback+ to become runnable at +scheduled_time+
  # (milliseconds, same clock as #next).
  def add_timer(scheduled_time, callback)
    # Negated so that the earliest time gets the largest priority; this
    # assumes the priority queue hands out the highest priority first.
    priority = -scheduled_time
    @timers.push({ scheduled_time: scheduled_time, callback: callback }, priority)
  end

  # Appends +callback+ to the FIFO list of plain callbacks.
  def add(&callback)
    @callbacks << callback
  end
end
# Global convenience helpers so scripts can call run_loop, set_timeout, etc.
# without passing an EventLoop around.
module Kernel
  # Returns the one shared EventLoop, creating it on first use.
  #
  # The loop is stored on the Kernel module itself rather than in an instance
  # variable of the caller: Kernel is mixed into every object, so a per-caller
  # variable would give each receiver its own loop (one that #run_loop never
  # drains) and would raise FrozenError when called on a frozen object.
  def event_loop
    Kernel.instance_variable_get(:@event_loop) ||
      Kernel.instance_variable_set(:@event_loop, EventLoop.new)
  end

  # Runs +entrypoint+ on the shared loop and keeps going until the loop's
  # queue is drained.
  def run_loop(&entrypoint)
    event_loop.run(&entrypoint)
  end

  # Schedules +callback+ on the shared loop after +timeout+ (passed through
  # to EventLoop#set_timeout unchanged).
  def set_timeout(timeout, &callback)
    event_loop.set_timeout(timeout, &callback)
  end

  # Starts +fiber+ (a callable that the loop wraps in a Fiber) with +args+;
  # +continuation+ is forwarded to EventLoop#run_coroutine.
  def run_coroutine(fiber, *args, &continuation)
    event_loop.run_coroutine(fiber, *args, &continuation)
  end

  # Queues +callback+ on the shared loop.
  def add_callback(&callback)
    event_loop.add_callback(&callback)
  end
end
require 'socket'
require './event_loop'
# Polls +server+ for one pending connection without blocking on accept,
# passes the accepted socket to +handler+, and re-queues itself on the event
# loop so polling continues.
#
# server  - an object responding to accept_nonblock (e.g. a TCPServer)
# handler - block called with the accepted socket; it owns the socket from
#           then on and is responsible for closing it
#
# NOTE: socket.gets is a blocking read, so the loop stalls until the client
# has sent its first line (or disconnected).
def handle_next_request(server, &handler)
  socket = server.accept_nonblock
  request_line = socket.gets

  if request_line.nil?
    # The client connected and hung up without sending anything. Calling
    # #strip on nil here would raise and take the whole loop down.
    socket.close
  else
    puts "Started handling req: #{request_line.strip} #{Time.now}"
    handler.call(socket)
  end

  add_callback { handle_next_request(server, &handler) }
rescue IO::WaitReadable, Errno::EINTR
  # No connection is waiting (or the call was interrupted): try again on a
  # later turn of the loop.
  add_callback { handle_next_request(server, &handler) }
end
# Demo server: every request is answered 5 seconds after it arrives, using
# timers on the event loop. Two extra timers per request show that shorter
# timeouts fire while the response is still pending.
run_loop do
  server = TCPServer.new("127.0.0.1", 5678)
  puts "Listening on localhost:5678"

  handle_next_request(server) do |socket|
    set_timeout(5000) do
      begin
        puts "Responding 5 seconds later: #{Time.now}"
        # The empty line is required: it is what separates the HTTP headers
        # from the body. Without it the body is read as a malformed header.
        socket.puts <<~HTTP
          HTTP/1.1 200 OK
          Content-Type: text/html

          Hii 👋
        HTTP
      ensure
        # Close even when the write fails (e.g. the client already left),
        # so the descriptor is not leaked.
        socket.close
      end
    end

    set_timeout(0) { puts "0ms later: #{Time.now}" }
    set_timeout(1500) { puts "1.5s later: #{Time.now}" }
  end
end
require 'socket'
require 'json'
require './event_loop'
# NB: beyond the concurrency, you won't see much of a performance benefit here,
# because the copy and stringify operations are CPU-bound.
# Replace them with some I/O (database, files, network) where there is actual "wait time",
# and you'll see more of a benefit. But I'm tired😭...so maybe another day.
# Polls +server+ for one pending connection without blocking on accept,
# passes the accepted socket to +handler+, and re-queues itself on the event
# loop so polling continues.
#
# server  - an object responding to accept_nonblock (e.g. a TCPServer)
# handler - block called with the accepted socket; it owns the socket from
#           then on and is responsible for closing it
#
# NOTE: socket.gets is a blocking read, so the loop stalls until the client
# has sent its first line (or disconnected).
def handle_next_request(server, &handler)
  socket = server.accept_nonblock
  request_line = socket.gets

  if request_line.nil?
    # The client connected and hung up without sending anything. Calling
    # #strip on nil here would raise and take the whole loop down.
    socket.close
  else
    puts "Started handling req: #{request_line.strip} #{Time.now}"
    handler.call(socket)
  end

  add_callback { handle_next_request(server, &handler) }
rescue IO::WaitReadable, Errno::EINTR
  # No connection is waiting (or the call was interrupted): try again on a
  # later turn of the loop.
  add_callback { handle_next_request(server, &handler) }
end
# Sample payload for the demo: 200,000 single-key hashes, {a: 0} .. {a: 199_999}.
array = Array.new(200_000) { |i| { a: i } }
# Coroutine body: builds and returns a new array holding the elements of
# +arr+ in order. Just before handling every 50,000th element (never the
# first) it calls Fiber.yield so the caller can get other work done.
copy = lambda do |arr|
  arr.each_with_index.each_with_object([]) do |(item, index), copied_array|
    Fiber.yield if index.positive? && (index % 50_000).zero? # Pause every 50_000 items
    copied_array << item
  end
end
# Coroutine body: serialises +arr+ into a JSON array string by calling
# #to_json on each element. Just before handling every 50,000th element
# (never the first) it calls Fiber.yield so the caller can get other work
# done.
#
# Returns "[]" for an empty +arr+. The string is built in place with <<
# instead of +=, which would allocate a new, ever-longer string per element.
stringify = lambda do |arr|
  result = +"["
  arr.each_with_index do |item, index|
    if index > 0
      Fiber.yield if (index % 50_000).zero? # Pause every 50_000 items
      result << ","
    end
    result << item.to_json
  end
  result << "]"
end
# Demo server: for every request, copy the sample array and then stringify
# the copy as two chained coroutines, and respond once both have finished.
# The coroutines pause periodically, which lets the loop accept other
# requests in between.
run_loop do
  server = TCPServer.new("127.0.0.1", 5678)
  puts "Listening on localhost:5678"

  handle_next_request(server) do |socket|
    run_coroutine(copy, array) do |copied|
      puts "Copy done."

      # The stringified result is deliberately not used; the response body
      # below is a fixed greeting.
      run_coroutine(stringify, copied) do
        begin
          puts "Stringify done."
          puts "Responding: #{Time.now}"
          # The empty line is required: it is what separates the HTTP
          # headers from the body. Without it the body is read as a
          # malformed header.
          socket.puts <<~HTTP
            HTTP/1.1 200 OK
            Content-Type: text/html

            Hii 👋
          HTTP
        ensure
          # Close even when the write fails (e.g. the client already left),
          # so the descriptor is not leaked.
          socket.close
        end
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment