public
Last active

Two-phase commit for Couchbase. Version 2 (improved version of https://gist.github.com/3135796)

  • Download Gist
test.rb
Ruby
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
require 'rubygems'
require 'couchbase'
 
# Moves +amount+ points from the +source+ document to the +destination+
# document using a two-phase commit tracked by a separate transaction document.
#
# The transaction document is switched through the states
# "initial" -> "pending" -> "committed" -> "done". While a transfer is in
# flight its id is listed in the "transactions" array of both documents, which
# is what the rollback inspects to decide whether a change was applied.
#
# @param source [String] key of the document points are taken from
# @param destination [String] key of the document points are added to
# @param amount [Numeric] number of points to move
# @raise [Couchbase::Error::Base] the original error, re-raised once the
#   rollback has been attempted
def transfer(source, destination, amount)
  cb = Couchbase.bucket

  # Prepare the transaction document.
  id = cb.incr("transaction:counter", :create => true)
  trans_id = "transaction:#{id}"
  cb.set(trans_id, {"source" => source, "destination" => destination,
                    "amount" => amount, "state" => "initial"})

  # STEP 1: Switch transaction into pending state
  cb.cas(trans_id) do |val|
    val.update("state" => "pending")
  end

  # STEP 2: Apply transaction to both documents
  cb.cas(source) do |val|
    val.update("points" => val["points"] - amount,
               "transactions" => val["transactions"] + [trans_id])
  end
  cb.cas(destination) do |val|
    val.update("points" => val["points"] + amount,
               "transactions" => val["transactions"] + [trans_id])
  end

  # STEP 3: Switch transaction into committed state
  cb.cas(trans_id) do |val|
    val.update("state" => "committed")
  end

  # STEP 4: Remove transaction from the documents
  cb.cas(source) do |val|
    val.update("transactions" => val["transactions"] - [trans_id])
  end
  cb.cas(destination) do |val|
    val.update("transactions" => val["transactions"] - [trans_id])
  end

  # STEP 5: Switch transaction into done state
  cb.cas(trans_id) do |val|
    val.update("state" => "done")
  end
rescue Couchbase::Error::Base => ex
  # Locals assigned in the method body are nil here if the failure happened
  # before their assignment; in that case there is nothing to roll back and
  # touching them would only mask the original error.
  raise ex if cb.nil? || trans_id.nil?

  # Rollback transaction
  trans, flags, cas = cb.get(trans_id, :extended => true)
  case trans["state"]
  when "committed"
    # Both documents were already updated: compensate with a new transaction
    # going in the opposite direction.
    transfer(destination, source, amount)
  when "pending"
    # STEP 1: Switch transaction into cancelling state, guarded by the CAS
    # value read above.
    cb.set(trans_id, trans.merge("state" => "cancelling"),
           :flags => flags, :cas => cas)

    # STEP 2: Revert changes if they were applied. `break` leaves the cas
    # call without yielding a new value when this document never recorded
    # the transaction.
    cb.cas(source) do |val|
      break unless val["transactions"].include?(trans_id)
      val.update("points" => val["points"] + amount,
                 "transactions" => val["transactions"] - [trans_id])
    end
    cb.cas(destination) do |val|
      break unless val["transactions"].include?(trans_id)
      val.update("points" => val["points"] - amount,
                 "transactions" => val["transactions"] - [trans_id])
    end

    # STEP 3: Switch transaction into cancelled state
    cb.cas(trans_id) do |val|
      val.update("state" => "cancelled")
    end
  end

  # Re-raise original exception
  raise ex
end
 
# Demo run: seed two account documents, then move 100 points from karen to dipti.
bucket = Couchbase.bucket

# Preload the initial documents (karen first, then dipti).
{"karen" => 500, "dipti" => 700}.each do |name, points|
  bucket.set(name, {"name" => name, "points" => points, "transactions" => []})
end

transfer("karen", "dipti", 100)

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.