-
-
Save pcreux/d094affd957a336af4f59b85f6ec0e6d to your computer and use it in GitHub Desktop.
# frozen_string_literal: true
module Drip
  # This is the BaseEvent class that all Events inherit from.
  # It takes care of serializing `data` and `metadata` via JSON.
  # It defines setters and accessors for the declared `data_attributes`.
  # Before the event row is created, it calls `apply` on the aggregate and
  # saves the aggregate; after the row is created, it dispatches the event.
  #
  # Subclasses must define the `apply` method and declare a `belongs_to`
  # association pointing at their aggregate.
  class Events::BaseEvent < ActiveRecord::Base
    serialize :data, JSON
    serialize :metadata, JSON

    before_create :apply_and_persist
    after_create :dispatch

    self.abstract_class = true

    # Not using `created_at` as MySQL timestamps don't include ms.
    scope :recent_first, -> { reorder('id DESC') }

    # Apply the event to the aggregate passed in.
    # Must return the aggregate.
    #
    # @param aggregate [Object] the record this event belongs to
    # @return [Object] the (modified) aggregate
    # @raise [NotImplementedError] unless overridden by the subclass
    def apply(aggregate)
      raise NotImplementedError, "#{self.class} must implement #apply"
    end

    # Make sure both serialized columns are always hashes, even on new records.
    after_initialize do
      self.data ||= {}
      self.metadata ||= {}
    end

    # Define attributes to be serialized in the `data` column.
    # It generates setters and getters for those.
    #
    # Example:
    #
    #   class MyEvent < Events::BaseEvent
    #     belongs_to :post
    #
    #     data_attributes :title, :description
    #   end
    #
    #   MyEvent.create!(post: post, title: "Hello", description: "", metadata: {})
    #
    # @param attrs [Array<Symbol, String>] attribute names to store in `data`
    # @return [Array<String>] every data attribute declared on this class so far
    def self.data_attributes(*attrs)
      # Class-level instance variable: the list is tracked per event class.
      @data_attributes ||= []

      attrs.map(&:to_s).each do |attr|
        @data_attributes << attr unless @data_attributes.include?(attr)

        define_method attr do
          self.data ||= {}
          self.data[attr]
        end

        define_method "#{attr}=" do |arg|
          self.data ||= {}
          self.data[attr] = arg
        end
      end

      @data_attributes
    end

    # Assign the aggregate through the inferred `belongs_to` association.
    def aggregate=(model)
      public_send "#{aggregate_name}=", model
    end

    # Return the aggregate that the event will apply to
    def aggregate
      public_send aggregate_name
    end

    # Assign the aggregate's foreign key (`<aggregate_name>_id`).
    def aggregate_id=(id)
      public_send "#{aggregate_name}_id=", id
    end

    # Read the aggregate's foreign key (`<aggregate_name>_id`).
    def aggregate_id
      public_send "#{aggregate_name}_id"
    end

    # Build a new aggregate through the association's `build_<aggregate_name>` method.
    def build_aggregate
      public_send "build_#{aggregate_name}"
    end

    # Apply the transformation to the aggregate and save it.
    private def apply_and_persist
      # Build aggregate when the event is creating an aggregate
      self.aggregate ||= build_aggregate

      # Lock! (all good, we're in the ActiveRecord callback chain transaction)
      aggregate.lock! if aggregate.persisted?

      # Apply!
      self.aggregate = apply(aggregate)

      # Persist!
      aggregate.save!
      self.aggregate_id = aggregate.id if aggregate_id.nil?
    end

    # Name of the aggregate association: the first `belongs_to` declared on the
    # event class.
    #
    # @return [Symbol] the association name
    # @raise [RuntimeError] when the class declares no `belongs_to` association
    def self.aggregate_name
      inferred_aggregate = reflect_on_all_associations(:belongs_to).first
      raise "Events must belong to an aggregate" if inferred_aggregate.nil?

      inferred_aggregate.name
    end

    delegate :aggregate_name, to: :class

    # Underscored class name without the "Drip::Events::" prefix by default,
    # e.g. Drip::Events::Post::Updated -> "post/updated".
    # Used when sending events to the data pipeline
    def self.event_name
      name.sub("Drip::Events::", '').underscore
    end

    # Hand the freshly created event over to the dispatcher.
    private def dispatch
      Events::Dispatcher.dispatch(self)
    end
  end
end
# frozen_string_literal: true
module Drip
  # The Base command mixin that commands include.
  #
  # A Command has the following public api.
  #
  # ```
  #   MyCommand.call(user: ..., post: ...) # shorthand to initialize, validate and execute the command
  #   command = MyCommand.new(user: ..., post: ...)
  #   command.valid? # true or false
  #   command.errors # => <ActiveModel::Errors ... >
  #   command.call # validate and execute the command
  # ```
  #
  # `call` will raise an `ActiveRecord::RecordInvalid` error if it fails validations.
  #
  # Commands including the `Command` mixin must:
  # * list the attributes the command takes
  # * implement `build_event` which returns a non-persisted event or nil for noop.
  #
  # Ex:
  #
  # ```
  #   class MyCommand
  #     include Command
  #
  #     attributes :user, :post
  #
  #     def build_event
  #       Event.new(...)
  #     end
  #   end
  # ```
  module Commands
    module Command
      extend ActiveSupport::Concern

      included do
        include ActiveModel::Validations
      end

      class_methods do
        # Run validations and persist the event.
        #
        # On success: returns the event
        # On noop: returns nil
        # On failure: raise an ActiveRecord::RecordInvalid error
        def call(*args, &block)
          new(*args, &block).call
        end
        # Keep keyword arguments flowing through the `*args` delegation on
        # Ruby 3+, where a trailing Hash is no longer implicitly converted back
        # into keywords. Guarded so older Rubies without `ruby2_keywords` still load.
        ruby2_keywords(:call) if respond_to?(:ruby2_keywords, true)

        # Define the attributes.
        # They are set when initializing the command as keyword arguments and
        # are all accessible as getter methods.
        #
        # ex: `attributes :post, :user, :ability`
        def attributes(*args)
          attr_reader(*args)

          initialize_method_arguments = args.map { |arg| "#{arg}:" }.join(', ')
          initialize_method_body = args.map { |arg| "@#{arg} = #{arg}" }.join(";")

          # Generates an `initialize` taking every attribute as a required
          # keyword; file/line are passed so backtraces point here.
          class_eval <<~CODE, __FILE__, __LINE__ + 1
            def initialize(#{initialize_method_arguments})
              #{initialize_method_body}
              after_initialize
            end
          CODE
        end
      end

      # Validate the command and save its event.
      #
      # @return [Object, nil] the saved event, or nil when `build_event` returned nil (noop)
      # @raise [RuntimeError] when the built event is already persisted
      # @raise [ActiveRecord::RecordInvalid] when the command is invalid
      def call
        return if event.nil?
        raise "The event should not be persisted at this stage!" if event.persisted?

        validate!
        execute!

        event
      end

      # Overrides the validation bang method so that a failing command raises
      # ActiveRecord::RecordInvalid.
      def validate!
        valid? || raise(ActiveRecord::RecordInvalid, self)
      end

      # A new record or nil if noop
      def event
        @event ||= build_event
      end

      # Save the event. Should not be overwritten by the command as side effects
      # should be implemented via Reactors triggering other Events.
      private def execute!
        event.save!
      end

      # Returns a new event record or nil if noop
      private def build_event
        raise NotImplementedError, "#{self.class} must implement #build_event"
      end

      # Hook to set default values
      private def after_initialize
        # noop
      end
    end
  end
end
# frozen_string_literal: true
require 'set'

module Drip
  # Dispatcher implementation used by Drip::Events::Dispatcher.
  class EventDispatcher
    # Register Reactors to Events.
    # * Reactors registered with `trigger` will be triggered synchronously
    # * Reactors registered with `async` will be triggered asynchronously via a Sidekiq Job
    #
    # Example:
    #
    #   on BaseEvent, trigger: LogEvent, async: TrackEvent
    #   on PledgeCancelled, PaymentFailed, async: [NotifyAdmin, CreateTask]
    #   on [PledgeCancelled, PaymentFailed], async: [NotifyAdmin, CreateTask]
    #
    # @param events [Array<Class>] event classes, given as a list or as a single array
    # @param trigger [Object, Array] reactor(s) invoked with `call(event)` during dispatch
    # @param async [Object, Array] reactor(s) handed to `ReactorJob.create` during dispatch
    def self.on(*events, trigger: [], async: [])
      rules.register(events: events.flatten, sync: Array(trigger), async: Array(async))
    end

    # Dispatches events to matching Reactors once.
    # Called by all events after they are created.
    #
    # @param event [Object] must respond to `id`; matched against registered classes via `is_a?`
    def self.dispatch(event)
      reactors = rules.for(event)
      reactors.sync.each { |reactor| reactor.call(event) }
      reactors.async.each { |reactor| ReactorJob.create(event.class.name, event.id, reactor.to_s) }
    end

    # The rule set of this dispatcher class. Stored in a class-level instance
    # variable, so every dispatcher class keeps its own rules.
    def self.rules
      @rules ||= RuleSet.new
    end

    # Maps event classes to the reactors registered for them.
    class RuleSet
      def initialize
        # Unknown event classes lazily get an empty ReactorSet.
        @rules = Hash.new { |hash, event_class| hash[event_class] = ReactorSet.new }
      end

      # Register events with their sync and async Reactors
      def register(events:, sync:, async:)
        events.each do |event|
          reactor_set = @rules[event]
          reactor_set.add_sync(sync)
          reactor_set.add_async(async)
        end
      end

      # Return a ReactorSet containing all Reactors matching an Event
      def for(event)
        @rules.each_with_object(ReactorSet.new) do |(event_class, rule), matching|
          # Match event by class including ancestors. e.g. All events match a rule for BaseEvent.
          next unless event.is_a?(event_class)

          matching.add_sync(rule.sync)
          matching.add_async(rule.async)
        end
      end
    end

    # Contains sync and async reactors. Used to:
    # * store reactors via RuleSet#register
    # * return a set of matching reactors with RuleSet#for
    class ReactorSet
      attr_reader :sync, :async

      def initialize
        # Sets, so a reactor registered several times is only kept once.
        @sync = Set.new
        @async = Set.new
      end

      # Add reactors (any enumerable) to the synchronous set.
      def add_sync(reactors)
        @sync.merge(reactors)
      end

      # Add reactors (any enumerable) to the asynchronous set.
      def add_async(reactors)
        @async.merge(reactors)
      end
    end
  end
end
Just in case someone is looking for the related blog article: https://kickstarter.engineering/event-sourcing-made-simple-4a2625113224 😉
Thanks for putting this online! I've been playing with ES for some time and really like the idea. I do have 2 questions though.
- At what point do you persist the data on the read model? Is it in the apply method or not even in the event at all?
- How do you handle events for creating entities eg orders when the ID of the order is not yet available?
I've taken a stab at implementing this in Elixir if anyone's interested - https://github.com/arathunku/ecex. I've taken some shortcuts but it should do as a proof of concept.
@rogierverbrugge
Ad1.
aggregate is saved here: https://gist.github.com/pcreux/d094affd957a336af4f59b85f6ec0e6d#file-base_event-rb-L98 and event in https://gist.github.com/pcreux/d094affd957a336af4f59b85f6ec0e6d#file-command-rb-L95
Ad2. One way to solve this is to generate a UUID for the aggregate in the command. I'm not sure how they do it at Drip, though :)
I might really be pushing my luck here but is there any chance at all of getting a functional demo Rails app of even just creating / updating a single resource. I haven't had any luck as a more junior developer getting this to work in any kind of fashion after a couple of hours. It would genuinely mean a lot to me if at all possible! I really want to learn this better but have hit a huge roadblock.