Skip to content

Instantly share code, notes, and snippets.

@backpackerhh
Created March 25, 2024 18:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save backpackerhh/c63bcb5e0b006ccbb912435553fd82c7 to your computer and use it in GitHub Desktop.
Save backpackerhh/c63bcb5e0b006ccbb912435553fd82c7 to your computer and use it in GitHub Desktop.
Event bus implemented in Ruby
YourApp.register_provider :domain_events, namespace: true do # dry-system
prepare do
Dir[target.root.join("path/to/**/*_event_subscriber.rb")].each { |file| require file }
register "subscribers", EventSubscriber.subclasses
end
start do
register "bus", InMemoryEventBus.new
register "async_bus", SidekiqEventBus.new
end
end
class Event
attr_reader :aggregate_id, :aggregate_attributes, :occurred_at, :id, :name
private_class_method :new
def self.from_primitives(attributes)
new(id: attributes.fetch(:id, SecureRandom.uuid),
aggregate_id: attributes.fetch(:aggregate_id),
aggregate_attributes: attributes.fetch(:aggregate_attributes),
occurred_at: attributes.fetch(:occurred_at))
end
def initialize(id:, aggregate_id:, aggregate_attributes:, occurred_at:)
@id = id
@aggregate_id = aggregate_id
@aggregate_attributes = aggregate_attributes
@occurred_at = occurred_at
@name = self.class.name
end
def to_primitives
{
id:,
type: name,
occurred_at: occurred_at.strftime("%Y-%m-%d %H:%M:%S.%N %z"),
attributes: {
id: aggregate_id,
**aggregate_attributes
}
}
end
def ==(other)
name == other.name &&
occurred_at == other.occurred_at &&
aggregate_id == other.aggregate_id &&
aggregate_attributes == other.aggregate_attributes
end
end
class EventBus
include Deps[event_subscribers: "domain_events.subscribers"] # dry-auto_inject
attr_reader :event_subscriptions
def initialize(...)
super(...)
@event_subscriptions = Hash.new { |hash, key| hash[key] = [] }
event_subscribers.each do |event_subscriber_klass|
event_subscriber = event_subscriber_klass.new
event_subscriber.subscribed_to.each do |event_klass|
@event_subscriptions[event_klass] << event_subscriber
end
end
end
end
require "json-schema"
class EventSerializer
def self.serialize(event)
uuid_regex_pattern = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
time_regex_pattern = /\A\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{9} \+\d{4}\z/
schema = {
id: "domain-events-serializer",
type: "object",
required: %w[id type occurred_at attributes],
properties: {
id: { type: "string", pattern: uuid_regex_pattern },
type: { type: "string" },
occurred_at: { type: "string", pattern: time_regex_pattern },
attributes: {
type: "object",
required: ["id"],
properties: {
id: { type: "string", pattern: uuid_regex_pattern }
}
}
}
}
validation_errors = JSON::Validator.fully_validate(schema, event.to_primitives)
if validation_errors.any?
raise Domain::InvalidEventSchemaError, validation_errors
end
JSON.parse({ data: { **event.to_primitives } }.to_json)
end
def self.deserialize(raw_event)
event_klass = Object.const_get(raw_event.dig("data", "type"))
event_klass.from_primitives(
aggregate_id: raw_event.dig("data", "attributes", "id"),
aggregate_attributes: raw_event.dig("data", "attributes").except("id").transform_keys(&:to_sym),
occurred_at: Time.parse(raw_event.dig("data", "occurred_at")),
id: raw_event.dig("data", "id")
)
end
end
class EventSubscriber
def on(event)
raise NotImplementedError, "Define what will the event subscriber do upon receiving an event in #on method"
end
def subscribed_to
raise NotImplementedError, "Define the list of events in #subscribed_to method"
end
end
class OrderCreatedEvent < Event
def self.from(order)
from_primitives(
aggregate_id: order.id.value,
aggregate_attributes: {
amount: order.amount.value,
# other attributes...
},
occurred_at: order.created_at.value
)
end
end
class CreateOrderCommissionOnOrderCreatedEventSubscriber < EventSubscriber
def on(event)
CreateOrderCommissionUseCase.new.create(
order_id: event.aggregate_id,
order_amount: event.aggregate_attributes.fetch(:amount)
)
end
def subscribed_to
[OrderCreatedEvent]
end
end
class CreateOrderUseCase
include Deps[event_bus: "domain_events.async_bus"]
def create(attributes)
# omitted...
event_bus.publish(OrderCreatedEvent.from(order))
end
end
class InMemoryEventBus < EventBus
def publish(event)
event_subscriptions[event.class].each { |subscriber| subscriber.on(event) }
end
end
require "sidekiq"
class PublishEventJob
include Sidekiq::Job
sidekiq_options queue: "domain_events", unique: true, retry_for: 3600 # 1 hour
def perform(subscriber_klass_name, raw_event)
event = EventSerializer.deserialize(raw_event)
Object.const_get(subscriber_klass_name).new.on(event)
logger.info("Job enqueued to publish event #{event.id}")
end
end
class SidekiqEventBus < EventBus
def publish(event)
event_subscriptions[event.class].each do |subscriber|
PublishEventJob.perform_async(subscriber.class.name, EventSerializer.serialize(event))
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment