Skip to content

Instantly share code, notes, and snippets.

@sclinede
Last active March 6, 2022 06:09
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save sclinede/69afbd78fde9cc7d2dc3afd39ce3002d to your computer and use it in GitHub Desktop.
Save sclinede/69afbd78fde9cc7d2dc3afd39ce3002d to your computer and use it in GitHub Desktop.
Example of a full Saga implementation in Ruby (with a usage example)
# Saga runs a chain of compensable steps. Each step wraps a "subject": an
# object (typically a class) that responds to `call(*args)` and `undo(*args)`.
#
# While a step runs, a marker is kept in a Redis set keyed by the step's
# transaction id, and a delayed Saga::CleanWorker job is scheduled that later
# inspects that set and undoes the step if the marker is still there.
#
# Usage:
#   Saga.chain(StepA, args: 1).chain(StepB, args: 2).call
class Saga
  class << self
    # Hook that must be overridden to yield a Redis connection to the block.
    #
    # @raise [NotImplementedError] until a concrete implementation is supplied
    def with_redis
      raise NotImplementedError
    end

    # `cleanup_delay` is a plain class-level setting. The `queue` reader and
    # both `last_txid` accessors generated here are replaced by the
    # thread-local versions defined below.
    attr_accessor :cleanup_delay, :queue, :last_txid

    # @return [Array] this thread's pending [saga, args] pairs
    def queue
      Thread.current[:saga_queue] ||= []
    end

    # @return [String, nil] txid of the step most recently chained on this thread
    def last_txid
      Thread.current[:last_txid]
    end

    def last_txid=(value)
      Thread.current[:last_txid] = value
    end

    # Appends a step to this thread's queue, linked to the previously chained
    # step as its parent.
    #
    # @param subject_klass [#call, #undo] the step implementation
    # @param args [Object, Array] argument(s) later splatted into call/undo
    # @return [Class] self, so calls can be chained
    def chain(subject_klass, args:)
      # Build a fresh array: Array(args) returns `args` itself when it is
      # already an Array, so unshifting onto it would prepend the subject
      # class to the very arguments stored in the queue.
      txid = [subject_klass, *Array(args)].join(":")
      queue << [new(txid, subject_klass, parent: last_txid), args]
      self.last_txid = txid
      self
    end

    # Runs #clean on the queued steps, last one first; each step's block
    # recurses into this method to process the next one.
    #
    # @return [Class] self once the queue is empty
    def clean
      self.last_txid = nil
      saga, args = queue.pop
      return self unless saga

      saga.clean(*args, &method(:clean))
    ensure
      # Whatever is still queued after an aborted run is stale and must not
      # leak into the next chain built on this thread.
      queue.clear
    end

    # Runs #call on the queued steps, first one first; each step's block
    # recurses into this method so later steps run inside earlier ones.
    #
    # @return [Class, Boolean] self when the queue is empty, otherwise the
    #   result of the step's #call
    def call
      self.last_txid = nil
      saga, args = queue.shift
      return self unless saga

      saga.call(*args, &method(:call))
    ensure
      # A step that raised or found its lock busy never reaches the steps
      # behind it; drop them instead of leaving them for the next chain.
      queue.clear
    end
  end

  TRANSACTION_MARKER = "(reserved)".freeze

  # @param txid [String] transaction id, used to derive the Redis key
  # @param subject_klass [#call, #undo] the step implementation
  # @param parent [String, nil] txid of the enclosing step, if any
  def initialize(txid, subject_klass, parent: nil)
    @txid = txid
    @parent_txid = parent
    @subject = subject_klass # a class of 2 methods: #call, #undo
  end

  # Runs the subject under the transaction lock, then yields to the given
  # block (the rest of the chain). If either raises, the subject is undone,
  # the lock is released and the error is re-raised.
  #
  # @return [Boolean] false when the transaction is already in progress
  #   (nothing is run), true otherwise
  def call(*args)
    # Forward the args so the scheduled cleanup job can pass them to #undo.
    with_lock(*args) do |redis|
      begin
        @subject.call(*args)
        yield if block_given?
      rescue StandardError
        @subject.undo(*args)
        finish_transaction(redis)
        raise
      end
    end
  end

  # Inspects this transaction's lock set and compensates if needed:
  # * empty set   - nothing to do;
  # * one member  - the subject is undone and the lock released;
  # * more        - a cleanup job is enqueued for every member other than the
  #                 transaction marker.
  # Yields to the given block afterwards.
  def clean(*args)
    self.class.with_redis do |redis|
      children = redis.smembers(key(@txid))
      case children.size
      when 0
        # nothing to do
      when 1
        @subject.undo(*args)
        finish_transaction(redis)
      else
        # NOTE(review): members were added as full lock keys (see
        # #begin_transaction) and are passed here as txids, so #key gets
        # applied to them a second time by the worker; the job also carries
        # this saga's subject and args rather than the child's. Confirm the
        # intended child-cleanup semantics before relying on this branch.
        children.each do |child_tx|
          next if child_tx.eql?(TRANSACTION_MARKER)

          enqueue_cleanup(*args, txid: child_tx)
        end
      end
    end
    yield if block_given?
  end

  private

  # @return [String] the Redis key of the lock set for +txid+
  def key(txid)
    "saga:#{txid}:lock"
  end

  # Marks the transaction as started, schedules its cleanup job, yields the
  # Redis connection and finally releases the marker.
  #
  # If the block raises, the release below is skipped; #call's rescue
  # performs it instead.
  #
  # @return [Boolean] false (without yielding) if the lock set is non-empty
  def with_lock(*args)
    self.class.with_redis do |redis|
      return false if transaction_in_progress?(redis)

      begin_transaction(redis)
      enqueue_cleanup(*args)
      yield(redis)
      finish_transaction(redis)
    end
    true
  end

  def transaction_in_progress?(redis)
    redis.smembers(key(@txid)).size.positive?
  end

  # Registers this transaction in its parent's set (if any) and adds the
  # marker to its own set, in a single MULTI.
  def begin_transaction(redis)
    redis.multi do |batched_redis|
      batched_redis.sadd(key(@parent_txid), key(@txid)) if @parent_txid
      batched_redis.sadd(key(@txid), TRANSACTION_MARKER)
    end
  end

  # Reverses #begin_transaction.
  def finish_transaction(redis)
    redis.multi do |batched_redis|
      batched_redis.srem(key(@txid), TRANSACTION_MARKER)
      batched_redis.srem(key(@parent_txid), key(@txid)) if @parent_txid
    end
  end

  # Schedules a CleanWorker run for +txid+ after the cleanup delay.
  def enqueue_cleanup(*args, txid: @txid)
    Saga::CleanWorker.perform_in(
      cleanup_delay,
      txid,
      @subject.to_s,
      *args
    )
  end

  # @return [Numeric] the configured class-level delay, or 3600 by default
  def cleanup_delay
    self.class.cleanup_delay || 3600
  end

  require 'sidekiq'

  # Sidekiq job that re-creates a saga from its serialized form and cleans it.
  class CleanWorker
    include Sidekiq::Worker

    # @param txid [String]
    # @param subject_klass [String] name of the subject class
    def perform(txid, subject_klass, *args)
      # Object.const_get resolves (possibly namespaced) constant names with
      # core Ruby; String#constantize would require ActiveSupport, which this
      # file never loads.
      Saga.new(txid, Object.const_get(subject_klass)).clean(*args)
    end
  end
end
require_relative "saga.rb"
# Record shapes for the demo data. The visible code works on plain hashes
# rather than on instances of these structs.
User   = Struct.new(*%i[id name email external_id])
Parcel = Struct.new(*%i[id address user_id external_id])

# In-memory stand-in for a database: each table maps a record's string id to
# a mutable hash of its attributes.
DB = {
  users: [
    { id: "1", name: "John Doe", email: "john@doe.com", external_id: nil },
    { id: "2", name: "Jane Doe", email: "jane@doe.com", external_id: nil }
  ].to_h { |row| [row[:id], row] },
  parcels: [
    { id: "4", address: "Lenina, 2", user_id: "1", external_id: nil },
    { id: "5", address: "Kuybisheva, 4", user_id: "2", external_id: nil }
  ].to_h { |row| [row[:id], row] }
}
# Stand-in for a remote API: every method returns a canned response hash.
# Setting the CONN_ERR environment variable makes order creation fail.
class APIClient
  FAKE_USER_ID = 123
  FAKE_ORDER_ID = 345
  private_constant :FAKE_USER_ID, :FAKE_ORDER_ID

  # @return [Hash] canned `{user_id: ...}` response
  def create_user(name, email)
    { user_id: FAKE_USER_ID }
  end

  # @return [Hash] canned `{user_id: ...}` response
  def delete_user(email)
    { user_id: FAKE_USER_ID }
  end

  # @return [Hash] canned `{order_id: ...}` response
  # @raise [RuntimeError] when ENV has a CONN_ERR entry
  def create_order(user_id, address, reference_id)
    connection_down = ENV.key?("CONN_ERR")
    raise RuntimeError, "Sorry connection bad" if connection_down

    { order_id: FAKE_ORDER_ID }
  end

  # @return [Hash] canned `{order_id: ...}` response
  def delete_order(reference_id)
    { order_id: FAKE_ORDER_ID }
  end
end
# Saga step that registers a local user with the external API and records the
# returned external id on the user's DB record; `undo` reverses it.
class RegisterUser
  attr_reader :client, :user

  # @param user_id [#to_s] id of a record in DB[:users]
  # @raise [KeyError] when no such user exists
  def initialize(user_id)
    # Fail fast on an unknown id. Falling back to an empty hash would send
    # nil name/email to the API and store a junk record under a nil key.
    @user = DB[:users].fetch(user_id.to_s)
    @client = APIClient.new
  end

  # Step entry point used by Saga.
  def self.call(user_id)
    new(user_id).create_user!
  end

  # Compensation entry point used by Saga.
  def self.undo(user_id)
    new(user_id).delete_user!
  end

  # Creates the remote user and stores its id as :external_id.
  #
  # @return [Hash] the updated user record
  def create_user!
    response = client.create_user(*user.values_at(:name, :email))
    user[:external_id] = response.fetch(:user_id)
    DB[:users][user[:id]] = user
  end

  # Deletes the remote user and drops :external_id from the record.
  #
  # @return [Object, nil] the removed external id, if there was one
  def delete_user!
    client.delete_user(user.fetch(:email))
    user.delete(:external_id)
  end
end
# Saga step that creates an order in the external API for a local parcel and
# records the returned order id on the parcel's DB record; `undo` reverses it.
class CreateOrder
  attr_reader :client, :parcel

  # @param parcel_id [#to_s] id of a record in DB[:parcels]
  # @raise [KeyError] when no such parcel exists
  def initialize(parcel_id)
    @parcel = DB[:parcels].fetch(parcel_id.to_s)
    @client = APIClient.new
  end

  # Step entry point used by Saga.
  def self.call(parcel_id)
    new(parcel_id).create_order!
  end

  # Compensation entry point used by Saga.
  def self.undo(parcel_id)
    new(parcel_id).delete_order!
  end

  # @return [String] reference under which the order is known remotely
  def reference_id
    "internal_parcel_id_#{parcel.fetch(:id)}"
  end

  # Creates the remote order and stores its id as :external_id.
  #
  # @return [Hash] the updated parcel record
  def create_order!
    response = client.create_order(
      user_external_id, parcel[:address], reference_id
    )
    parcel[:external_id] = response.fetch(:order_id)
    DB[:parcels][parcel[:id]] = parcel
  end

  # Deletes the remote order and drops :external_id from the record.
  #
  # @return [Object, nil] the removed external id, if there was one
  def delete_order!
    client.delete_order(reference_id)
    parcel.delete(:external_id)
  end

  private

  # External id of the parcel's owner, read from the owner's DB record. This
  # replaces an instance variable that was never assigned, which meant the
  # API always received nil. Still nil when the owner or its external id is
  # absent.
  def user_external_id
    DB[:users].dig(parcel[:user_id], :external_id)
  end
end
# Wires Saga to Redis for this example by supplying the with_redis hook.
class Saga
  require 'redis'

  # Returns the Redis client memoized for the current thread, creating it on
  # first use.
  #
  # @return [Redis]
  def self.redis_connection
    # redis-rb takes the connection URL under the `url:` option; the previous
    # `redis_url:` key is not a recognized option, so the intended
    # host/database was never applied.
    Thread.current[:redis] ||= Redis.new(url: "redis://localhost/12")
  end

  # Yields the current thread's Redis client to the block.
  def self.with_redis
    yield(redis_connection)
  end
end
john_id = "1"
john_parcel_id = "4"

# Queue the same two-step saga twice: the first pass is driven through
# Saga.clean, the second through Saga.call.
%i[clean call].each do |action|
  Saga
    .chain(RegisterUser, args: john_id)
    .chain(CreateOrder, args: john_parcel_id)
    .public_send(action)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment