Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
An introduction to event sourcing, London Ruby User Group, 9th May 2018
module Commands
  module Meeting
    # Rules for accepting a meeting: a user id must be present and the status
    # handed to the command must be exactly :scheduled.
    AcceptSchema = Dry::Validation.Schema do
      required(:user_id).filled
      required(:status).value(eql?: :scheduled)
    end

    # Records that a user accepted a meeting.
    class Accept < Command
      # Validates the params, publishes an 'accepted' event for the entity and
      # then applies that event to the in-memory entity.
      #
      # @return [Response] the failed validation response when the params are
      #   invalid, otherwise a successful response
      def call
        validation = validate
        return validation if validation.failure?

        accepted = publish_event(accepted_event_attributes)
        @entity.handle(accepted)

        Response.new(success: true)
      end

      private

      # Attributes of the 'accepted' event, stamped with the entity's current
      # id and version.
      def accepted_event_attributes
        {
          event_type: 'accepted',
          entity_type: 'Meeting',
          entity_id: @entity.id,
          entity_version: @entity.version,
          data: { user_id: @params[:user_id] }
        }
      end
    end
  end
end
module Commands
  # Base class for commands. A subclass supplies `#call`; this class provides
  # construction, schema lookup, validation and event publishing.
  class Command
    class << self
      # Builds the command for +entity+ and +params+ and runs it right away.
      #
      # @return whatever the subclass's `#call` returns
      def call(entity, params)
        new(entity, params).call
      end
    end

    # Stores the entity and params and runs the schema against the params
    # immediately, keeping the result for `#validate`.
    def initialize(entity, params)
      @entity = entity
      @params = params
      @validator = schema.call(params)
    end

    # Hands the event attributes to Event.create_and_broadcast and returns its
    # result.
    def publish_event(params)
      Event.create_and_broadcast(params)
    end

    # Wraps the stored schema result in a Response.
    #
    # @return [Response] successful, or failed with the schema's errors
    def validate
      return Response.new(success: true) if @validator.success?

      Response.new(success: false, errors: @validator.errors)
    end

    # Looks up the schema constant named after the command class with a
    # "Schema" suffix (e.g. Foo::Bar -> Foo::BarSchema).
    def schema
      schema_name = "#{self.class.name}Schema"
      schema_name.constantize
    end
  end
end
# Table holding one row per published event.
create_table 'events', id: :serial do |t|
  t.uuid 'entity_id', null: false, index: true
  t.string 'entity_type', null: false
  t.string 'event_type', null: false
  # The integer column helper is `t.integer`; `t.int` is not defined on the
  # table definition.
  t.integer 'entity_version', null: false
  t.jsonb 'meta', null: false
  t.jsonb 'data', null: false
  t.datetime 'created_at', null: false
  # Unique per (entity_id, entity_version): a second event claiming the same
  # version of the same entity is rejected by the database.
  t.index ['entity_id', 'entity_version'], name: 'event_version_constraint', unique: true
end
module EventSourced
  # A meeting whose state (time, status, attendee count, version) is built up
  # by applying events to it one at a time through `#handle`.
  #
  # The public action methods (`schedule`, `accept`, ...) delegate to command
  # objects and return whatever those commands return; the `*_handler` methods
  # mutate the in-memory state in response to an event.
  class Meeting
    attr_reader :id, :version, :status, :attendees_count, :time

    # @param id the entity id of this meeting
    def initialize(id)
      @id = id
      @attendees_count = 0
      @version = 0
    end

    # Runs the Schedule command for this meeting.
    def schedule(time:, organizer_id:, invitee_ids:)
      Commands::Meeting::Schedule.call(self, time: time, organizer_id: organizer_id, invitee_ids: invitee_ids)
    end

    # Applies a 'scheduled' event: records the time and marks the meeting as
    # scheduled, so that commands checking for the :scheduled status can pass.
    def scheduled_handler(event)
      params = event.data.symbolize_keys
      @time = params[:time]
      @status = :scheduled
    end

    # Runs the Accept command for this meeting, passing the current status
    # along for validation.
    def accept(user_id:)
      Commands::Meeting::Accept.call(self, user_id: user_id, status: @status)
    end

    # Applies an 'accepted' event: one more attendee.
    def accepted_handler(_event)
      @attendees_count += 1
    end

    # Runs the Reschedule command for this meeting.
    def reschedule(new_time:)
      Commands::Meeting::Reschedule.call(self, new_time: new_time, status: @status)
    end

    # Applies a 'rescheduled' event: replaces the meeting time.
    def rescheduled_handler(event)
      params = event.data.symbolize_keys
      @time = params[:new_time]
    end

    # Runs the Finish command for this meeting.
    def finish
      Commands::Meeting::Finish.call(self, status: @status)
    end

    # Applies a 'finished' event.
    def finished_handler(_event)
      @status = :finished
    end

    # Runs the Cancel command for this meeting.
    def cancel
      Commands::Meeting::Cancel.call(self, status: @status)
    end

    # Applies a 'cancelled' event.
    def cancelled_handler(_event)
      @status = :cancelled
    end

    # Dispatches +event+ to the "<event_type>_handler" method on this meeting
    # and bumps the version. Events with no matching handler are ignored and
    # leave the version untouched.
    #
    # @return [Integer, nil] the new version, or nil when the event was ignored
    def handle(event)
      handler = "#{event.event_type}_handler".to_sym
      return unless respond_to?(handler)

      public_send(handler, event)
      @version += 1
    end

    class << self
      # Rebuilds the meeting with the given id by replaying its stored events.
      def get(id)
        entity = new(id)
        load_events_for(entity)
        entity
      end

      # Creates a meeting with a fresh UUID and runs the Schedule command on it.
      #
      # @return the Schedule command's result
      def schedule(time:, organizer_id:, invitee_ids:)
        new(SecureRandom.uuid).schedule(
          time: time,
          organizer_id: organizer_id,
          invitee_ids: invitee_ids
        )
      end

      # Replays every stored event of +entity+ onto it.
      #
      # The entity type must match the 'Meeting' string the commands publish,
      # and events are replayed in version order so state is rebuilt in the
      # order it was produced.
      def load_events_for(entity)
        events = Event.where(entity_type: 'Meeting', entity_id: entity.id).order(:entity_version)
        events.each { |event| entity.handle(event) }
      end
    end
  end
end
# Database-backed view of a meeting, kept up to date by the class-level
# event handlers below.
class Meeting < ApplicationRecord
  # NOTE(review): rows are created with an `organizer_id` attribute, which
  # would normally pair with `belongs_to :organizer` - confirm the association.
  has_one :organizer
  has_many :invitees

  class << self
    # Creates the row for a newly scheduled meeting.
    #
    # @raise [KeyError] when the event data lacks :time or :organizer_id
    def scheduled_handler(event)
      params = event.data.symbolize_keys
      create!(
        entity_id: event.entity_id,
        time: params.fetch(:time),
        organizer_id: params.fetch(:organizer_id),
        created_at: event.created_at
      )
    end

    # Accepting a meeting changes nothing in this table.
    def accepted_handler(_event)
      # no-op
    end

    # Stores the new time of a rescheduled meeting.
    #
    # @raise [KeyError] when the event data lacks :new_time
    def rescheduled_handler(event)
      params = event.data.symbolize_keys
      record_for(event).update!(time: params.fetch(:new_time))
    end
    # Kept so callers of the original, misspelled name keep working.
    alias recheduled_handler rescheduled_handler

    # Marks the meeting as finished.
    def finished_handler(event)
      record_for(event).update!(status: :finished)
    end

    # Marks the meeting as cancelled.
    def cancelled_handler(event)
      record_for(event).update!(status: :cancelled)
    end

    private

    # Finds the row by the `entity_id` column, the same column
    # `scheduled_handler` writes the event's entity id into.
    #
    # @raise [ActiveRecord::RecordNotFound] when no row matches
    def record_for(event)
      find_by!(entity_id: event.entity_id)
    end
  end
end
# HTTP entry points for scheduling and accepting meetings.
class MeetingsController < ApplicationController
  # Renders an empty scheduling form.
  def new
    @meeting_form = MeetingForm.new
  end

  # Schedules a meeting from the submitted form. On success the current user
  # accepts it and is redirected to the meeting; on failure the form is
  # re-rendered with the command's errors attached.
  def create
    # Named `result` so it does not shadow the controller's own `response`.
    result = EventSourced::Meeting.schedule(**schedule_meeting_params)
    if result.success?
      @meeting = result.data[:new_meeting]
      @meeting.accept(user_id: current_user.id)
      # @meeting is a plain Ruby object, so pass its id to the route helper.
      redirect_to meeting_path(@meeting.id)
    else
      @meeting_form = MeetingForm.new
      result.add_errors_to(active_record: @meeting_form)
      render 'new'
    end
  end

  # Accepts a meeting on behalf of the submitted user id and redirects back to
  # the meeting, flashing an alert when the command fails.
  def accept
    meeting_id = accept_meeting_params[:meeting_id]
    # NOTE(review): user_id comes from the request, unlike `create`, which
    # uses current_user - confirm that accepting for another user is intended.
    result = EventSourced::Meeting
             .get(meeting_id)
             .accept(user_id: accept_meeting_params[:user_id])
    flash[:alert] = "Failed to accept the meeting" if result.failure?
    redirect_to meeting_path(meeting_id)
  end

  private

  # Permitted scheduling params as a symbol-keyed hash matching the keyword
  # arguments of EventSourced::Meeting.schedule. Missing values come through
  # as nil so the command's validation reports them.
  #
  # NOTE(review): values arrive as submitted (typically strings); confirm
  # whether :time and :invitee_ids need coercion before validation.
  def schedule_meeting_params
    # `invitee_ids: []` is required for an array of ids to be permitted.
    permitted = params.require(:meeting_form).permit(:time, :organizer_id, invitee_ids: [])
    {
      time: permitted[:time],
      organizer_id: permitted[:organizer_id],
      invitee_ids: permitted[:invitee_ids]
    }
  end

  # Permitted params of the accept form.
  def accept_meeting_params
    params.require(:accept_form).permit(:meeting_id, :user_id)
  end
end
module Commands
  module Meeting
    # Rules for scheduling a meeting: a time more than five minutes in the
    # future, an organizer, and a non-empty list of integer invitee ids.
    ScheduleSchema = Dry::Validation.Schema do
      configure do
        # Message for the custom predicate defined below.
        def self.messages
          super.merge(
            en: { errors: { more_than_five_minutes_ahead?: 'must be more than 5 minutes from now' } }
          )
        end

        # Evaluated on every validation, so the cutoff follows the clock. A
        # literal `gt?: Time.now + 5.minutes` is computed only once, when the
        # schema is defined.
        def more_than_five_minutes_ahead?(value)
          value > Time.now + 5.minutes
        end
      end

      # `filled` runs first, so a missing time is reported as a validation
      # error instead of reaching the comparison.
      required(:time).filled(:more_than_five_minutes_ahead?)
      required(:organizer_id).filled
      required(:invitee_ids).filled { each(:int?) }
    end

    # Schedules a new meeting.
    class Schedule < Command
      # Validates the params, publishes a 'scheduled' event for the entity and
      # applies that event to the in-memory entity.
      #
      # @return [Response] the failed validation response when the params are
      #   invalid, otherwise a successful response whose data carries the
      #   entity under :new_meeting
      def call
        validation = validate
        return validation if validation.failure?

        event = publish_event(
          event_type: 'scheduled',
          entity_type: 'Meeting',
          entity_id: @entity.id,
          entity_version: @entity.version,
          data: {
            time: @params[:time],
            organizer_id: @params[:organizer_id],
            invitee_ids: @params[:invitee_ids]
          }
        )
        @entity.handle(event)

        Response.new(success: true, data: { new_meeting: @entity })
      end
    end
  end
end
# LocalPublisher is a Wisper publisher (https://github.com/krisleech/wisper).
# Each published event name is routed to the Meeting class method listed
# beside it.
{
  meeting_scheduled: :scheduled_handler,
  meeting_accepted: :accepted_handler,
  meeting_rescheduled: :rescheduled_handler,
  meeting_finished: :finished_handler,
  meeting_cancelled: :cancelled_handler
}.each do |event_name, handler_name|
  LocalPublisher.subscribe(Meeting, on: event_name, with: handler_name)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.