@avsej /gist:3135796
Two-phase commit for Couchbase
require 'rubygems'
require 'couchbase'
cb = Couchbase.bucket
karen = {"name" => "karen", "points" => 500, "transactions" => []}
dipti = {"name" => "dipti", "points" => 700, "transactions" => []}
# preload initial documents
cb.set("karen", karen)
cb.set("dipti", dipti)
# prepare transaction document
trans = {"source" => "karen", "destination" => "dipti", "amount" => 100, "state" => "initial"}
cb.set("trans:1", trans)
begin
  # STEP 1: Switch transaction into pending state
  # (update the value fetched by #cas, not the stale local `trans`)
  cb.cas("trans:1") do |val|
    val.update("state" => "pending")
  end
  # STEP 2: Apply transaction to both documents
  cb.cas("karen") do |val|
    val.update("points" => val["points"] - 100,
               "transactions" => val["transactions"] + ["trans:1"])
  end
  cb.cas("dipti") do |val|
    val.update("points" => val["points"] + 100,
               "transactions" => val["transactions"] + ["trans:1"])
  end
  # STEP 3: Switch transaction into committed state
  cb.cas("trans:1") do |val|
    val.update("state" => "committed")
  end
  # STEP 4: Remove transaction from the documents
  cb.cas("karen") do |val|
    val.update("transactions" => val["transactions"] - ["trans:1"])
  end
  cb.cas("dipti") do |val|
    val.update("transactions" => val["transactions"] - ["trans:1"])
  end
  # STEP 5: Switch transaction into done state
  cb.cas("trans:1") do |val|
    val.update("state" => "done")
  end
rescue Couchbase::Error::Base => ex
  # Rollback transaction
  trans = cb.get("trans:1")
  case trans["state"]
  when "committed"
    # Create new transaction and swap the targets or amount sign.
    #
    # The code block above could be wrapped in a method, something like
    #
    #   def transfer(source, destination, amount)
    #     ...
    #   end
    #
    # so that this handler could just re-use it.
  when "pending"
    # STEP 1: Switch transaction into cancelling state
    cb.cas("trans:1") do |val|
      val.update("state" => "cancelling")
    end
    # STEP 2: Revert changes if they were applied
    cb.cas("karen") do |val|
      break unless val["transactions"].include?("trans:1")
      val.update("points" => val["points"] + 100,
                 "transactions" => val["transactions"] - ["trans:1"])
    end
    cb.cas("dipti") do |val|
      break unless val["transactions"].include?("trans:1")
      val.update("points" => val["points"] - 100,
                 "transactions" => val["transactions"] - ["trans:1"])
    end
    # STEP 3: Switch transaction into cancelled state
    cb.cas("trans:1") do |val|
      val.update("state" => "cancelled")
    end
  end
  # Re-raise original exception
  raise ex
end
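
The comment in the rollback handler suggests wrapping the steps in a transfer(source, destination, amount) method. Below is a minimal sketch of that idea, not the author's implementation. So that it runs without a server, it uses a hypothetical in-memory MemoryBucket stand-in that mimics only set, get and block-form cas; it is not the Couchbase client. The bucket and txn_id parameters are also assumptions added for illustration.

```ruby
# Hypothetical in-memory stand-in for a bucket (NOT the Couchbase client).
# It mimics only the three calls the gist uses: set, get and block-form cas.
class MemoryBucket
  def initialize
    @docs = {}
  end

  def set(key, value)
    @docs[key] = Marshal.load(Marshal.dump(value)) # store a deep copy
  end

  def get(key)
    Marshal.load(Marshal.dump(@docs.fetch(key)))   # hand out a deep copy
  end

  # Fetch the value, let the block transform it, store the block's result.
  def cas(key)
    set(key, yield(get(key)))
  end
end

# Steps 1-5 wrapped in a method. txn_id is an assumed extra parameter so
# that every transfer gets its own transaction document.
def transfer(bucket, txn_id, source, destination, amount)
  bucket.set(txn_id, {"source" => source, "destination" => destination,
                      "amount" => amount, "state" => "initial"})
  bucket.cas(txn_id) { |t| t.update("state" => "pending") }
  # Apply the transaction to both documents
  bucket.cas(source) do |doc|
    doc.update("points" => doc["points"] - amount,
               "transactions" => doc["transactions"] + [txn_id])
  end
  bucket.cas(destination) do |doc|
    doc.update("points" => doc["points"] + amount,
               "transactions" => doc["transactions"] + [txn_id])
  end
  bucket.cas(txn_id) { |t| t.update("state" => "committed") }
  # Remove the transaction marker from both documents
  [source, destination].each do |key|
    bucket.cas(key) do |doc|
      doc.update("transactions" => doc["transactions"] - [txn_id])
    end
  end
  bucket.cas(txn_id) { |t| t.update("state" => "done") }
end

bucket = MemoryBucket.new
bucket.set("karen", {"name" => "karen", "points" => 500, "transactions" => []})
bucket.set("dipti", {"name" => "dipti", "points" => 700, "transactions" => []})
transfer(bucket, "trans:1", "karen", "dipti", 100)
```

With such a method, compensating for an already committed transfer could be a second call with the targets swapped, for example transfer(bucket, "trans:2", "dipti", "karen", 100).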
@avsej
Owner

I'm assuming that the transaction document won't change between line 52, trans = cb.get("trans:1"), and the subsequent transaction updates. A more robust version would be:

trans, flags, cas = cb.get("trans:1", :extended => true)
case trans["state"]
when "committed"
  # ...
when "pending"
  cb.set("trans:1", trans.update("state" => "cancelling"), :flags => flags, :cas => cas)
  # ...
end

This works because #cas is just #get followed by #set.
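
To illustrate why carrying the CAS value from the read into the write matters, here is a small sketch. It uses a hypothetical in-memory VersionedStore with a per-key counter, not the Couchbase client; the class, its KeyExists error and the counter scheme are all assumptions made for this example. It shows cas built from get plus a guarded set, and a stale write being rejected after a concurrent update.

```ruby
# Hypothetical in-memory store with per-key CAS counters (NOT the Couchbase client).
class VersionedStore
  class KeyExists < StandardError; end

  def initialize
    @docs = {}     # key => [value, cas]
    @next_cas = 0
  end

  # Returns [value, cas] for the key.
  def get(key)
    value, cas = @docs.fetch(key)
    [value.dup, cas]
  end

  # Stores the value. When :cas is given, it must match the stored counter.
  def set(key, value, options = {})
    current = @docs[key]
    if options[:cas] && current && current[1] != options[:cas]
      raise KeyExists, "#{key} was modified concurrently"
    end
    @next_cas += 1
    @docs[key] = [value, @next_cas]
    @next_cas
  end

  # cas is nothing more than get followed by a guarded set.
  def cas(key)
    value, cas = get(key)
    set(key, yield(value), :cas => cas)
  end
end

store = VersionedStore.new
store.set("trans:1", {"state" => "pending"})

# The rollback handler reads the document and remembers its CAS value...
trans, cas = store.get("trans:1")
# ...meanwhile another writer commits the transaction.
store.cas("trans:1") { |val| val.update("state" => "committed") }

# The stale write is rejected instead of silently overwriting "committed".
conflict = begin
  store.set("trans:1", trans.update("state" => "cancelling"), :cas => cas)
  false
rescue VersionedStore::KeyExists
  true
end
```

Without the :cas guard, the last set would have replaced the committed state with cancelling.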

Another point: I found that the Bucket#cas operation doesn't carry over the flags from the original key. See http://www.couchbase.com/issues/browse/RCBC-59

@avsej
Owner

Improved version is here: https://gist.github.com/3136027
