Skip to content

Instantly share code, notes, and snippets.

@APiercey
Created May 28, 2023 16:31
Show Gist options
  • Save APiercey/2960739a7ec3bd2bfdfdc4cc557d42d6 to your computer and use it in GitHub Desktop.
Save APiercey/2960739a7ec3bd2bfdfdc4cc557d42d6 to your computer and use it in GitHub Desktop.
Repository Pattern for Event Sourcing with DynamoDB
class DynamoDBRepo
class AggregateClassUndefined < StandardError
def message
"Aggregate class is not defined"
end
end
class EventBuilderModuleUndefined < StandardError
def message
"Event builder module is not defined"
end
end
@aggregate_class = nil
@event_builder_module = nil
def initialize(dynamodb_client)
@dynamodb_client = dynamodb_client
raise AggregateClassUndefined if aggregate_class.nil?
raise EventBuilderModuleUndefined if event_builder_module.nil?
end
def self.aggregate(aggregate_class)
@aggregate_class = aggregate_class
end
def self.aggregate_class
@aggregate_class
end
def self.event_builder(event_builder)
@event_builder_module = event_builder
end
def self.event_builder_module
@event_builder_module
end
def fetch(uuid)
agg = aggregate_class.new
events = @dynamodb_client.fetch_aggregate_events(uuid)
return nil if events.empty?
agg.version = events.last.fetch("Version").to_i
events
.map { |event| build_event(event) }
.reject(&:nil?)
.each { |event| agg.apply(event) }
if agg.uuid.nil?
nil
else
agg
end
end
def store(agg)
new_version = agg.version
new_events = agg.changes.map do |event|
new_version = new_version + 1
{
EventUuid: SecureRandom.uuid,
AggregateUuid: agg.uuid,
Name: event.class::NAME,
Data: event.to_h,
Version: new_version
}
end
@dynamodb_client.insert_aggregate_events!(new_events)
agg.clear_changes
agg.version = new_version
agg
end
private
def build_event(raw_event)
event_builder_module.build(raw_event.fetch('Name'), raw_event.fetch('Data'))
end
def aggregate_class
self.class.aggregate_class
end
def event_builder_module
self.class.event_builder_module
end
end
class EsDynamoTableClient
def initialize(dynamodb_client, table_name)
@dynamodb_client = dynamodb_client
@table_name = table_name
end
def fetch_aggregate_events(aggregate_uuid)
query_options = {
table_name: @table_name,
key_condition_expression: "AggregateUuid = :aggregate_uuid",
expression_attribute_values: {
":aggregate_uuid" => aggregate_uuid,
},
consistent_read: true
}
items = []
result = @dynamodb_client.query(query_options)
loop do
items << result.items
break unless (last_evaluated_key = result.last_evaluated_key)
result = @dynamodb_client.query(query_options.merge(exclusive_start_key: last_evaluated_key))
end
items.flatten
end
def insert_aggregate_events!(events)
put_operations = events.map do |event|
{
put: {
item: event,
table_name: @table_name,
condition_expression: "attribute_not_exists(#v)",
expression_attribute_names: {
"#v" => "Version"
}
}
}
end
@dynamodb_client.transact_write_items({transact_items: put_operations})
nil
end
end
module Events
module Builder
def self.build(name, data)
case name
when "CartOpened"
Events::CartOpened.new(data.fetch("shopping_cart_uuid"))
when "ItemAdded"
Events::ItemAdded.new(data.fetch("shopping_cart_uuid"), data.fetch("item_name"))
when "CartClosed"
Events::CartClosed.new(data.fetch("shopping_cart_uuid"))
else
nil
end
end
end
end
class ShoppingCartRepo < DynamoDBRepo
aggregate ShoppingCart
event_builder Events::Builder
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment