@palkan
Last active August 29, 2019 13:26
Railsy Rails Events Store

Since our architecture is modularized, we need a way for our components/engines to communicate with each other.

We do this by adding a pub/sub (or event sourcing) layer to our application via Rails Event Store.

We do not use RES directly but through the railsy-events engine, which wraps RES functionality and provides its own API. This allows us to replace RES in the future (if necessary) without changing our application code.

Describe events

Events are represented by event classes, which describe event payloads and identifiers:

class ProfileCompleted < Railsy::Events::Event
  # (optional) event identifier is used for transmitting events
  # to subscribers.
  #
  # By default, identifier is equal to `name.underscore.gsub('/', '.')`.
  #
  # You don't need to specify identifier manually, only for backward compatibility when
  # class name is changed.
  self.identifier = "profile_completed"

  # Add attributes accessors
  attributes :user_id

  # Sync attributes only available for sync subscribers
  # (so you can add some optional non-JSON serializable data here)
  # For example, we can also add `user` record to the event to avoid
  # reloading in sync subscribers
  sync_attributes :user
end
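
To make the default identifier rule from the comment above concrete, here is a minimal plain-Ruby sketch of `name.underscore.gsub('/', '.')`. The `default_identifier` helper is hypothetical and only approximates ActiveSupport's `underscore` (acronym inflections are ignored); it is not part of railsy-events.

```ruby
# Hypothetical helper: approximates `name.underscore.gsub('/', '.')`
# without ActiveSupport (custom acronym inflections are ignored).
def default_identifier(class_name)
  class_name
    .gsub("::", "/")                        # namespaces become path segments
    .gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')  # "HTTPRequest" -> "HTTP_Request"
    .gsub(/([a-z\d])([A-Z])/, '\1_\2')      # "ProfileCompleted" -> "Profile_Completed"
    .downcase
    .tr("/", ".")                           # path segments become a dotted identifier
end

puts default_identifier("ProfileCompleted")          # profile_completed
puts default_identifier("ConnectBy::ProfileCreated") # connect_by.profile_created
```

Because the identifier is derived from the class name, renaming a class changes the derived value, which is why an explicit `self.identifier = ...` is useful for backward compatibility.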

NOTE: we use JSON to serialize events, thus only the simple field types (numbers, strings, booleans) are supported.
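
The following plain-Ruby sketch illustrates why only simple types are safe. It uses just the standard `json` library and an invented payload; it does not go through railsy-events at all.

```ruby
require "json"

# Invented payload mixing a simple value with a Symbol and a Time.
payload = { user_id: 42, role: :admin, created_at: Time.at(0).utc }

# Round trip through JSON, as happens when an event is stored and loaded.
restored = JSON.parse(JSON.generate(payload))

p restored.keys                # keys come back as strings, not symbols
p restored["user_id"]          # the integer survives unchanged
p restored["role"].class       # the Symbol value is now a String
p restored["created_at"].class # the Time value is now a String as well
```

Anything that does not survive this round trip (records, symbols, time objects) is better passed as an ID or a primitive value, or kept in sync attributes.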

Each event has predefined (reserved) fields:

  • event_id – unique event id
  • type – event type (=identifier)
  • metadata

NOTE: event names should be in the past tense and describe what happened (e.g. "ProfileCreated", "EventPublished", etc.).

Events are stored in the app/events folder.

Events registration

Since we use abstract identifiers instead of class names, we need a way to tell our mapper how to infer an event class from its type.

We register events automatically when they're published or when a subscription is created.

You can also register events manually:

# by passing an event class
Railsy::Events.mapper.register_event MyEventClass

# or more precisely (in that case `event.type` must be equal to "my_event")
Railsy::Events.mapper.register "my_event", MyEventClass
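
As a rough illustration of what registration does, the sketch below keeps a type-to-class-name table and resolves it on lookup. `TinyRegistry` and its `resolve` method are hypothetical stand-ins written for this example, not the engine's mapper.

```ruby
# Hypothetical stand-in for the mapper's type registry (not the real class).
class TinyRegistry
  def initialize
    @mapping = {}
  end

  # Store the class *name*, so the constant is looked up afresh on each resolve.
  def register(type, class_name)
    @mapping[type] = class_name.to_s
  end

  # Resolve an event type back to its class, failing loudly when unknown.
  def resolve(type)
    class_name = @mapping.fetch(type) do
      raise KeyError, "Don't know how to deserialize event: #{type.inspect}"
    end
    Object.const_get(class_name)
  end
end

MyEventClass = Struct.new(:user_id) # placeholder event class

registry = TinyRegistry.new
registry.register "my_event", MyEventClass
p registry.resolve("my_event") # the class registered above
```

Storing names rather than class objects is a design choice that plays well with code reloading in development, where old class objects can become stale.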

Publish events

To publish an event, first create an instance of the event class and then pass it to the Railsy::Events.publish method:

event = ProfileCompleted.new(user_id: user.id)

# or with metadata
event = ProfileCompleted.new(user_id: user.id, metadata: { ip: request.remote_ip })

# then publish the event
Railsy::Events.publish(event)

That's it! Your event has been stored and propagated.
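
Conceptually, "stored and propagated" means two steps: append the event to a log, then hand it to every subscriber of that type. The sketch below shows this with a hypothetical in-memory `TinyStore`; it is an assumption-laden illustration and not how RES persists or dispatches events.

```ruby
# Hypothetical in-memory stand-in for the event store (not the real API).
class TinyStore
  attr_reader :log

  def initialize
    @log = []                                    # stored events, in publish order
    @subscribers = Hash.new { |h, k| h[k] = [] } # event type => callables
  end

  def subscribe(type, &handler)
    @subscribers[type] << handler
  end

  def publish(type, payload)
    event = { type: type, data: payload }
    @log << event                                     # 1. store
    @subscribers[type].each { |sub| sub.call(event) } # 2. propagate
    event
  end
end

store = TinyStore.new
seen = []
store.subscribe("profile_completed") { |event| seen << event[:data][:user_id] }
store.publish("profile_completed", { user_id: 1 })

p store.log.size # number of stored events
p seen           # user ids observed by the subscriber
```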

Subscribe to events

To subscribe a handler to an event, use the Railsy::Events.subscribe method.

You should do this in your app or engine initializer:

# some/engine.rb

initializer "my_engine.subscribe_to_events" do
  # To make sure event store is initialized use load hook
  # `store` == `Railsy::Events`
  ActiveSupport.on_load "railsy-events" do |store|
    # async subscriber – invoked from a background job, enqueued after the current transaction commits
    # NOTE: all subscribers are asynchronous by default
    store.subscribe MyEventHandler, to: ProfileCreated

    # sync subscriber – invoked right "within" `publish` method
    store.subscribe MyEventHandler, to: ProfileCreated, sync: true

    # anonymous handler (can only be synchronous)
    store.subscribe(to: ConnectBy::ProfileCreated, sync: true) do |event|
      # do something
    end

    # you can omit event if your subscriber follows the convention
    # for example, the following subscriber would subscribe to
    # ProfileCreated event
    store.subscribe OnProfileCreated::DoThat
  end
end
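
The difference between sync and async subscribers can be sketched in plain Ruby: a sync subscriber runs inside the publish call, while an async one is wrapped so that the call only enqueues work for later. The `queue` array and the `async` wrapper below are hypothetical stand-ins for a background job backend; the sketch ignores transactions entirely.

```ruby
queue = [] # hypothetical stand-in for a background job queue

# Wrap a callable so that calling it only enqueues the work.
async = ->(subscriber) { ->(event) { queue << [subscriber, event] } }

calls = []
sync_subscriber  = ->(event) { calls << [:sync, event] }
async_subscriber = async.call(->(event) { calls << [:async, event] })

# "publish": every subscriber is invoked, but the async one only enqueues
[sync_subscriber, async_subscriber].each { |sub| sub.call("profile_created") }
p calls # only the sync subscriber has run so far

# later, a worker drains the queue
until queue.empty?
  subscriber, event = queue.shift
  subscriber.call(event)
end
p calls # now both subscribers have run
```

One consequence of this design is that an exception in a sync subscriber surfaces in the code that publishes the event, whereas a failing async subscriber only affects its own job.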

NOTE: an event handler must be a callable object.

A subscriber can be any callable Ruby object that accepts a single argument (the event). We suggest putting subscribers under app/subscribers/on_<event_type>/<subscriber.rb> (e.g. app/subscribers/on_profile_created/create_chat_user.rb).
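
The naming convention is what makes it possible to omit the `to:` option. A minimal sketch of the inference, working on plain strings, is shown below; `event_name_for` is a hypothetical helper written for illustration and is not the engine's own method.

```ruby
# Hypothetical helper mirroring the naming convention: drop the subscriber's
# own name, then strip the "On" prefix from the enclosing namespace.
def event_name_for(subscriber_name)
  parts = subscriber_name.split("::")
  parts.pop                             # the last part is the subscriber itself
  return nil if parts.empty?            # no enclosing On<Event> namespace
  parts[-1] = parts[-1].sub(/\AOn/, "") # "OnProfileCreated" -> "ProfileCreated"
  parts.join("::")
end

puts event_name_for("OnProfileCreated::DoThat")
# ProfileCreated
puts event_name_for("ConnectBy::OnProfileCreated::CreateChatUser")
# ConnectBy::ProfileCreated
```

A subscriber that lives outside an `On<Event>` namespace yields no event name here, which corresponds to the case where `to:` has to be given explicitly.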

Testing

You can test subscribers as normal Ruby objects.

To test that a given subscriber exists, you can do the following:

# for asynchronous subscriptions
it "is subscribed to some event" do
  event = MyEvent.new(some: "data")
  expect { Railsy::Events.publish event }.
    to have_enqueued_async_subscriber_for(MySubscriberService).
    with(event)
end

# for synchronous subscriptions
it "is subscribed to some event" do
  allow(MySubscriberService).to receive(:call)

  event = MyEvent.new(some: "data")

  Railsy::Events.publish event

  expect(MySubscriberService).to have_received(:call).with(event)
end

To test publishing, use the have_published_event matcher:

expect { subject }.to have_published_event(ProfileCreated).with(user_id: user.id)

NOTE: have_published_event only supports block expectations.

NOTE 2: the with modifier works like the have_attributes matcher (not contain_exactly); you can only specify serializable attributes in with (i.e. sync attributes are not supported, because they are not persisted).
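
The difference between the two matching styles can be shown on plain hashes. The two predicates below are hypothetical and written only to illustrate subset matching versus exact matching; they are not the matcher's implementation.

```ruby
# Hypothetical predicates illustrating the two matching styles.

# Subset matching: every expected attribute must be present with that value.
def subset_match?(actual, expected)
  expected.all? { |key, value| actual.key?(key) && actual[key] == value }
end

# Exact matching: the attribute sets must be identical.
def exact_match?(actual, expected)
  actual == expected
end

published = { user_id: 1, plan: "pro" }

p subset_match?(published, { user_id: 1 }) # true
p exact_match?(published, { user_id: 1 })  # false
```

With subset semantics, a spec only needs to list the attributes it cares about, so adding a new attribute to an event does not break existing expectations.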

# frozen_string_literal: true

require "json"

module Railsy
  module Events
    using(Module.new do
      refine Hash do
        def symbolize_keys
          RubyEventStore::TransformKeys.symbolize(self)
        end
      end
    end)

    # Custom mapper for RES events.
    #
    # See https://github.com/RailsEventStore/rails_event_store/blob/v0.35.0/ruby_event_store/lib/ruby_event_store/mappers/default.rb
    class Mapper
      def initialize(serializer: JSON, mapping: {})
        @serializer = serializer
        @mapping = mapping
      end

      def register(type, class_name)
        mapping[type] = class_name
      end

      def register_event(event_class)
        register event_class.identifier, event_class.name
      end

      def event_to_serialized_record(domain_event)
        # lazily add type to mapping
        # NOTE: use class name instead of a class to handle code reload
        # in development (to avoid accessing orphaned classes)
        mapping[domain_event.type] ||= domain_event.class.name

        RubyEventStore::SerializedRecord.new(
          event_id: domain_event.event_id,
          metadata: serializer.dump(domain_event.metadata.to_h),
          data: serializer.dump(domain_event.data),
          event_type: domain_event.type
        )
      end

      def serialized_record_to_event(record)
        event_class = mapping.fetch(record.event_type) do
          raise "Don't know how to deserialize event: \"#{record.event_type}\". " \
                "Add explicit mapping: Railsy::Events.mapper.register \"#{record.event_type}\", \"<Class Name>\""
        end

        Object.const_get(event_class).new(
          **serializer.load(record.data).symbolize_keys,
          metadata: serializer.load(record.metadata).symbolize_keys,
          event_id: record.event_id
        )
      end

      private

      attr_reader :serializer, :mapping
    end
  end
end

# frozen_string_literal: true

# a ton of "require"-s

module Railsy
  module Events
    class << self
      # Underlying RailsEventStore
      attr_accessor :event_store

      def mapper
        # We need access to the global mapper to register type mappings
        event_store.send(:mapper)
      end

      def config
        @config ||= Config.new
      end

      def subscribe(subscriber = nil, to: nil, sync: false, &block)
        # Capture the block explicitly: a bare `Proc.new` no longer picks up
        # the method's block on Ruby 3.0+
        subscriber ||= block
        raise ArgumentError, "Subscriber or block is required" if subscriber.nil?

        to ||= infer_event_from_subscriber(subscriber) if subscriber.is_a?(Module)

        if to.nil?
          raise ArgumentError, "Couldn't infer event from subscriber. " \
                               "Please, specify event using `to:` option"
        end

        identifier =
          if to.is_a?(Class) && Railsy::Events::Event >= to
            # register event
            mapper.register_event to

            to.identifier
          else
            to
          end

        # async subscribers are wrapped into a background job
        subscriber = SubscriberJob.from(subscriber) if sync == false

        event_store.subscribe subscriber, to: [identifier]
      end

      def publish(event, **options)
        event_store.publish event, **options
      end

      private

      def infer_event_from_subscriber(subscriber)
        event_class_name = subscriber.name.split("::").yield_self do |parts|
          # handle explicit top-level name, e.g. ::Some::Event
          parts.shift if parts.first.empty?
          # drop last part – it's a unique subscriber name
          parts.pop

          parts.last.sub!(/^On/, "")

          parts.join("::")
        end

        event_class_name.safe_constantize
      end
    end
  end
end