-
-
Save apneadiving/d165e010007de3b4d47a4cf02136601e to your computer and use it in GitHub Desktop.
Changeset - proof of concept
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Run the command; it queues its db operations and events on a changeset
# that travels back on the returned result.
command_result = Domain::SomeCommand.new(some_input).call

# The changeset is pushed at the very end of the process in order to:
# - commit the actual db operations
# - send events, for instance to trigger background jobs
command_result.change_set.push!

# Read the id only after the push, once the db operations have been committed.
created_id = command_result.plannable.id
render json: { created_resource_id: created_id }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Proof-of-concept command (service object) that records its side effects in a
# ChangeSet instead of performing them itself.
#
# The code in this class only queues db operations and events; they are applied
# when the caller invokes `push!` on the changeset carried by the result object.
class Domain::SomeCommand
  # Value returned by #call. `change_set` is filled in by #initialize;
  # `some_return_value` is left for the command to populate.
  ResultObject = Struct.new(:change_set, :some_return_value)

  # @param some_input_without_orm_objects the command input (stored as-is)
  def initialize(some_input_without_orm_objects)
    @some_input_without_orm_objects = some_input_without_orm_objects
    # I would say: each service has its own changeset
    @change_set = ChangeSet.new
    @result_object = ResultObject.new(@change_set)
  end

  # or dont use a result object and just have an attr_reader with change_set
  #
  # Queues everything this command wants to happen and returns the result.
  #
  # @return [ResultObject] result carrying the changeset to push
  def call
    # NOTE(review): `some_args` and `params` are placeholders in this proof of
    # concept; nothing in this class defines them yet.
    child_result = ChildCommand.new(some_args).call
    # and we can merge changesets from child services
    change_set.merge_child(child_result.change_set)

    weekly_schedule = Domain::WeeklySchedule.new(params)
    plannable = Domain::Plannable.new(params)
    # we can still link objects together (relationships)
    plannable.link_to_schedule(weekly_schedule)

    # order of db operations matters, they will be committed in the same order
    change_set
      .add_db_operation(
        Domain::WeeklySchedule::Persistence::Create.new(weekly_schedule)
      )
      .add_db_operation(
        Domain::Plannable::Persistence::Create.new(plannable)
      )
      .add_event(
        name: :planning_updated,
        # an event has a payload but maybe we need the transaction to be done to have relevant args
        # in this case we need a proc which will be evaluated after the commit
        raw_payload: -> { { weekly_schedule_id: weekly_schedule.db_id } }
      )

    result_object
  end

  private

  # `change_set` needs a reader: #call uses it through the bare method name,
  # which raised NameError when only `result_object` was exposed.
  attr_reader :result_object, :change_set
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Accumulates the side effects of a unit of work -- db operations and events --
# so that they can be applied together, at the very end, with #push!.
#
# All mutating methods return `self` so calls can be chained.
class ChangeSet
  extend T::Sig

  def initialize
    @event_collection = EventCollection.new
    @db_operations = []
  end

  # Appends the events and db operations of a child changeset to this one.
  # The child's db operations are placed after the ones already recorded here.
  sig { params(change_set: ChangeSet).returns(T.self_type) }
  def merge_child(change_set)
    event_collection.merge_child(change_set.event_collection)
    db_operations.concat(change_set.db_operations)
    self
  end

  # Records an event to dispatch later.
  #
  # `raw_payload` is either the payload hash itself or a proc returning it,
  # for payloads that can only be computed once the db operations are committed.
  #
  # Raises ChangeSet::Event::UnknownEventError when `name` is not listed in
  # Domain::EventCatalog::KNOWN_EVENTS.
  sig do
    params(
      name: Symbol,
      raw_payload: T.any(
        T::Hash[T.any(String, Symbol), T.untyped],
        T.proc.returns(T::Hash[T.any(String, Symbol), T.untyped])
      )
    ).returns(T.self_type)
  end
  def add_event(name:, raw_payload:)
    event_collection.add(name: name, raw_payload: raw_payload)
    self
  end

  # Records a db operation; `persistence_handler` must respond to `commit`.
  # Operations are committed in the order they were added.
  sig { params(persistence_handler: T.untyped).returns(T.self_type) }
  def add_db_operation(persistence_handler)
    db_operations.push(persistence_handler)
    self
  end

  # Commits every recorded db operation, then dispatches the recorded events.
  sig { returns(T.self_type) }
  def push!
    commit_db_operations
    dispatch_events
    self
  end

  # Runs `commit` on each recorded db operation inside a single transaction.
  sig { returns(T.self_type) }
  def commit_db_operations
    # should we move the transaction to also wrap the events?
    # in other words: should we still commit to db if events fail to dispatch?
    ApplicationRecord.transaction(requires_new: true) do
      db_operations.each(&:commit)
    end
    self
  end

  # Hands each distinct recorded event to the event catalog.
  sig { returns(T.self_type) }
  def dispatch_events
    event_collection.each do |event|
      event_catalog.dispatch(event)
    end
    self
  end

  protected

  # Protected rather than private so #merge_child can read them on another ChangeSet.
  attr_reader :event_collection, :db_operations

  private

  def event_catalog
    @event_catalog ||= Domain::EventCatalog.new
  end

  # Events recorded on a changeset, grouped by event name.
  class EventCollection
    def initialize
      @grouped_events = {}
    end

    # Builds an Event from the arguments and stores it.
    def add(name:, raw_payload:)
      add_event(Event.new(name: name, raw_payload: raw_payload))
    end

    # Stores every event of another collection, duplicates included;
    # de-duplication only happens when iterating with #each.
    def merge_child(event_collection)
      event_collection.all_events.each { |event| add_event(event) }
    end

    # Yields each distinct event. Without a block, returns an Enumerator.
    #
    # The block must be forwarded explicitly: calling `uniq_events.each` with
    # no block silently ignored the caller's block, so nothing was dispatched.
    def each(&block)
      uniq_events.each(&block)
    end

    protected

    # only used for merge
    def all_events
      grouped_events.values.flatten(1)
    end

    private

    attr_reader :grouped_events

    # Distinct events per name group. Relies on Event#eql? and Event#hash,
    # which resolve the payloads at the time of the call.
    def uniq_events
      grouped_events.values.flat_map(&:uniq)
    end

    def add_event(event)
      (grouped_events[event.name] ||= []) << event
    end
  end

  # A named event plus the payload to hand to its handler.
  class Event
    class UnknownEventError < StandardError; end

    attr_reader :name

    # Raises UnknownEventError when `name` is not a known event.
    def initialize(name:, raw_payload:)
      unless Domain::EventCatalog::KNOWN_EVENTS.include?(name)
        # Interpolate the `name` argument; the previous `event.name` referenced
        # an undefined local and raised NameError instead of this error.
        raise UnknownEventError, "unknown #{name}"
      end

      @name = name
      @raw_payload = raw_payload
    end

    # Resolves the payload.
    # - if at the time the event is added, we know all params,
    #   raw_payload is the payload itself and is returned as-is
    # - else, it means we need to wait for db operations to be committed;
    #   in this case raw_payload is a Proc and is called here
    # example, we need db id of some model which is not created yet to pass it to a worker
    #
    # The result is not memoized: a callable payload is evaluated on every call,
    # so the value reflects the state at the time it is read.
    def payload
      raw_payload.respond_to?(:call) ? raw_payload.call : raw_payload
    end

    # Two events are equal when they share a name and resolve to the same payload.
    def ==(other)
      other.is_a?(Event) && name == other.name && payload == other.payload
    end
    alias eql? ==

    # Must agree with #eql?: Array#uniq compares hashes first, so without this
    # override equal events were never recognised as duplicates.
    def hash
      [name, payload].hash
    end

    private

    attr_reader :raw_payload
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Registry of the events the domain knows about, with one private handler
# method per event name.
class Domain::EventCatalog
  extend T::Sig

  # Names of the events that have a handler below.
  KNOWN_EVENTS = %i[planning_updated].freeze

  # Invokes the handler named after the event.
  #
  # Raises ArgumentError when the event name is not in KNOWN_EVENTS.
  sig { params(event: ChangeSet::Event).returns(T.self_type) }
  def dispatch(event)
    # Check the name first so `send` can only ever reach a declared handler.
    unless KNOWN_EVENTS.include?(event.name)
      raise ArgumentError, "unknown event #{event.name.inspect}"
    end

    # Handlers are private, so `public_send` always raised NoMethodError here.
    send(event.name, event)
    self
  end

  private

  sig { params(event: ChangeSet::Event).void }
  def planning_updated(event)
    # trigger workers or whatever
    # dont touch the database!
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Persistence handlers for Domain::Plannable. Each handler wraps one plannable
# and exposes a `commit` method that performs the actual db work.
module Domain::Plannable::Persistence
  # Raised when committing a plannable fails, so the failure can be linked back
  # to the plannable. The underlying error is available through `cause`.
  # NOTE(review): the exact shape of this error was left "to be decided".
  class CommitError < StandardError
    attr_reader :plannable

    def initialize(plannable, message = nil)
      @plannable = plannable
      super(message)
    end
  end

  # Abstract parent of all handlers; subclasses implement #commit.
  class Base
    extend T::Sig
    extend T::Helpers

    abstract!

    sig { params(plannable: Domain::Plannable).void }
    def initialize(plannable)
      @plannable = plannable
    end

    # Performs the db operation for the wrapped plannable.
    sig { abstract.void }
    def commit; end

    private

    # Subclasses read the plannable through this reader; without it the bare
    # `plannable` calls in #commit raised NameError.
    attr_reader :plannable
  end

  # Creates the db record backing the plannable and stores its id on it.
  class Create < Base
    # Raises CommitError when the record is invalid.
    sig { override.void }
    def commit
      if plannable.rest?
        new_rest = ::Rest.create!(plannable.rest_params)
        plannable.db_id = new_rest.id
      else
        # Shift...
      end
    rescue ActiveRecord::RecordInvalid => exception
      # Re-raise instead of swallowing: a silently ignored failure would let the
      # caller's transaction commit as if this operation had succeeded.
      raise CommitError.new(plannable, exception.message)
    end
  end

  # Placeholder: not implemented yet.
  class Update < Base
    sig { override.void }
    def commit
    end
  end

  # Placeholder: not implemented yet.
  class Destroy < Base
    sig { override.void }
    def commit
    end
  end

  # Placeholder: not implemented yet.
  class BulkUpsert < Base
    sig { override.void }
    def commit
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment