Changeset - proof of concept
result_object = Domain::SomeCommand.new(some_input).call
# pushing the changeset at the very end of the process in order to:
# - commit the actual db operations
# - send events, for instance to trigger background jobs
result_object.change_set.push!
render json: { created_resource_id: result_object.plannable.db_id }
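
# In context, the snippet above would typically live in a controller action;
# a sketch, assuming a Rails controller (PlannablesController and
# plannable_params are illustrative names, not part of this gist):
#
# class PlannablesController < ApplicationController
#   def create
#     result_object = Domain::SomeCommand.new(plannable_params.to_h).call
#     result_object.change_set.push!
#     render json: { created_resource_id: result_object.plannable.db_id }
#   end
# end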
class Domain::SomeCommand
  def initialize(some_input_without_orm_objects)
    @some_input_without_orm_objects = some_input_without_orm_objects
    # I would say: each service has its own changeset
    @change_set = ChangeSet.new
    @result_object = ResultObject.new
    result_object.change_set = change_set
  end
  # or don't use a result object and just expose change_set with an attr_reader

  def call
    child_result = ChildCommand.new(some_input_without_orm_objects).call
    # and we can merge changesets from child services
    change_set.merge_child(child_result.change_set)

    weekly_schedule = Domain::WeeklySchedule.new(some_input_without_orm_objects)
    plannable = Domain::Plannable.new(some_input_without_orm_objects)
    # we can still link objects together (relationships)
    plannable.link_to_schedule(weekly_schedule)

    # order of db operations matters, they will be committed in the same order
    change_set
      .add_db_operation(
        Domain::WeeklySchedule::Persistence::Create.new(weekly_schedule)
      )
      .add_db_operation(
        Domain::Plannable::Persistence::Create.new(plannable)
      )
      .add_event(
        name: :planning_updated,
        # an event has a payload, but we may need the transaction to be done to have relevant args;
        # in this case we pass a proc which will be evaluated after the commit
        raw_payload: -> { { "weekly_schedule_id" => weekly_schedule.db_id } }
      )

    result_object.plannable = plannable
    result_object
  end

  ResultObject = Struct.new(:change_set, :plannable)

  private

  attr_reader :result_object, :change_set, :some_input_without_orm_objects
end
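
# ChildCommand is not defined in this gist; the only contract merge_child
# relies on is that its result object exposes a change_set. A minimal sketch:
#
# class ChildCommand
#   Result = Struct.new(:change_set)
#
#   def initialize(some_input)
#     @some_input = some_input
#     @change_set = ChangeSet.new
#   end
#
#   def call
#     # register its own db operations / events on @change_set here, then:
#     Result.new(@change_set)
#   end
# end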
class ChangeSet
  extend T::Sig

  def initialize
    @event_collection = EventCollection.new
    @db_operations = []
  end

  sig {params(change_set: ChangeSet).returns(T.self_type)}
  def merge_child(change_set)
    event_collection.merge_child(change_set.event_collection)
    db_operations.concat(change_set.db_operations)
    self
  end

  sig {params(name: Symbol, raw_payload: T.any(T::Hash[String, T.untyped], T.proc.returns(T::Hash[String, T.untyped]))).returns(T.self_type)}
  def add_event(name:, raw_payload:)
    event_collection.add(name: name, raw_payload: raw_payload)
    self
  end

  sig {params(persistence_handler: T.untyped).returns(T.self_type)}
  def add_db_operation(persistence_handler)
    db_operations.push(persistence_handler)
    self
  end

  sig {returns(T.self_type)}
  def push!
    commit_db_operations
    dispatch_events
    self
  end

  sig {returns(T.self_type)}
  def commit_db_operations
    # should we move the transaction to also wrap the events? (see the sketch after this method)
    # in other words: should we still commit to db if events fail to dispatch?
    ApplicationRecord.transaction(requires_new: true) do
      db_operations.each(&:commit)
    end
    self
  end
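
  # Sketching the open question above: if events must not be dispatched when
  # the transaction rolls back, push! could wrap both steps in the transaction
  # instead (an alternative, not the original design; note that background
  # jobs could then be picked up before the commit is visible):
  #
  #   sig {returns(T.self_type)}
  #   def push_atomically!
  #     ApplicationRecord.transaction(requires_new: true) do
  #       db_operations.each(&:commit)
  #       dispatch_events
  #     end
  #     self
  #   end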
  sig {returns(T.self_type)}
  def dispatch_events
    event_collection.each do |event|
      event_catalog.dispatch(event)
    end
    self
  end

  protected

  attr_reader :event_collection, :db_operations

  private

  def event_catalog
    @event_catalog ||= Domain::EventCatalog.new
  end
  class EventCollection
    def initialize
      @grouped_events = {}
    end

    def add(name:, raw_payload:)
      new_event = Event.new(name: name, raw_payload: raw_payload)
      add_event(new_event)
    end

    def merge_child(event_collection)
      event_collection.all_events.each do |event|
        add_event(event)
      end
    end

    def each(&block)
      uniq_events.each(&block)
    end

    protected

    # only used for merge
    def all_events
      [].tap do |collection|
        grouped_events.each_value do |events|
          collection.concat(events)
        end
      end
    end

    private

    attr_reader :grouped_events

    def uniq_events
      [].tap do |collection|
        grouped_events.each_value do |events|
          collection.concat(events.uniq)
        end
      end
    end

    def add_event(event)
      grouped_events[event.name] ||= []
      grouped_events[event.name].push(event)
    end
  end
  class Event
    attr_reader :name

    class UnknownEventError < StandardError; end

    def initialize(name:, raw_payload:)
      raise UnknownEventError.new("unknown #{name}") unless Domain::EventCatalog::KNOWN_EVENTS.include?(name)
      @name = name
      @raw_payload = raw_payload
    end

    def payload
      # - if at the time the event is added, we know all params,
      #   raw_payload is a plain hash and crystal clear
      # - else, it means we need to wait for db operations to be committed;
      #   in this case, raw_payload is a Proc evaluated lazily
      #   (example: we need the db id of some model which is not created yet to pass it to a worker)
      raw_payload.respond_to?(:call) ? raw_payload.call : raw_payload
    end

    def eql?(other)
      name == other.name && payload == other.payload
    end

    # Array#uniq relies on #hash as well as #eql?
    def hash
      [name, payload].hash
    end

    private

    attr_reader :raw_payload
  end
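
  # To illustrate the dedup intent (a sketch; payloads are compared after
  # evaluation, so two events with the same name and payload dispatch once):
  #
  #   a = Event.new(name: :planning_updated, raw_payload: { "id" => 1 })
  #   b = Event.new(name: :planning_updated, raw_payload: { "id" => 1 })
  #   [a, b].uniq.size # => 1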
end
class Domain::EventCatalog
  extend T::Sig

  KNOWN_EVENTS = %i[planning_updated].freeze

  sig {params(event: ChangeSet::Event).returns(T.self_type)}
  def dispatch(event)
    # send, not public_send: the handlers below are private
    send(event.name, event)
    self
  end

  private

  sig {params(event: ChangeSet::Event).void}
  def planning_updated(event)
    # trigger workers or whatever
    # don't touch the database!
  end
end
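
# As an illustration only: a handler like planning_updated above could enqueue
# a background job; PlanningSyncWorker is hypothetical, not part of this gist.
#
# class PlanningSyncWorker
#   include Sidekiq::Worker
#
#   def perform(weekly_schedule_id)
#     # read side only: sync caches, notify, etc.
#     # the write path stays in the persistence handlers
#   end
# end
#
# # inside planning_updated:
# # PlanningSyncWorker.perform_async(event.payload.fetch("weekly_schedule_id"))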
module Domain::Plannable::Persistence
  class Base
    extend T::Sig
    extend T::Helpers
    abstract!

    sig {params(plannable: Domain::Plannable).void}
    def initialize(plannable)
      @plannable = plannable
    end

    sig {abstract.void}
    def commit; end

    private

    attr_reader :plannable
  end

  class Create < Base
    sig {override.void}
    def commit
      if plannable.rest?
        new_rest = ::Rest.create!(plannable.rest_params)
        plannable.db_id = new_rest.id
      else
        # Shift...
      end
    rescue ActiveRecord::RecordInvalid => exception
      # raise a custom exception here to be able to link it back to the plannable
      # to be decided; re-raise in the meantime so failures are not swallowed
      raise
    end
  end

  class Update < Base
    sig {override.void}
    def commit
    end
  end

  class Destroy < Base
    sig {override.void}
    def commit
    end
  end

  class BulkUpsert < Base
    sig {override.void}
    def commit
    end
  end
end
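
# For reference, a minimal sketch of the interface the code above assumes on
# Domain::Plannable (hypothetical; the real class is not part of this gist):
#
# class Domain::Plannable
#   attr_accessor :db_id
#
#   def initialize(params)
#     @params = params
#   end
#
#   def link_to_schedule(weekly_schedule)
#     @weekly_schedule = weekly_schedule
#   end
#
#   def rest?
#     # domain rule deciding whether this plannable is a Rest or a Shift
#   end
#
#   def rest_params
#     # plain hash of attributes for ::Rest.create!
#   end
# end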