Skip to content

Instantly share code, notes, and snippets.

@mariuszkapcia
Created January 28, 2019 17:49
Show Gist options
  • Save mariuszkapcia/a30754f8780837593cabff27765ecda2 to your computer and use it in GitHub Desktop.
Save mariuszkapcia/a30754f8780837593cabff27765ecda2 to your computer and use it in GitHub Desktop.
AggregateRoot pattern with extracted load method, store method, and version attribute to AggregateRepository.
class AggregateRepository
def load(aggregate, stream_name, event_store:)
events = event_store.read.stream(stream_name).forward.each
events.each do |event|
aggregate.apply(event)
end
@version = events.size - 1
aggregate.clear_unpublished_events
end
def store(aggregate, stream_name, event_store:)
aggregate.unpublished_events.each do |event|
event_store.publish(
event,
stream_name: stream_name,
expected_version: @version
)
@version += 1
end
aggregate.clear_unpublished_events
end
def initialize
@version = -1
end
end
module MyAggregateRoot
MissingEventInApplyStrategy = Class.new(StandardError)
def default_apply_strategy(event)
"apply_#{event.class.to_s.demodulize.underscore}"
end
def apply_strategy
{}
end
def apply(event)
mutate_state(event)
unpublished_events.push(event)
end
def mutate_state(event)
begin
callback_method = if apply_strategy.present?
apply_strategy.fetch(event.class.to_s.demodulize.underscore.to_sym)
else
default_apply_strategy(event)
end
rescue KeyError
raise MissingEventInApplyStrategy.new("#{event.class} is missing in apply_strategy.")
end
send(callback_method, event)
end
def unpublished_events
@unpublished_events ||= []
end
def clear_unpublished_events
@unpublished_events = []
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment