Skip to content

Instantly share code, notes, and snippets.

@pcreux
Last active March 22, 2021 18:36
Show Gist options
  • Star 84 You must be signed in to star a gist
  • Fork 18 You must be signed in to fork a gist
  • Save pcreux/d094affd957a336af4f59b85f6ec0e6d to your computer and use it in GitHub Desktop.
Save pcreux/d094affd957a336af4f59b85f6ec0e6d to your computer and use it in GitHub Desktop.
# frozen_string_literal: true
module Drip
# This is the BaseEvent class that all Events inherit from.
# It takes care of serializing `data` and `metadata` via json
# It defines setters and accessors for the defined `data_attributes`
# After create, it calls `apply` to apply changes.
#
# Subclasses must define the `apply` method.
class Events::BaseEvent < ActiveRecord::Base
serialize :data, JSON
serialize :metadata, JSON
before_create :apply_and_persist
after_create :dispatch
self.abstract_class = true
# Not using `created_at` as MySQL timestamps don't include ms.
scope :recent_first, -> { reorder('id DESC')}
# Apply the event to the aggregate passed in.
# Must return the aggregate.
def apply(aggregate)
raise NotImplementedError
end
after_initialize do
self.data ||= {}
self.metadata ||= {}
end
# Define attributes to be serialize in the `data` column.
# It generates setters and getters for those.
#
# Example:
#
# class MyEvent < Events::BaseEvent
# belongs_to :post
#
# data_attributes :title, :description
# end
#
# MyEvent.create!(post: post, title: "Hello", description: "", metadata: {})
def self.data_attributes(*attrs)
@data_attributes ||= []
attrs.map(&:to_s).each do |attr|
@data_attributes << attr unless @data_attributes.include?(attr)
define_method attr do
self.data ||= {}
self.data[attr]
end
define_method "#{attr}=" do |arg|
self.data ||= {}
self.data[attr] = arg
end
end
@data_attributes
end
def aggregate=(model)
public_send "#{aggregate_name}=", model
end
# Return the aggregate that the event will apply to
def aggregate
public_send aggregate_name
end
def aggregate_id=(id)
public_send "#{aggregate_name}_id=", id
end
def aggregate_id
public_send "#{aggregate_name}_id"
end
def build_aggregate
public_send "build_#{aggregate_name}"
end
# Apply the transformation to the aggregate and save it.
private def apply_and_persist
# Build aggregate when the event is creating an aggregate
self.aggregate ||= build_aggregate
# Lock! (all good, we're in the ActiveRecord callback chain transaction)
aggregate.lock! if aggregate.persisted?
# Apply!
self.aggregate = apply(aggregate)
# Persist!
aggregate.save!
self.aggregate_id = aggregate.id if aggregate_id.nil?
end
def self.aggregate_name
inferred_aggregate = reflect_on_all_associations(:belongs_to).first
raise "Events must belong to an aggregate" if inferred_aggregate.nil?
inferred_aggregate.name
end
delegate :aggregate_name, to: :class
# Underscored class name by default. ex: "drop_post/updated"
# Used when sending events to the data pipeline
def self.event_name
self.name.sub("Drip::Events::", '').underscore
end
private def dispatch
Events::Dispatcher.dispatch(self)
end
end
end
# frozen_string_literal: true
module Drip
# The Base command mixin that commands include.
#
# A Command has the following public api.
#
# ```
# MyCommand.call(user: ..., post: ...) # shorthand to initialize, validate and execute the command
# command = MyCommand.new(user: ..., post: ...)
# command.valid? # true or false
# command.errors # +> <ActiveModel::Errors ... >
# command.call # validate and execute the command
# ```
#
# `call` will raise an `ActiveRecord::RecordInvalid` error if it fails validations.
#
# Commands including the `Command::Base` mixin must:
# * list the attributes the command takes
# * implement `build_event` which returns a non-persisted event or nil for noop.
#
# Ex:
#
# ```
# class MyCommand
# include Command
#
# attributes :user, :post
#
# def build_event
# Event.new(...)
# end
# end
# ```
module Commands
module Command
extend ActiveSupport::Concern
included do
include ActiveModel::Validations
end
class_methods do
# Run validations and persist the event.
#
# On success: returns the event
# On noop: returns nil
# On failure: raise an ActiveRecord::RecordInvalid error
def call(*args)
new(*args).call
end
# Define the attributes.
# They are set when initializing the command as keyword arguments and
# are all accessible as getter methods.
#
# ex: `attributes :post, :user, :ability`
def attributes(*args)
attr_reader(*args)
initialize_method_arguments = args.map { |arg| "#{arg}:" }.join(', ')
initialize_method_body = args.map { |arg| "@#{arg} = #{arg}" }.join(";")
class_eval <<~CODE
def initialize(#{initialize_method_arguments})
#{initialize_method_body}
after_initialize
end
CODE
end
end
def call
return nil if event.nil?
raise "The event should not be persisted at this stage!" if event.persisted?
validate!
execute!
event
end
def validate!
valid? || raise(ActiveRecord::RecordInvalid, self)
end
# A new record or nil if noop
def event
@event ||= build_event
end
# Save the event. Should not be overwritten by the command as side effects
# should be implemented via Reactors triggering other Events.
private def execute!
event.save!
end
# Returns a new event record or nil if noop
private def build_event
raise NotImplementedError
end
# Hook to set default values
private def after_initialize
# noop
end
end
end
end
# frozen_string_literal: true
module Drip
# Dispatcher implementation used by Drip::Events::Dispatcher.
class EventDispatcher
# Register Reactors to Events.
# * Reactors registered with `trigger` will be triggered synchronously
# * Reactors registered with `async` will be triggered asynchronously via a Sidekiq Job
#
# Example:
#
# on BaseEvent, trigger: LogEvent, async: TrackEvent
# on PledgeCancelled, PaymentFailed, async: [NotifyAdmin, CreateTask]
# on [PledgeCancelled, PaymentFailed], async: [NotifyAdmin, CreateTask]
#
def self.on(*events, trigger: [], async: [])
rules.register(events: events.flatten, sync: Array(trigger), async: Array(async))
end
# Dispatches events to matching Reactors once.
# Called by all events after they are created.
def self.dispatch(event)
reactors = rules.for(event)
reactors.sync.each { |reactor| reactor.call(event) }
reactors.async.each { |reactor| ReactorJob.create(event.class.name, event.id, reactor.to_s) }
end
def self.rules
@rules ||= RuleSet.new
end
class RuleSet
def initialize
@rules ||= Hash.new { |h, k| h[k] = ReactorSet.new }
end
# Register events with their sync and async Reactors
def register(events:, sync:, async:)
events.each do |event|
@rules[event].add_sync sync
@rules[event].add_async async
end
end
# Return a ReactorSet containing all Reactors matching an Event
def for(event)
reactors = ReactorSet.new
@rules.each do |event_class, rule|
# Match event by class including ancestors. e.g. All events match a role for BaseEvent.
if event.is_a?(event_class)
reactors.add_sync rule.sync
reactors.add_async rule.async
end
end
reactors
end
end
# Contains sync and async reactors. Used to:
# * store reactors via Rules#register
# * return a set of matching reactors with Rules#for
class ReactorSet
attr_reader :sync, :async
def initialize
@sync = Set.new
@async = Set.new
end
def add_sync(reactors)
@sync += reactors
end
def add_async(reactors)
@async += reactors
end
end
end
end
@mhoad
Copy link

mhoad commented May 14, 2018

I might really be pushing my luck here but is there any chance at all of getting a functional demo Rails app of even just creating / updating a single resource. I haven't had any luck as a more junior developer getting this to work in any kind of fashion after a couple of hours. It would genuinely mean a lot to me if at all possible! I really want to learn this better but have hit a huge roadblock.

@f-mer
Copy link

f-mer commented May 15, 2018

Just in case someone is looking for the related blog article: https://kickstarter.engineering/event-sourcing-made-simple-4a2625113224 😉

@brogier
Copy link

brogier commented May 20, 2018

Thanks for putting this online! I've been playing with ES for some time and really like the idea. I do have 2 questions though.

  1. At what point do you persist the data on the read model? Is it in the apply method or not even in the event at all?
  2. How do you handle events for creating entities eg orders when the ID of the order is not yet available?

@arathunku
Copy link

I've taken a stab at implementing this in Elixir if anyone's interested - https://github.com/arathunku/ecex. I've taken some shortcuts but it should do as a proof of concept.

@rogierverbrugge

Ad1.
aggregate is saved here: https://gist.github.com/pcreux/d094affd957a336af4f59b85f6ec0e6d#file-base_event-rb-L98 and event in https://gist.github.com/pcreux/d094affd957a336af4f59b85f6ec0e6d#file-command-rb-L95

Ad2. One of the ways to solve this is to generate UUID for the aggregate in the command. That's one of the ways to solve this, not sure how they do it at Drip :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment