Created
May 2, 2024 11:53
-
-
Save josegomezr/8d4372fd7378a31292cecbb92de62df9 to your computer and use it in GitHub Desktop.
Very much not-production ready Event-bus based off `ActiveSupport::Instrumentation`.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'active_support' | |
class CustomEventBus | |
def initialize() | |
@subscribers = {} | |
end | |
def subscribe(pattern = nil, callback = nil, &block) | |
puts "[#{self.class}] #subscribe TO: #{pattern.inspect}, with callback=#{callback.class.inspect}, block=#{block.class.inspect}" | |
@subscribers[pattern] ||= callback || block | |
end | |
def monotonic_subscribe(pattern = nil, callback = nil, &block) | |
subscribe(pattern, callback, &block) | |
end | |
def subscribed(callback, pattern = nil, monotonic: false, &block) | |
puts "[#{self.class}] #subscribed TO: #{pattern.inspect}, with callback=#{callback.inspect}, block=#{block.inspect}, monotonic=#{monotonic.inspect}" | |
subscriber = subscribe(pattern, callback) | |
yield if block_given? | |
unsubscribe(pattern) | |
end | |
def unsubscribe(subscriber_or_name) | |
puts "[#{self.class}] #unsubscribed: #{subscriber_or_name.inspect}" | |
@subscribers.find do | |
_1 == subscriber_or_name || _2.is_a?(subscriber_or_name) | |
end.tap { @subscribers.delete(_1) if _1 } | |
end | |
def publish(name, *args) | |
puts "[#{self.class}] #publish: name=#{name.inspect} payload=#{args.inspect}" | |
@subscribers[name]&.call(*args) | |
end | |
def publish_event(event) | |
puts "[#{self.class}] #publish_event: name=#{event.name} payload=#{event.payload.inspect}" | |
@subscribers[event.name]&.call(event) | |
end | |
def instrument(name, payload = {}, &block) | |
puts "[#{self.class}] #instrument #{name.inspect} payload=#{payload.inspect} block=#{block.inspect}" | |
start(name, 'id', payload) | |
@event.record { yield payload if block_given? } | |
rescue => e | |
payload[:exception] = [e.class.name, e.message] | |
payload[:exception_object] = e | |
ensure | |
finish(name, 'id', payload) | |
end | |
def start(name, id, payload) | |
puts "[#{self.class}] #start #{name.inspect} id=#{id.inspect} payload=#{payload.inspect}" | |
@event = ActiveSupport::Notifications::Event.new(name, nil, nil, id, payload) | |
end | |
def finish(name, id, payload, listeners = nil) | |
puts "[#{self.class}] #finish #{name.inspect} id=#{id.inspect} payload=#{payload.inspect} listeners=#{listeners.inspect}" | |
@event.payload = payload | |
publish_event(@event) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def main | |
# event_bus = ActiveSupport::Notifications | |
event_bus = CustomEventBus.new | |
subscriber = MyLogSubscriber.new() | |
subscriber.class.attach_to(:sample, subscriber) | |
event_bus.instrument('app_event.sample', {data: :initial}) do |payload| | |
puts 'My code started working' | |
payload[:extra] = :info | |
puts 'My code provides an extra piece of context to consumers' | |
puts 'My code allocates something' | |
event_bus.instrument('checkpoint_1.sample', {checkpoint_first: :data}) | |
alloc = Hash.new; # let's make an allocation just for fun | |
puts 'My starts doing some heavy work here' | |
sleep 0.1 | |
event_bus.instrument('checkpoint_2.sample', {checkpoint_first: :data}) | |
puts 'My starts doing more heavy work here' | |
sleep 0.2 | |
puts 'Finish!' | |
end | |
puts "" | |
puts "=== Now let's simulate an error ===" | |
puts "" | |
event_bus.instrument('app_event.sample', {data: :initial}) do |payload| | |
puts 'My code started working' | |
payload[:extra] = :info | |
puts 'My code provides an extra piece of context to consumers' | |
puts 'My code allocates something' | |
event_bus.instrument('checkpoint_1.sample', {checkpoint_first: :data}) | |
alloc = Hash.new; # let's make an allocation just for fun | |
puts 'My starts doing some heavy work here' | |
sleep 0.1 | |
event_bus.instrument('checkpoint_2.sample', {checkpoint_first: :data}) | |
puts 'My starts doing more heavy work here' | |
sleep 0.2 | |
puts 'Finish!' | |
raise 'Finish with error!' | |
end | |
end | |
puts "=== BEGIN ===" | |
main() rescue nil | |
puts "=== END ===" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MyLogSubscriber < ActiveSupport::LogSubscriber | |
def logger | |
@logger ||= Logger.new($stdout).tap do |logger| | |
logger.formatter = proc do |severity, datetime, progname, msg| | |
"#{datetime} [#{severity}]: #{msg}\n" | |
end | |
logger.level = Logger::DEBUG | |
logger | |
end | |
end | |
def app_event(event) | |
info " Example Application Event (#{event.duration.round(2)}ms) - payload=#{event.payload.inspect} allocations=#{event.allocations}" | |
end | |
def checkpoint_1(event) | |
info " Checkpoint the first! (#{event.duration.round(2)}ms) - payload=#{event.payload.inspect}" | |
end | |
def checkpoint_2(event) | |
info " Checkpoint the second! (#{event.duration.round(2)}ms) - payload=#{event.payload.inspect}" | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment