An introduction to event sourcing, London Ruby User Group, 9th May 2018
module Commands
  module Meeting
    # Validation rules for accepting a meeting: a user id must be supplied and
    # the status passed in by the caller must be :scheduled.
    AcceptSchema = Dry::Validation.Schema do
      required(:user_id).filled
      required(:status).value(eql?: :scheduled)
    end

    # Command that records a user's acceptance of a meeting.
    class Accept < Command
      # Validates the params, publishes an 'accepted' event for the entity and
      # applies that event to the in-memory entity.
      #
      # @return [Response] the failed validation response, or a successful one
      def call
        # Run validation once and reuse the result for the early return.
        validation = validate
        return validation if validation.failure?

        event = publish_event(
          event_type: 'accepted',
          entity_type: 'Meeting',
          entity_id: @entity.id,
          entity_version: @entity.version,
          data: { user_id: @params[:user_id] }
        )
        @entity.handle(event)
        Response.new(success: true)
      end
    end
  end
end
module Commands
  # Base class for commands. A subclass implements #call and is paired with a
  # schema constant named "<SubclassName>Schema" (looked up by #schema).
  class Command
    # Builds the command and runs it in one step.
    #
    # @param entity [Object] the entity the command acts on
    # @param params [Hash] the command input
    # @return [Object] whatever the subclass's #call returns
    def self.call(entity, params)
      new(entity, params).call
    end

    # Stores the entity and params and runs the schema against the params
    # straight away; the outcome is kept for #validate.
    def initialize(entity, params)
      @entity = entity
      @params = params
      @validator = schema.call(params)
    end

    # Hands the event attributes to Event.create_and_broadcast.
    #
    # @param params [Hash] event attributes
    # @return [Object] the value returned by Event.create_and_broadcast
    def publish_event(params)
      Event.create_and_broadcast(params)
    end

    # Converts the stored schema result into a Response.
    #
    # @return [Response] success, or failure carrying the schema errors
    def validate
      return Response.new(success: true) if @validator.success?

      Response.new(success: false, errors: @validator.errors)
    end

    # Resolves the schema constant from the command's class name, e.g.
    # Commands::Meeting::Accept -> Commands::Meeting::AcceptSchema.
    def schema
      "#{self.class.name}Schema".constantize
    end
  end
end
# Event store table: one row per published event.
create_table 'events', id: :serial do |t|
  t.uuid 'entity_id', null: false, index: true
  t.string 'entity_type', null: false
  t.string 'event_type', null: false
  # `integer` is the Active Record column method; `t.int` is not defined.
  t.integer 'entity_version', null: false
  t.jsonb 'meta', null: false
  t.jsonb 'data', null: false
  t.datetime 'created_at', null: false
  # Unique per (entity_id, entity_version): two events cannot claim the same
  # version of the same entity.
  t.index ['entity_id', 'entity_version'], name: 'event_version_constraint', unique: true
end
module EventSourced
  # In-memory meeting whose state is built by applying events one at a time.
  # Public actions (schedule, accept, ...) delegate to command classes; the
  # matching *_handler methods apply the resulting events to this object.
  class Meeting
    attr_reader :id, :version, :status, :attendees_count, :time

    # @param id [String] entity id (a UUID when created via .schedule)
    def initialize(id)
      @id = id
      @attendees_count = 0
      @version = 0
    end

    # Delegates to the Schedule command with this entity and the given details.
    def schedule(time:, organizer_id:, invitee_ids:)
      Commands::Meeting::Schedule.call(self, time: time, organizer_id: organizer_id, invitee_ids: invitee_ids)
    end

    # Applies a 'scheduled' event.
    def scheduled_handler(event)
      params = event.data.symbolize_keys
      @time = params[:time]
      # The Accept command validates `status eql? :scheduled`, so the status
      # has to be recorded here for a scheduled meeting to be acceptable.
      @status = :scheduled
    end

    # Delegates to the Accept command, passing the current status for validation.
    def accept(user_id:)
      Commands::Meeting::Accept.call(self, user_id: user_id, status: @status)
    end

    # Applies an 'accepted' event: one more attendee.
    def accepted_handler(_event)
      @attendees_count += 1
    end

    # Delegates to the Reschedule command, passing the current status.
    def reschedule(new_time:)
      Commands::Meeting::Reschedule.call(self, new_time: new_time, status: @status)
    end

    # Applies a 'rescheduled' event.
    def rescheduled_handler(event)
      params = event.data.symbolize_keys
      @time = params[:new_time]
    end

    # Delegates to the Finish command, passing the current status.
    def finish
      Commands::Meeting::Finish.call(self, status: @status)
    end

    # Applies a 'finished' event.
    def finished_handler(_event)
      @status = :finished
    end

    # Delegates to the Cancel command, passing the current status.
    def cancel
      Commands::Meeting::Cancel.call(self, status: @status)
    end

    # Applies a 'cancelled' event.
    def cancelled_handler(_event)
      @status = :cancelled
    end

    # Applies an event by dispatching to "<event_type>_handler" and bumps the
    # version. Events without a matching handler are ignored and do not
    # change the version.
    def handle(event)
      handler = "#{event.event_type}_handler"
      return unless respond_to?(handler)

      public_send(handler, event)
      @version += 1
    end

    class << self
      # Rebuilds a meeting by replaying its stored events.
      #
      # @param id [String] entity id
      # @return [EventSourced::Meeting]
      def get(id)
        entity = new(id)
        load_events_for(entity)
        entity
      end

      # Creates a meeting with a fresh UUID and schedules it.
      #
      # @return [Object] the result of the Schedule command
      def schedule(time:, organizer_id:, invitee_ids:)
        new(SecureRandom.uuid).schedule(
          time: time,
          organizer_id: organizer_id,
          invitee_ids: invitee_ids
        )
      end

      # Replays the entity's events onto it. The entity_type matches the
      # 'Meeting' value the commands publish, and events are applied in
      # version order so the rebuilt state does not depend on row order.
      def load_events_for(entity)
        Event
          .where(entity_type: 'Meeting', entity_id: entity.id)
          .order(:entity_version)
          .each { |event| entity.handle(event) }
      end
    end
  end
end
# Active Record model for meetings. Its class-level handlers are subscribed to
# the published meeting events and create or update rows in response.
class Meeting < ApplicationRecord
  # NOTE(review): rows are created with an `organizer_id`, which suggests
  # `belongs_to :organizer` rather than `has_one` — confirm against the schema.
  has_one :organizer
  has_many :invitees

  class << self
    # Creates the row for a newly scheduled meeting.
    # Raises KeyError if the event data lacks :time or :organizer_id.
    def scheduled_handler(event)
      params = event.data.symbolize_keys
      create!(
        entity_id: event.entity_id,
        time: params.fetch(:time),
        organizer_id: params.fetch(:organizer_id),
        created_at: event.created_at
      )
    end

    def accepted_handler(_event)
      # no-op
    end

    # Updates the stored time. The name matches the `:rescheduled_handler`
    # subscription.
    def rescheduled_handler(event)
      # Rows are created with `entity_id`, so look them up by that column.
      meeting = find_by!(entity_id: event.entity_id)
      params = event.data.symbolize_keys
      # update_attribute takes (name, value), not a hash.
      meeting.update_attribute(:time, params.fetch(:new_time))
    end

    # Marks the meeting as finished.
    def finished_handler(event)
      meeting = find_by!(entity_id: event.entity_id)
      meeting.update_attribute(:status, :finished)
    end

    # Marks the meeting as cancelled.
    def cancelled_handler(event)
      meeting = find_by!(entity_id: event.entity_id)
      meeting.update_attribute(:status, :cancelled)
    end
  end
end
# Controller that schedules and accepts meetings through the event-sourced
# EventSourced::Meeting entity.
class MeetingsController < ApplicationController
  def new
    @meeting_form = MeetingForm.new
  end

  # Schedules a meeting, then accepts it on behalf of the current user.
  def create
    attrs = schedule_meeting_params
    # EventSourced::Meeting.schedule takes keyword arguments, so pass each one
    # explicitly; a missing value then reaches schema validation as nil.
    response = EventSourced::Meeting.schedule(
      time: attrs[:time],
      organizer_id: attrs[:organizer_id],
      invitee_ids: attrs[:invitee_ids]
    )

    if response.success?
      @meeting = response.data[:new_meeting]
      @meeting.accept(user_id: current_user.id)
      # The entity is not an Active Record model, so pass its id to the
      # route helper, as #accept does.
      redirect_to meeting_path(@meeting.id)
    else
      @meeting_form = MeetingForm.new
      response.add_errors_to(active_record: @meeting_form)
      render 'new'
    end
  end

  # Accepts an existing meeting for the given user.
  def accept
    meeting_id = accept_meeting_params[:meeting_id]
    response = EventSourced::Meeting
               .get(meeting_id)
               .accept(user_id: accept_meeting_params[:user_id])

    flash[:alert] = "Failed to accept the meeting" if response.failure?
    redirect_to meeting_path(meeting_id)
  end

  private

  def schedule_meeting_params
    # invitee_ids is a list, so it must be permitted as an array.
    params.require(:meeting_form).permit(:time, :organizer_id, invitee_ids: [])
  end

  def accept_meeting_params
    params.require(:accept_form).permit(:meeting_id, :user_id)
  end
end
module Commands
  module Meeting
    # Validation rules for scheduling a meeting.
    #
    # NOTE(review): `Time.now + 5.minutes` appears to be evaluated once, when
    # this schema is defined, rather than on each validation — confirm whether
    # the cutoff should instead be computed per call.
    ScheduleSchema = Dry::Validation.Schema do
      required(:time).value(gt?: Time.now + 5.minutes)
      required(:organizer_id).filled
      required(:invitee_ids).filled { each(:int?) }
    end

    # Command that schedules a new meeting.
    class Schedule < Command
      # Validates the params, publishes a 'scheduled' event and applies it to
      # the entity.
      #
      # @return [Response] the failed validation response, or a successful one
      #   whose data carries the entity under :new_meeting
      def call
        # Run validation once and reuse the result for the early return.
        validation = validate
        return validation if validation.failure?

        event = publish_event(
          event_type: 'scheduled',
          entity_type: 'Meeting',
          entity_id: @entity.id,
          entity_version: @entity.version,
          data: {
            time: @params[:time],
            organizer_id: @params[:organizer_id],
            invitee_ids: @params[:invitee_ids]
          }
        )
        @entity.handle(event)
        Response.new(success: true, data: { new_meeting: @entity })
      end
    end
  end
end
# LocalPublisher is a Wisper publisher (https://github.com/krisleech/wisper).
# Each line routes one broadcast meeting event to the matching class-level
# handler on the Meeting model.
LocalPublisher.subscribe Meeting, on: :meeting_scheduled, with: :scheduled_handler
LocalPublisher.subscribe Meeting, on: :meeting_accepted, with: :accepted_handler
LocalPublisher.subscribe Meeting, on: :meeting_rescheduled, with: :rescheduled_handler
LocalPublisher.subscribe Meeting, on: :meeting_finished, with: :finished_handler
LocalPublisher.subscribe Meeting, on: :meeting_cancelled, with: :cancelled_handler
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment