Skip to content

Instantly share code, notes, and snippets.

@kjellm kjellm/app.rb
Last active Nov 18, 2018

Embed
What would you like to do?
Event Source proof of concept. Copyright 2017 Kjell-Magne Øierud. License: MIT https://opensource.org/licenses/MIT
require_relative 'base'
require_relative 'event'
require_relative 'cmd'
require_relative 'crud'
require_relative 'model'
require_relative 'read'
require 'pp'
class Application < BaseObject
def initialize
@recording_id = UUID.generate
@release_id = UUID.generate
initialize_projections
end
def main
puts "LOGG ---------------------------------------------------------"
run_commands
puts
puts "EVENT STORE ------------------------------------------------"
pp registry.event_store
puts
puts "PROJECTIONS ------------------------------------------------"
peek_at_projections
end
private
def initialize_projections
@the_recording_projection = RecordingProjection.new
@the_release_projection = ReleaseProjection.new(@the_recording_projection)
@the_totals_projection = TotalsProjection.new
@projections = [
@the_release_projection,
@the_recording_projection,
@the_totals_projection,
]
end
def peek_at_projections
p @the_release_projection.find @release_id
p @the_recording_projection.find @recording_id
p @the_totals_projection.totals
end
def run_commands
recording_data = {id: @recording_id, title: "Sledge Hammer",
artist: "Peter Gabriel", duration: 313}
run(recording_data, CreateRecording, Recording)
run({id: @release_id, title: "So", tracks: []},
CreateRelease, Release)
run({id: UUID.generate, title: "Shaking The Tree",
tracks: [@recording_id]},
CreateRelease, Release)
run({id: @release_id, title: "So", tracks: [@recording_id]},
UpdateRelease, Release)
run(recording_data.merge({ title: "Sledgehammer" }),
UpdateRecording, Recording)
# Some failing commands, look in log for verification of failure
run({id: "Non-existing ID", title: "Foobar"},
UpdateRecording, Recording)
end
def run(request_data, command_class, aggregate)
logg "Incoming request with data: #{request_data.inspect}"
command_handler = registry.command_handler_for(aggregate)
command = command_class.new(request_data)
command_handler.handle command
rescue StandardError => e
logg "ERROR: Command #{command} failed because of: #{e}"
end
end
Application.new.main
require 'set'
require 'date'
require 'securerandom'
class String
def snake_case
split(/(?=[A-Z]+)/).map(&:downcase).join("_")
end
end
class Hash
def slice(*keys)
keys.each_with_object(self.class.new) do
|k, hash| hash[k] = self[k] if has_key?(k)
end
end
end
module UUID
def self.generate
SecureRandom.uuid
end
def self.as_int(uuid)
Integer(uuid.split("-").join, 16)
end
def self.from_int(int)
int.to_s(16).rjust(32, '0').split(/(\h{8})(\h{4})(\h{4})(\h{4})(\h{12})/)[1..-1].join("-")
end
end
module Attributes
def attributes(*names)
attr_reader(*names)
define_singleton_method(:attribute_names) { names }
mod = Module.new do
define_method :initialize do |**attrs|
attrs.each do |name, value|
raise ArgumentError.new "Unrecognized argument: #{name}" unless names.include? name
if respond_to? "#{name}=", true
send "#{name}=", value
else
instance_variable_set "@#{name}", value
end
end
super(**attrs)
end
end
include mod
names
end
end
class BaseObject
extend Attributes
def initialize(*_args)
end
module ClassAndInstanceMethods
def logg(*args)
print "#{DateTime.now} - ", *args
puts
end
def registry
@@registry ||= Registry.new
end
end
include ClassAndInstanceMethods
extend ClassAndInstanceMethods
def to_h
Hash[self.class.attribute_names.map {|name| [name, send(name)] }]
end
end
class Registry < BaseObject
def command_handler_for(klass)
handler = if klass.respond_to? :handle
klass
else
self.class.const_get("#{klass}CommandHandler").new
end
CommandHandlerLoggDecorator.new(handler)
end
def event_store
@event_store ||=
EventStoreOptimisticLockDecorator.new(
EventStoreLoggDecorator.new(
EventStorePubSubDecorator.new(
EventStore.new)))
end
def repository_for(klass)
if klass < CrudAggregate
klass
else
self.class.const_get("#{klass}Repository").new
end
end
end
class Entity < BaseObject
def set_attributes(attrs)
(self.class.attribute_names - [:id]).each do |name|
instance_variable_set(:"@#{name}", attrs[name]) if attrs.key?(name)
end
end
end
class ValueObject < BaseObject
end
class EventStoreError < StandardError
end
class EventStoreConcurrencyError < EventStoreError
end
class Event < ValueObject
end
module Validations
def required(*values)
values.none?(&:nil?) or
raise ArgumentError
end
def non_blank_string(obj)
return unless obj
obj.is_a?(String) && !obj.strip.empty? or
raise ArgumentError
end
def positive_integer(obj)
return unless obj
obj.is_a?(Integer) && obj > 0 or
raise ArgumentError
end
end
class Command < ValueObject
include Validations
def initialize(*args)
super
validate
end
def validate
raise "Implement in subclass! #{self.class.name}"
end
end
class CommandHandler < BaseObject
module InstanceMethods
def handle(command)
process(command)
return
end
def process(command)
message = "process_" + command.class.name.snake_case
send message.to_sym, command
end
end
include InstanceMethods
end
class CommandHandlerLoggDecorator < DelegateClass(CommandHandler)
def initialize(obj)
super obj
end
def handle(command)
logg "Start handling: #{command.inspect}"
super
ensure
logg "Done handling: #{command.class.name}"
end
end
class CrudCommandHandler < CommandHandler
module InstanceMethods
private
def validator(obj)
raise "Implement in subclass!"
end
def repository
raise "Implement in subclass!"
end
def type
raise "Implement in subclass!"
end
def process_create(command)
repository.unit_of_work(command.id) do |uow|
obj = type.new(command.to_h)
validator(obj).assert_validity
event = self.class.const_get("#{type}Created").new(command.to_h)
uow.create
uow.append event
end
end
def process_update(command)
repository.unit_of_work(command.id) do |uow|
obj = repository.find command.id
raise ArgumentError if obj.nil?
obj.set_attributes command.to_h
validator(obj).assert_validity
event = self.class.const_get("#{type}Updated").new(command.to_h)
uow.append event
end
end
end
include InstanceMethods
end
module CrudAggregate
module ClassMethods
def repository
self
end
def validator(obj)
obj
end
end
def assert_validity
end
def self.included(othermod)
othermod.extend CommandHandler::InstanceMethods
othermod.extend CrudCommandHandler::InstanceMethods
othermod.extend EventStoreRepository::InstanceMethods
othermod.extend ClassMethods
othermod_name = othermod.name.snake_case
othermod.define_singleton_method("type") { othermod }
othermod.define_singleton_method "process_create_#{othermod_name}" do |command|
process_create command
end
othermod.define_singleton_method "process_update_#{othermod_name}" do |command|
process_update command
end
othermod.define_singleton_method("apply_#{othermod_name}_updated") do |obj, event|
obj.set_attributes(event.to_h)
end
end
end
class EventStream < BaseObject
def initialize
@event_sequence = []
end
def version
@event_sequence.length
end
def append(*events)
@event_sequence.push(*events)
end
def to_a
@event_sequence.clone
end
end
class EventStore < BaseObject
def initialize
@streams = {}
end
def create(id)
raise EventStoreError, "Stream exists for #{id}" if @streams.key? id
@streams[id] = EventStream.new
end
def append(id, *events)
@streams.fetch(id).append(*events)
end
def event_stream_for(id)
@streams[id]&.clone
end
def event_stream_version_for(id)
@streams[id]&.version || 0
end
end
class EventStoreOptimisticLockDecorator < DelegateClass(EventStore)
def initialize(obj)
super
@locks = {}
end
def create(id)
@locks[id] = Mutex.new
super
end
def append(id, expected_version, *events)
@locks[id].synchronize do
event_stream_version_for(id) == expected_version or
raise EventStoreConcurrencyError
super id, *events
end
end
end
class EventStorePubSubDecorator < DelegateClass(EventStore)
def initialize(obj)
super
@subscribers = []
end
def add_subscriber(subscriber)
@subscribers << subscriber
end
def append(id, *events)
super
publish(*events)
end
def publish(*events)
events.each do |e|
@subscribers.each do |sub|
sub.apply e
end
end
end
end
class EventStoreLoggDecorator < DelegateClass(EventStore)
def append(id, *events)
super
logg "New events: #{events}"
end
end
class UnitOfWork < BaseObject
def initialize(event_store, id)
@id = id
@event_store = event_store
@expected_version = event_store.event_stream_version_for(id)
end
def create
@event_store.create @id
end
def append(*events)
@event_store.append @id, @expected_version, *events
end
end
class EventStoreRepository < BaseObject
module InstanceMethods
def find(id)
stream = registry.event_store.event_stream_for(id)
return if stream.nil?
build stream.to_a
end
def unit_of_work(id)
yield UnitOfWork.new(registry.event_store, id)
end
private
def build(stream)
obj = type.new stream.first.to_h
stream[1..-1].each do |event|
message = "apply_" + event.class.name.snake_case
send message.to_sym, obj, event
end
obj
end
end
include InstanceMethods
end
#
# R E L E A S E
#
# a.k.a. Album
#
# Shows an example of using CrudAggregate. All stuff rolled into one
# class. Useful for the simplest aggregates that only needs CRUD
# operations.
#
RELEASE_ATTRIBUTES = %I(id title tracks)
class Release < Entity
attributes *RELEASE_ATTRIBUTES
include CrudAggregate
def assert_validity
# Do something here
end
end
class ReleaseCommand < Command
private
def validate
required(*RELEASE_ATTRIBUTES.map {|m| send m})
non_blank_string(title)
end
end
class CreateRelease < ReleaseCommand
attributes *RELEASE_ATTRIBUTES
end
class ReleaseCreated < Event
attributes *RELEASE_ATTRIBUTES
end
class UpdateRelease < ReleaseCommand
attributes *RELEASE_ATTRIBUTES
end
class ReleaseUpdated < Event
attributes *RELEASE_ATTRIBUTES
end
#
# R E C O R D I N G
#
# Shows an example where all the different responsibilities are
# handled by separate objects.
#
class RecordingRepository < EventStoreRepository
def type
Recording
end
def apply_recording_updated(recording, event)
recording.set_attributes(event.to_h)
end
end
class RecordingValidator < BaseObject
def initialize(obj)
end
def assert_validity
# Do something here
end
end
class RecordingCommandHandler < CrudCommandHandler
private
def type; Recording; end
def repository
@repository ||= registry.repository_for(Recording)
end
def validator(obj)
RecordingValidator.new(obj)
end
def process_create_recording(command)
process_create(command)
end
def process_update_recording(command)
process_update(command)
end
end
RECORDING_ATTRIBUTES = %I(id title artist duration)
class RecordingCommand < Command
private
def validate
required(*RECORDING_ATTRIBUTES.map {|m| send m})
non_blank_string(title)
non_blank_string(artist)
positive_integer(duration)
end
end
class CreateRecording < RecordingCommand
attributes *RECORDING_ATTRIBUTES
end
class RecordingCreated < Event
attributes *RECORDING_ATTRIBUTES
end
class UpdateRecording < RecordingCommand
attributes *RECORDING_ATTRIBUTES
end
class RecordingUpdated < Event
attributes *RECORDING_ATTRIBUTES
end
class Recording < Entity
attributes *RECORDING_ATTRIBUTES
end
class RepositoryProjection < BaseObject
def initialize
@repository = registry.repository_for type
end
def find(id)
@repository.find(id).to_h
end
def apply(*_args); end
private
def type
raise "Implement in subclass! #{self.class.name}"
end
end
class RecordingProjection < RepositoryProjection
def type
Recording
end
end
class SubscriberProjection < BaseObject
def initialize
@store = {}
registry.event_store.add_subscriber(self)
end
def find(id)
@store[id]&.clone
end
def apply(event)
handler_name = "when_#{event.class.name.snake_case}".to_sym
send handler_name, event if respond_to?(handler_name)
end
end
class ReleaseProjection < SubscriberProjection
def initialize(recordings)
super()
@recordings = recordings
end
def when_release_created(event)
release = build_release_from_event_data event
@store[event.id] = release
end
def when_release_updated(event)
release = build_release_from_event_data event
@store[event.id].merge! release
end
def when_recording_updated(_event)
refresh_all_tracks
end
private
def build_release_from_event_data(event)
release = event.to_h
release[:tracks] = track_id_to_data release.fetch(:tracks)
derive_artist_from_tracks(release)
release
end
def track_id_to_data(track_ids)
track_ids.map { |id| @recordings.find(id).to_h }
end
def refresh_all_tracks
@store.values.each do |r|
r.fetch(:tracks).map! {|track| track.fetch(:id)}
r[:tracks] = track_id_to_data r.fetch(:tracks)
end
end
def derive_artist_from_tracks(release)
artists = release[:tracks].map {|rec| rec[:artist]}.uniq
release[:artist] = artists.length == 1 ? artists.first : "Various artists"
end
end
class TotalsProjection < SubscriberProjection
def initialize
super
@totals = Hash.new(0)
end
def when_recording_created(event)
handle_create_event event
end
def when_release_created(event)
handle_create_event event
end
attr_reader :totals
private
def handle_create_event(event)
@totals[event.class] += 1
end
end
@sbellware

This comment has been minimized.

Copy link

sbellware commented Feb 9, 2017

Shameless plug... If you like this stuff, we have a complete implementation of Event Sourcing libraries in Ruby with support for EventStore and Postgres: http://eventide-project.org/quick_look.html

@kjellm

This comment has been minimized.

Copy link
Owner Author

kjellm commented Feb 13, 2017

@sbellware: Thanks for sharing, looks interesting :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.