Skip to content

Instantly share code, notes, and snippets.

@josegomezr
Created May 2, 2024 11:53
Show Gist options
  • Save josegomezr/8d4372fd7378a31292cecbb92de62df9 to your computer and use it in GitHub Desktop.
Save josegomezr/8d4372fd7378a31292cecbb92de62df9 to your computer and use it in GitHub Desktop.
Very much not production-ready event bus based on `ActiveSupport::Instrumentation`.
require 'active_support'
# Minimal, demo-only event bus that mimics the slice of the
# ActiveSupport::Notifications notifier interface used in this file
# (subscribe / unsubscribe / publish / instrument / start / finish).
#
# Every call is traced to stdout with +puts+. Subscribers are stored one per
# exact name: lookups are plain Hash lookups, so there is no regexp/pattern
# matching and no fan-out to several subscribers.
class CustomEventBus
  def initialize
    @subscribers = {}
    # Events opened by #start and not yet closed by #finish, innermost last,
    # so that nested #instrument calls do not overwrite each other.
    @open_events = []
  end

  # Registers +callback+ (or the given block) under +pattern+.
  # The first registration for a pattern wins; later ones are ignored.
  #
  # @param pattern [Object] exact event name used as the lookup key
  # @param callback [#call, nil] subscriber; takes precedence over the block
  # @return [#call, nil] the subscriber currently registered for +pattern+
  def subscribe(pattern = nil, callback = nil, &block)
    puts "[#{self.class}] #subscribe TO: #{pattern.inspect}, with callback=#{callback.class.inspect}, block=#{block.class.inspect}"
    @subscribers[pattern] ||= callback || block
  end

  # Same as #subscribe; this bus makes no distinction for monotonic timing.
  def monotonic_subscribe(pattern = nil, callback = nil, &block)
    subscribe(pattern, callback, &block)
  end

  # Subscribes +callback+ to +pattern+ only for the duration of the block.
  # The subscription is removed even when the block raises.
  #
  # @param callback [#call] temporary subscriber
  # @param pattern [Object] exact event name
  # @param monotonic [Boolean] accepted for API compatibility; only logged
  # @return [Object] the block's value (nil when no block is given)
  def subscribed(callback, pattern = nil, monotonic: false, &block)
    puts "[#{self.class}] #subscribed TO: #{pattern.inspect}, with callback=#{callback.inspect}, block=#{block.inspect}, monotonic=#{monotonic.inspect}"
    subscribe(pattern, callback)
    begin
      yield if block_given?
    ensure
      unsubscribe(pattern)
    end
  end

  # Removes the first subscription that matches +subscriber_or_name+.
  # A subscription matches when its name equals the argument, when its
  # subscriber is the very same object, or - if a Class/Module is given -
  # when its subscriber is an instance of it.
  #
  # @param subscriber_or_name [Object] event name, subscriber object or class
  # @return [Array, nil] the removed +[name, subscriber]+ pair, or nil
  def unsubscribe(subscriber_or_name)
    puts "[#{self.class}] #unsubscribed: #{subscriber_or_name.inspect}"
    by_type = subscriber_or_name.is_a?(Module)
    match = @subscribers.find do |name, subscriber|
      name == subscriber_or_name ||
        subscriber.equal?(subscriber_or_name) ||
        # is_a? only accepts a Class/Module; guard so names never reach it.
        (by_type && subscriber.is_a?(subscriber_or_name))
    end
    # Delete by key: Hash#find returns the [name, subscriber] pair.
    @subscribers.delete(match.first) if match
    match
  end

  # Calls the subscriber registered for +name+ (if any) with +args+.
  def publish(name, *args)
    puts "[#{self.class}] #publish: name=#{name.inspect} payload=#{args.inspect}"
    @subscribers[name]&.call(*args)
  end

  # Calls the subscriber registered for +event.name+ (if any) with +event+.
  def publish_event(event)
    puts "[#{self.class}] #publish_event: name=#{event.name} payload=#{event.payload.inspect}"
    @subscribers[event.name]&.call(event)
  end

  # Wraps the block in an event: opens it, runs the block with +payload+,
  # then publishes the event to the subscriber of +name+.
  #
  # When the block raises, the exception is recorded in +payload+ under
  # +:exception+ / +:exception_object+, the event is still published, and the
  # exception is re-raised to the caller.
  #
  # @param name [String] event name
  # @param payload [Hash] mutable payload handed to the block
  # @return [Object] the block's value
  def instrument(name, payload = {}, &block)
    puts "[#{self.class}] #instrument #{name.inspect} payload=#{payload.inspect} block=#{block.inspect}"
    event = start(name, 'id', payload)
    begin
      event.record { yield payload if block_given? }
    rescue => e
      payload[:exception] = [e.class.name, e.message]
      payload[:exception_object] = e
      raise
    ensure
      finish(name, 'id', payload)
    end
  end

  # Opens a new event and makes it the innermost open one.
  #
  # @return [ActiveSupport::Notifications::Event] the event just opened
  def start(name, id, payload)
    puts "[#{self.class}] #start #{name.inspect} id=#{id.inspect} payload=#{payload.inspect}"
    event = ActiveSupport::Notifications::Event.new(name, nil, nil, id, payload)
    @open_events.push(event)
    event
  end

  # Closes the innermost open event, gives it +payload+ and publishes it.
  # Does nothing (returns nil) when no event is open.
  def finish(name, id, payload, listeners = nil)
    puts "[#{self.class}] #finish #{name.inspect} id=#{id.inspect} payload=#{payload.inspect} listeners=#{listeners.inspect}"
    event = @open_events.pop
    return unless event

    event.payload = payload
    publish_event(event)
  end
end
# Demo driver: attaches a MyLogSubscriber to a CustomEventBus and instruments
# the same sample workload twice - first as-is, then followed by a raise.
def main
  # event_bus = ActiveSupport::Notifications
  event_bus = CustomEventBus.new
  subscriber = MyLogSubscriber.new
  # Pass the bus as the notifier: without it attach_to subscribes to the
  # default ActiveSupport::Notifications, and events instrumented on
  # event_bus would never reach the subscriber.
  subscriber.class.attach_to(:sample, subscriber, event_bus)

  # Shared workload for both runs; mutates the payload and emits two
  # checkpoint events on the same bus.
  workload = lambda do |payload|
    puts 'My code started working'
    payload[:extra] = :info
    puts 'My code provides an extra piece of context to consumers'
    puts 'My code allocates something'
    event_bus.instrument('checkpoint_1.sample', {checkpoint_first: :data})
    _alloc = Hash.new # let's make an allocation just for fun
    puts 'My starts doing some heavy work here'
    sleep 0.1
    event_bus.instrument('checkpoint_2.sample', {checkpoint_first: :data})
    puts 'My starts doing more heavy work here'
    sleep 0.2
    puts 'Finish!'
  end

  event_bus.instrument('app_event.sample', {data: :initial}) do |payload|
    workload.call(payload)
  end

  puts ""
  puts "=== Now let's simulate an error ==="
  puts ""

  event_bus.instrument('app_event.sample', {data: :initial}) do |payload|
    workload.call(payload)
    raise 'Finish with error!'
  end
end
# Script entry point: run the demo between BEGIN/END markers.
puts "=== BEGIN ==="
begin
  main
rescue StandardError => e
  # Keep the demo best-effort (END is still printed), but report the failure
  # on stderr instead of discarding it silently.
  warn "[main] aborted: #{e.class}: #{e.message}"
end
puts "=== END ==="
# Log subscriber for the "sample" namespace: one handler per event name,
# each writing an INFO line through a stdout logger.
class MyLogSubscriber < ActiveSupport::LogSubscriber
  # Lazily builds and memoizes a DEBUG-level logger that writes
  # "<datetime> [<severity>]: <message>" lines to $stdout.
  def logger
    return @logger if @logger

    stdout_logger = Logger.new($stdout)
    stdout_logger.formatter = lambda do |severity, datetime, _progname, msg|
      "#{datetime} [#{severity}]: #{msg}\n"
    end
    stdout_logger.level = Logger::DEBUG
    @logger = stdout_logger
  end

  # Handler for "app_event" events: logs duration, payload and allocations.
  def app_event(event)
    elapsed = event.duration.round(2)
    details = event.payload.inspect
    info " Example Application Event (#{elapsed}ms) - payload=#{details} allocations=#{event.allocations}"
  end

  # Handler for "checkpoint_1" events: logs duration and payload.
  def checkpoint_1(event)
    elapsed = event.duration.round(2)
    details = event.payload.inspect
    info " Checkpoint the first! (#{elapsed}ms) - payload=#{details}"
  end

  # Handler for "checkpoint_2" events: logs duration and payload.
  def checkpoint_2(event)
    elapsed = event.duration.round(2)
    details = event.payload.inspect
    info " Checkpoint the second! (#{elapsed}ms) - payload=#{details}"
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment