Instantly share code, notes, and snippets.

Embed
What would you like to do?
An introduction to event sourcing, London Ruby User Group, 9th May 2018
module Commands
  module Meeting
    # Input contract for accepting a meeting: a user id must be supplied and
    # the status handed in by the caller must be exactly :scheduled.
    AcceptSchema = Dry::Validation.Schema do
      required(:user_id).filled
      required(:status).value(eql?: :scheduled)
    end

    # Command that records a user's acceptance of a meeting.
    class Accept < Command
      # Validates the params, publishes an 'accepted' event for the entity and
      # applies that event to the entity.
      #
      # @return [Response] the failed validation response when the params are
      #   invalid, otherwise a successful response
      def call
        outcome = validate
        return outcome if outcome.failure?

        accepted = publish_event(
          event_type: 'accepted',
          entity_type: 'Meeting',
          entity_id: @entity.id,
          entity_version: @entity.version,
          data: { user_id: @params[:user_id] }
        )
        # Apply the new event right away so the in-memory entity reflects it.
        @entity.handle(accepted)

        Response.new(success: true)
      end
    end
  end
end
module Commands
  # Base class for command objects. A subclass supplies an instance-level
  # +call+ and has a matching "<ClassName>Schema" constant that is used to
  # check the params.
  class Command
    # Convenience entry point: builds the command and runs it.
    #
    # @param entity [Object] the entity the command operates on
    # @param params [Hash] the command input
    # @return whatever the subclass's +call+ returns
    def self.call(entity, params)
      command = new(entity, params)
      command.call
    end

    # Stores the entity and params and runs the schema against the params
    # straight away; the result is kept for #validate.
    def initialize(entity, params)
      @entity = entity
      @params = params
      @validator = schema.call(params)
    end

    # Hands the event attributes to Event.create_and_broadcast and returns
    # its result.
    def publish_event(params)
      Event.create_and_broadcast(params)
    end

    # Wraps the stored schema result in a Response.
    #
    # @return [Response] successful when the schema passed, otherwise a failed
    #   response carrying the schema errors
    def validate
      return Response.new(success: true) if @validator.success?

      Response.new(success: false, errors: @validator.errors)
    end

    # Looks up the schema constant named after the concrete command class,
    # e.g. the class name followed by "Schema".
    def schema
      schema_name = "#{self.class.name}Schema"
      schema_name.constantize
    end
  end
end
# Event store table: one row per published event.
create_table 'events', id: :serial do |t|
  t.uuid 'entity_id', null: false, index: true
  t.string 'entity_type', null: false
  t.string 'event_type', null: false
  # `t.integer` is the Rails column helper; `t.int` is not defined.
  t.integer 'entity_version', null: false
  t.jsonb 'meta', null: false
  t.jsonb 'data', null: false
  t.datetime 'created_at', null: false
  # At most one event may exist per (entity_id, entity_version) pair.
  t.index ['entity_id', 'entity_version'], name: 'event_version_constraint', unique: true
end
module EventSourced
  # In-memory meeting whose state is built by applying events one at a time.
  # Public action methods (schedule, accept, ...) delegate to command objects;
  # the *_handler methods apply a single event to the instance state.
  class Meeting
    attr_reader :id, :version, :status, :attendees_count, :time

    # @param id [String] identifier of the meeting entity
    def initialize(id)
      @id = id
      @attendees_count = 0
      @version = 0
    end

    # Runs the Schedule command for this meeting and returns its result.
    def schedule(time:, organizer_id:, invitee_ids:)
      Commands::Meeting::Schedule.call(self, time: time, organizer_id: organizer_id, invitee_ids: invitee_ids)
    end

    # Applies a 'scheduled' event: records the time and marks the meeting as
    # scheduled. The status matters because accept passes @status to the
    # Accept command, whose schema requires it to be :scheduled.
    def scheduled_handler(event)
      params = event.data.symbolize_keys
      @time = params[:time]
      @status = :scheduled
    end

    # Runs the Accept command for this meeting and returns its result.
    def accept(user_id:)
      Commands::Meeting::Accept.call(self, user_id: user_id, status: @status)
    end

    # Applies an 'accepted' event by counting one more attendee.
    def accepted_handler(_event)
      @attendees_count += 1
    end

    # Runs the Reschedule command for this meeting and returns its result.
    def reschedule(new_time:)
      Commands::Meeting::Reschedule.call(self, new_time: new_time, status: @status)
    end

    # Applies a 'rescheduled' event by replacing the meeting time.
    def rescheduled_handler(event)
      params = event.data.symbolize_keys
      @time = params[:new_time]
    end

    # Runs the Finish command for this meeting and returns its result.
    def finish
      Commands::Meeting::Finish.call(self, status: @status)
    end

    # Applies a 'finished' event.
    def finished_handler(_event)
      @status = :finished
    end

    # Runs the Cancel command for this meeting and returns its result.
    def cancel
      Commands::Meeting::Cancel.call(self, status: @status)
    end

    # Applies a 'cancelled' event.
    def cancelled_handler(_event)
      @status = :cancelled
    end

    # Dispatches the event to "<event_type>_handler" when this object has
    # such a public method, then bumps the version. Events without a matching
    # handler are ignored and leave the version untouched.
    #
    # @return [Integer, nil] the new version, or nil when nothing was applied
    def handle(event)
      handler = "#{event.event_type}_handler".to_sym
      return unless respond_to?(handler)

      public_send(handler, event)
      @version += 1
    end

    class << self
      # Rebuilds a meeting by applying every stored event for +id+.
      def get(id)
        entity = new(id)
        load_events_for(entity)
        entity
      end

      # Creates a meeting with a fresh UUID and runs the Schedule command on
      # it; returns the command's result.
      def schedule(time:, organizer_id:, invitee_ids:)
        new(SecureRandom.uuid).schedule(
          time: time,
          organizer_id: organizer_id,
          invitee_ids: invitee_ids
        )
      end

      # Applies the entity's stored events in version order. The entity_type
      # is 'Meeting' because that is the value the commands publish, and the
      # explicit ordering makes the replay order deterministic.
      def load_events_for(entity)
        Event
          .where(entity_type: 'Meeting', entity_id: entity.id)
          .order(:entity_version)
          .each { |event| entity.handle(event) }
      end
    end
  end
end
# Database-backed view of a meeting, kept up to date by class-level event
# handlers.
class Meeting < ApplicationRecord
  # NOTE(review): scheduled_handler writes an organizer_id attribute on the
  # meeting row, which usually pairs with belongs_to rather than has_one.
  # Left unchanged; confirm against the schema.
  has_one :organizer
  has_many :invitees

  class << self
    # Creates the row for a newly scheduled meeting from the event payload.
    # Raises KeyError when :time or :organizer_id is missing from the data.
    def scheduled_handler(event)
      params = event.data.symbolize_keys
      create!(
        entity_id: event.entity_id,
        time: params.fetch(:time),
        organizer_id: params.fetch(:organizer_id),
        created_at: event.created_at
      )
    end

    def accepted_handler(_event)
      # no-op
    end

    # Stores the new time carried by a 'rescheduled' event.
    def rescheduled_handler(event)
      params = event.data.symbolize_keys
      for_event(event).update!(time: params.fetch(:new_time))
    end
    # The handler was originally published under this misspelled name; keep
    # it callable for any existing caller.
    alias_method :recheduled_handler, :rescheduled_handler

    # Marks the meeting as finished.
    def finished_handler(event)
      for_event(event).update!(status: :finished)
    end

    # Marks the meeting as cancelled.
    def cancelled_handler(event)
      for_event(event).update!(status: :cancelled)
    end

    private

    # Finds the row by the entity_id column, which is the column
    # scheduled_handler fills from the event. Raises when no row exists.
    def for_event(event)
      find_by!(entity_id: event.entity_id)
    end
  end
end
# HTTP endpoints for scheduling and accepting meetings through the
# event-sourced Meeting entity.
class MeetingsController < ApplicationController
  def new
    @meeting_form = MeetingForm.new
  end

  # Schedules a meeting from the submitted form. On success the current user
  # accepts the new meeting and is redirected to it; on failure the form is
  # re-rendered with the command's errors.
  def create
    attrs = schedule_meeting_params
    # Pass explicit keywords: the permitted-params object is not a Hash and
    # cannot be splatted into keyword arguments. Missing fields arrive as nil
    # so the command's schema reports them.
    # `result` avoids shadowing the controller's own `response`.
    result = EventSourced::Meeting.schedule(
      time: attrs[:time],
      organizer_id: attrs[:organizer_id],
      invitee_ids: attrs[:invitee_ids]
    )

    if result.success?
      @meeting = result.data[:new_meeting]
      @meeting.accept(user_id: current_user.id)
      # The entity is not an ActiveRecord model, so route by its id, as the
      # accept action does.
      redirect_to meeting_path(@meeting.id)
    else
      @meeting_form = MeetingForm.new
      result.add_errors_to(active_record: @meeting_form)
      render 'new'
    end
  end

  # Accepts a meeting on behalf of the submitted user and redirects back to
  # the meeting, flashing an alert when the command fails.
  def accept
    meeting_id = accept_meeting_params[:meeting_id]
    # NOTE(review): user_id comes from the request rather than current_user,
    # so a caller can accept for another user. Confirm this is intended.
    result = EventSourced::Meeting
             .get(meeting_id)
             .accept(user_id: accept_meeting_params[:user_id])
    flash[:alert] = "Failed to accept the meeting" if result.failure?
    redirect_to meeting_path(meeting_id)
  end

  private

  # invitee_ids is declared as an array; a bare :invitee_ids only permits a
  # scalar value and would drop a submitted list.
  def schedule_meeting_params
    params.require(:meeting_form).permit(:time, :organizer_id, invitee_ids: [])
  end

  def accept_meeting_params
    params.require(:accept_form).permit(:meeting_id, :user_id)
  end
end
module Commands
  module Meeting
    # Input contract for scheduling a meeting: the time must be more than five
    # minutes after the moment of validation, an organizer must be given, and
    # invitee_ids must be a filled list of integers.
    ScheduleSchema = Dry::Validation.Schema do
      configure do
        # Failure message for the custom predicate below.
        def self.messages
          super.merge(en: { errors: { beyond_minimum_notice?: 'must be more than 5 minutes from now' } })
        end

        # Compares against the clock on every validation. Passing
        # `gt?: Time.now + 5.minutes` directly to the schema would evaluate
        # Time.now only once, when this constant is defined.
        def beyond_minimum_notice?(value)
          value > Time.now + 5.minutes
        end
      end

      required(:time).value(:beyond_minimum_notice?)
      required(:organizer_id).filled
      required(:invitee_ids).filled { each(:int?) }
    end

    # Command that schedules a meeting.
    class Schedule < Command
      # Validates the params, publishes a 'scheduled' event for the entity and
      # applies that event to the entity.
      #
      # @return [Response] the failed validation response when the params are
      #   invalid, otherwise a successful response whose data holds the
      #   entity under :new_meeting
      def call
        outcome = validate
        return outcome if outcome.failure?

        event = publish_event(
          event_type: 'scheduled',
          entity_type: 'Meeting',
          entity_id: @entity.id,
          entity_version: @entity.version,
          data: {
            time: @params[:time],
            organizer_id: @params[:organizer_id],
            invitee_ids: @params[:invitee_ids]
          }
        )
        # Apply the new event right away so the in-memory entity reflects it.
        @entity.handle(event)

        Response.new success: true, data: { new_meeting: @entity }
      end
    end
  end
end
# LocalPublisher being a Wisper publisher https://github.com/krisleech/wisper
# Each meeting event :meeting_<type> is routed to Meeting.<type>_handler.
%w[scheduled accepted rescheduled finished cancelled].each do |event_type|
  LocalPublisher.subscribe Meeting, on: :"meeting_#{event_type}", with: :"#{event_type}_handler"
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment