-
-
Save 1gor/26861dff5594adcfe0ca823b7a0752c9 to your computer and use it in GitHub Desktop.
An introduction to event sourcing, London Ruby User Group, 9th May 2018
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Commands
  module Meeting
    # Input contract for accepting a meeting: a user id must be supplied and
    # the status passed in must be exactly :scheduled.
    AcceptSchema = Dry::Validation.Schema do
      required(:user_id).filled
      required(:status).value(eql?: :scheduled)
    end

    # Command that records a user's acceptance of a meeting.
    class Accept < Command
      # Validates the parameters, publishes an 'accepted' event for the entity
      # and lets the entity apply that event to itself.
      #
      # Returns the failed validation Response when the input is invalid,
      # otherwise a successful Response.
      def call
        return validate if validate.failure?

        attributes = {
          event_type: 'accepted',
          entity_type: 'Meeting',
          entity_id: @entity.id,
          entity_version: @entity.version,
          data: { user_id: @params[:user_id] }
        }
        accepted_event = publish_event(attributes)
        @entity.handle(accepted_event)

        Response.new(success: true)
      end
    end
  end
end
module Commands
  # Base class for commands. A command wraps an entity plus caller-supplied
  # params and runs the params through the schema constant named after the
  # concrete command class.
  class Command
    class << self
      # Convenience entry point: builds the command and runs it.
      def call(entity, params)
        new(entity, params).call
      end
    end

    # Stores the entity and params and runs the schema against the params
    # immediately, keeping the result for #validate.
    def initialize(entity, params)
      @entity = entity
      @params = params
      @validator = schema.call(params)
    end

    # Hands the event attributes to Event.create_and_broadcast and returns
    # whatever that call returns.
    def publish_event(params)
      Event.create_and_broadcast(params)
    end

    # Wraps the stored schema result in a Response; a failed Response also
    # carries the schema errors.
    def validate
      return Response.new(success: true) if @validator.success?

      Response.new(success: false, errors: @validator.errors)
    end

    # Resolves the constant "<CommandClassName>Schema".
    def schema
      schema_name = "#{self.class.name}Schema"
      schema_name.constantize
    end
  end
end
# Append-only event log. One row per event.
create_table 'events', id: :serial do |t|
  t.uuid 'entity_id', null: false, index: true
  t.string 'entity_type', null: false
  t.string 'event_type', null: false
  # `t.integer` is the column-type helper; `t.int` is not defined on the
  # table definition.
  t.integer 'entity_version', null: false
  t.jsonb 'meta', null: false
  t.jsonb 'data', null: false
  t.datetime 'created_at', null: false
  # At most one event per (entity, version) pair: two writers that loaded the
  # same entity version cannot both append.
  t.index ['entity_id', 'entity_version'], name: 'event_version_constraint', unique: true
end
module EventSourced
  # Meeting entity whose state is built by applying events through #handle.
  #
  # Public "verb" methods (#schedule, #accept, ...) delegate to command
  # objects; the matching "*_handler" methods mutate in-memory state when the
  # resulting event is applied.
  class Meeting
    attr_reader :id, :version, :status, :attendees_count, :time

    # @param id [String] entity id (the class-level .schedule uses a UUID)
    def initialize(id)
      @id = id
      @attendees_count = 0
      @version = 0
    end

    # Runs the Schedule command for this entity and returns its result.
    def schedule(time:, organizer_id:, invitee_ids:)
      Commands::Meeting::Schedule.call(self, time: time, organizer_id: organizer_id, invitee_ids: invitee_ids)
    end

    # Applies a 'scheduled' event.
    def scheduled_handler(event)
      @time = symbolized_data(event)[:time]
      # The accept command's schema requires status == :scheduled, so the
      # status has to be recorded here for acceptance to ever succeed.
      @status = :scheduled
    end

    # Runs the Accept command for this entity and returns its result.
    def accept(user_id:)
      Commands::Meeting::Accept.call(self, user_id: user_id, status: @status)
    end

    # Applies an 'accepted' event.
    def accepted_handler(_event)
      @attendees_count += 1
    end

    # Runs the Reschedule command for this entity and returns its result.
    def reschedule(new_time:)
      Commands::Meeting::Reschedule.call(self, new_time: new_time, status: @status)
    end

    # Applies a 'rescheduled' event.
    def rescheduled_handler(event)
      @time = symbolized_data(event)[:new_time]
    end

    # Runs the Finish command for this entity and returns its result.
    def finish
      Commands::Meeting::Finish.call(self, status: @status)
    end

    # Applies a 'finished' event.
    def finished_handler(_event)
      @status = :finished
    end

    # Runs the Cancel command for this entity and returns its result.
    def cancel
      Commands::Meeting::Cancel.call(self, status: @status)
    end

    # Applies a 'cancelled' event.
    def cancelled_handler(_event)
      @status = :cancelled
    end

    # Dispatches the event to "<event_type>_handler" when this entity defines
    # such a public method, then bumps the version. Events without a handler
    # are ignored and leave the version untouched.
    #
    # @return [Integer, nil] the new version, or nil when the event was ignored
    def handle(event)
      handler = :"#{event.event_type}_handler"
      return unless respond_to?(handler)

      public_send(handler, event)
      @version += 1
    end

    class << self
      # Rebuilds the entity with the given id by replaying its stored events.
      def get(id)
        entity = new(id)
        load_events_for(entity)
        entity
      end

      # Creates a fresh entity with a random UUID and schedules it.
      # Returns the Schedule command's result.
      def schedule(time:, organizer_id:, invitee_ids:)
        new(SecureRandom.uuid).schedule(
          time: time,
          organizer_id: organizer_id,
          invitee_ids: invitee_ids
        )
      end

      # Replays the entity's events in version order. The entity_type must
      # match the 'Meeting' value the commands publish, and the replay must be
      # ordered because handlers overwrite state (e.g. time).
      def load_events_for(entity)
        events = Event.where(entity_type: 'Meeting', entity_id: entity.id).order(:entity_version)
        events.each { |event| entity.handle(event) }
      end
    end

    private

    # Event payload with symbol keys, whichever key type the payload arrived with.
    def symbolized_data(event)
      event.data.transform_keys(&:to_sym)
    end
  end
end
# Read-model row for a meeting, kept up to date by the class-level event
# handlers below.
class Meeting < ApplicationRecord
  # NOTE(review): scheduled_handler writes an `organizer_id` attribute on this
  # model, which suggests `belongs_to :organizer` — confirm against the schema.
  has_one :organizer
  has_many :invitees

  class << self
    # Creates the read-model row for a newly scheduled meeting.
    # Raises KeyError when the event payload lacks :time or :organizer_id.
    def scheduled_handler(event)
      params = event.data.symbolize_keys
      create!(
        entity_id: event.entity_id,
        time: params.fetch(:time),
        organizer_id: params.fetch(:organizer_id),
        created_at: event.created_at
      )
    end

    def accepted_handler(event)
      # no-op
    end

    # Moves the meeting to the new time carried by the event.
    def rescheduled_handler(event)
      params = event.data.symbolize_keys
      meeting_for(event).update!(time: params.fetch(:new_time))
    end
    # Keeps the previous, misspelled name callable for any existing callers.
    alias recheduled_handler rescheduled_handler

    # Marks the meeting as finished.
    def finished_handler(event)
      meeting_for(event).update!(status: :finished)
    end

    # Marks the meeting as cancelled.
    def cancelled_handler(event)
      meeting_for(event).update!(status: :cancelled)
    end

    private

    # Rows are created with the event's entity id in the `entity_id` column,
    # so they must be looked up by that column rather than by primary key.
    def meeting_for(event)
      find_by!(entity_id: event.entity_id)
    end
  end
end
# HTTP entry points for scheduling and accepting meetings through the
# event-sourced Meeting entity.
class MeetingsController < ApplicationController
  def new
    @meeting_form = MeetingForm.new
  end

  # Schedules a meeting from the submitted form. On success the current user
  # accepts it and is redirected to the meeting; on failure the form is
  # re-rendered with the command's errors.
  def create
    attributes = schedule_meeting_params
    # EventSourced::Meeting.schedule takes keyword arguments, so they are
    # passed explicitly. A missing value arrives as nil and is reported by the
    # command's validation instead of raising ArgumentError.
    result = EventSourced::Meeting.schedule(
      time: attributes[:time],
      organizer_id: attributes[:organizer_id],
      invitee_ids: attributes[:invitee_ids]
    )

    if result.success?
      @meeting = result.data[:new_meeting]
      @meeting.accept(user_id: current_user.id)
      # @meeting is the event-sourced entity, not a record, so the route gets
      # its id explicitly (as the accept action does).
      redirect_to meeting_path(@meeting.id)
    else
      @meeting_form = MeetingForm.new
      result.add_errors_to(active_record: @meeting_form)
      render 'new'
    end
  end

  # Accepts the meeting on behalf of the submitted user id, then redirects
  # back to the meeting, flashing an alert when the command fails.
  def accept
    meeting_id = accept_meeting_params[:meeting_id]
    result = EventSourced::Meeting
             .get(meeting_id)
             .accept(user_id: accept_meeting_params[:user_id])

    flash[:alert] = "Failed to accept the meeting" if result.failure?
    redirect_to meeting_path(meeting_id)
  end

  private

  # invitee_ids is a list, so it must be permitted as an array; a bare
  # :invitee_ids would only let a scalar through.
  def schedule_meeting_params
    params.require(:meeting_form).permit(:time, :organizer_id, invitee_ids: [])
  end

  def accept_meeting_params
    params.require(:accept_form).permit(:meeting_id, :user_id)
  end
end
module Commands
  module Meeting
    # Input contract for scheduling a meeting.
    ScheduleSchema = Dry::Validation.Schema do
      configure do
        # Error text for the custom predicate below.
        def self.messages
          super.merge(
            en: { errors: { at_least_five_minutes_ahead?: 'must be at least 5 minutes from now' } }
          )
        end

        # Evaluated on every validation. Passing `Time.now + 5.minutes` as a
        # predicate argument would compute the cutoff once, when this file is
        # loaded, and it would go stale in a long-running process.
        def at_least_five_minutes_ahead?(value)
          value > Time.now + 5.minutes
        end
      end

      required(:time).filled(:at_least_five_minutes_ahead?)
      required(:organizer_id).filled
      required(:invitee_ids).filled { each(:int?) }
    end

    # Command that schedules a new meeting.
    class Schedule < Command
      # Validates the parameters, publishes a 'scheduled' event for the entity
      # and lets the entity apply that event to itself.
      #
      # Returns the failed validation Response when the input is invalid,
      # otherwise a successful Response whose data carries the entity under
      # :new_meeting.
      def call
        validation = validate
        return validation if validation.failure?

        event = publish_event(
          event_type: 'scheduled',
          entity_type: 'Meeting',
          entity_id: @entity.id,
          entity_version: @entity.version,
          data: {
            time: @params[:time],
            organizer_id: @params[:organizer_id],
            invitee_ids: @params[:invitee_ids]
          }
        )
        @entity.handle(event)

        Response.new(success: true, data: { new_meeting: @entity })
      end
    end
  end
end
# LocalPublisher is a Wisper publisher (https://github.com/krisleech/wisper).
# Each broadcast meeting event is routed to the matching class-level handler
# on the Meeting read model.
{
  meeting_scheduled: :scheduled_handler,
  meeting_accepted: :accepted_handler,
  meeting_rescheduled: :rescheduled_handler,
  meeting_finished: :finished_handler,
  meeting_cancelled: :cancelled_handler
}.each do |event_name, handler_name|
  LocalPublisher.subscribe(Meeting, on: event_name, with: handler_name)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment