public
Created

Two phase commit for Couchbase

  • Download Gist
gistfile1.rb
Ruby
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
require 'rubygems'
require 'couchbase'
 
cb = Couchbase.bucket
 
karen = {"name" => "karen", "points" => 500, "transactions" => []}
dipti = {"name" => "dipti", "points" => 700, "transactions" => []}
 
# preload initial documents
cb.set("karen", karen)
cb.set("dipti", dipti)
 
# prepare transaction document
trans = {"source" => "karen", "destination" => "dipti", "amount" => 100, "state" => "initial"}
cb.set("trans:1", trans)
 
begin
# STEP 1: Switch transaction into pending state
cb.cas("trans:1") do
trans.update("state" => "pending")
end
 
# STEP 2: Apply transaction to both documents
cb.cas("karen") do |val|
val.update("points" => val["points"] - 100,
"transactions" => val["transactions"] + ["trans:1"])
end
cb.cas("dipti") do |val|
val.update("points" => val["points"] + 100,
"transactions" => val["transactions"] + ["trans:1"])
end
 
# STEP 3: Switch transaction into committed state
cb.cas("trans:1") do |val|
val.update("state" => "committed")
end
 
# STEP 4: Remove transaction from the documents
cb.cas("karen") do |val|
val.update("transactions" => val["transactions"] - ["trans:1"])
end
cb.cas("dipti") do |val|
val.update("transactions" => val["transactions"] - ["trans:1"])
end
 
# STEP 5: Switch transaction into done state
cb.cas("trans:1") do |val|
val.update("state" => "done")
end
rescue Couchbase::Error::Base => ex
# Rollback transaction
trans = cb.get("trans:1")
case trans["state"]
when "committed"
# Create new transaction and swap the targets or amount sign.
#
# The code block about could be wrapped in the method something like
#
# def transfer(source, destination, amount)
# ...
# end
#
# So that this handler could just re-use it.
when "pending"
# STEP 1: Switch transaction into cancelling state
cb.cas("trans:1") do |val|
val.update("state" => "cancelling")
end
 
# STEP 2: Revert changes if they were applied
cb.cas("karen") do |val|
break unless val["transactions"].include?("trans:1")
val.update("points" => val["points"] + 100,
"transactions" => val["transactions"] - ["trans:1"])
end
cb.cas("dipti") do |val|
break unless val["transactions"].include?("trans:1")
val.update("points" => val["points"] - 100,
"transactions" => val["transactions"] - ["trans:1"])
end
 
# STEP 3: Switch transaction into cancelled state
cb.cas("trans:1") do |val|
val.update("state" => "cancelled")
end
end
 
# Re-raise original exception
raise ex
end

I'm considering that between line 52 trans = cb.get("trans:1") and further transaction updates the transaction document won't change. More robust version will be

trans, flags, cas = cb.get("trans:1", :extended => true)
case trans["state"]
when "committed"
  # ...
when "pending"
  cb.set("trans:1", trans.update("state" => "cancelling"), :flags => flags, :cas => cas)
  # ...
end

Becase #cas is #get + #set

Another point, I found that Bucket#cas operation doesn't replicate flags from original key. http://www.couchbase.com/issues/browse/RCBC-59

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.