Created
July 18, 2012 12:48
-
-
Save avsej/3136027 to your computer and use it in GitHub Desktop.
Two phase commit for Couchbase. Version 2 (improved version of https://gist.github.com/3135796)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'couchbase' | |
def transfer(source, destination, amount) | |
cb = Couchbase.bucket | |
# prepare transaction document | |
id = cb.incr("transaction:counter", :create => true) | |
trans_id = "transaction:#{id}" | |
cb.set(trans_id, {"source" => source, "destination" => destination, | |
"amount" => amount, "state" => "initial"}) | |
# STEP 1: Switch transaction into pending state | |
cb.cas(trans_id) do |val| | |
val.update("state" => "pending") | |
end | |
# STEP 2: Apply transaction to both documents | |
cb.cas(source) do |val| | |
val.update("points" => val["points"] - amount, | |
"transactions" => val["transactions"] + [trans_id]) | |
end | |
cb.cas(destination) do |val| | |
val.update("points" => val["points"] + amount, | |
"transactions" => val["transactions"] + [trans_id]) | |
end | |
# STEP 3: Switch transaction into committed state | |
cb.cas(trans_id) do |val| | |
val.update("state" => "committed") | |
end | |
# STEP 4: Remove transaction from the documents | |
cb.cas(source) do |val| | |
val.update("transactions" => val["transactions"] - [trans_id]) | |
end | |
cb.cas(destination) do |val| | |
val.update("transactions" => val["transactions"] - [trans_id]) | |
end | |
# STEP 5: Switch transaction into done state | |
cb.cas(trans_id) do |val| | |
val.update("state" => "done") | |
end | |
rescue Couchbase::Error::Base => ex | |
# Rollback transaction | |
trans, flags, cas = cb.get(trans_id, :extended => true) | |
case trans["state"] | |
when "committed" | |
# Create new transaction and swap the targets | |
transfer(destination, source, amount) | |
when "pending" | |
# STEP 1: Switch transaction into cancelling state | |
cb.set(trans_id, val.update("state" => "cancelling"), | |
:flags => flags, :cas => cas) | |
# STEP 2: Revert changes if they were applied | |
cb.cas(source) do |val| | |
break unless val["transactions"].include?(trans_id) | |
val.update("points" => val["points"] + 100, | |
"transactions" => val["transactions"] - [trans_id]) | |
end | |
cb.cas(destination) do |val| | |
break unless val["transactions"].include?(trans_id) | |
val.update("points" => val["points"] - 100, | |
"transactions" => val["transactions"] - [trans_id]) | |
end | |
# STEP 3: Switch transaction into cancelled state | |
cb.cas(trans_id) do |val| | |
val.update("state" => "cancelled") | |
end | |
end | |
# Re-raise original exception | |
raise ex | |
end | |
cb = Couchbase.bucket | |
# preload initial documents | |
cb.set("karen", {"name" => "karen", "points" => 500, "transactions" => []}) | |
cb.set("dipti", {"name" => "dipti", "points" => 700, "transactions" => []}) | |
transfer("karen", "dipti", 100) |
Is there a equivalent example in Java?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Is there a equivalent example in Java?