public
Last active

Simple Couchbase PHP 2PC Implementation

  • Download Gist
couchbase-php-2pc-simple.php
PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
<?php
/**
* Simple two-phase commit for PHP couchbase.
*
* Michael Nitschinger (@daschl, 2012)
*
* Additional Remarks
* ------------------
* - The Couchbase extension currently makes it pretty hard to write easily readable code when dealing with
* CAS (compared to the Ruby adapter). This could certainly be improved with closures. I also found that
* only the getDelayed method seems to return the CAS values (and the get callback doesn't seem to work
* either).
* - In the current implementation there should also be a check to see if a transaction is currently running
* on both source or destination (or some notes should be placed to do that inside userland). This is
* not so much a case in this transfer method but would be more an issue in a general transaction method
* where every field could be modified.
* - Also, since the SDK doesn't raise an exception when a CAS is not valid, you have to do that on your own
* always. This of course makes it more verbose too.
* - I also found that $cb->increment doesn't create the key if it isn't found (this should be corrected in the
* docs or just be implemented), or maybe be offered via an optional 'create' => true as in the Ruby SDK.
* - You can easily test this implementation by raising a TransactionException somewhere in the try block.
*/
/**
 * Signals that a step of the two-phase commit in transfer() could not be completed
 * (for example a CAS write that was rejected).
 */
class TransactionException extends RuntimeException
{
}
 
 
/**
 * Moves $amount points from one account document to another using a simple
 * two-phase commit built on CAS writes.
 *
 * A transaction document ("transaction:<id>") is created and walked through the
 * states initial -> pending -> committed -> done. While pending, the transaction
 * key is recorded in the 'transactions' list of both account documents so that a
 * rollback can tell which documents were already modified.
 *
 * On any failure inside the commit sequence a rollback is attempted:
 *  - stored state 'pending':   the applied changes are reverted and the transaction
 *                              ends in state 'cancelled';
 *  - stored state 'committed': a compensating transfer in the opposite direction is
 *                              started;
 *  - any other state:          nothing was applied, nothing is reverted.
 *
 * @param string $source      Key of the document that loses points.
 * @param string $destination Key of the document that gains points.
 * @param mixed  $amount      Number of points to move.
 * @param object $cb          Couchbase connection; must provide get(), set(), increment(),
 *                            cas() and getDelayed(). Still taken by reference only to keep
 *                            the existing signature.
 *
 * @throws TransactionException If the transaction document cannot be created, or if the
 *                              rollback itself fails.
 * @throws Exception            After a rollback has run; the original failure is available
 *                              through getPrevious().
 */
function transfer($source, $destination, $amount, &$cb) {

    // Helper closure: returns either the CAS alone or array(decoded document, CAS).
    // Yields null when the callback is never invoked (e.g. the key does not exist).
    $get = function($key, $casOnly = false) use (&$cb) {
        $return = null;
        $cb->getDelayed(array($key), true, function($cb, $data) use (&$return, $casOnly) {
            $return = $casOnly ? $data['cas'] : array(json_decode($data['value'], true), $data['cas']);
        });
        return $return;
    };

    // Prepare the transaction document. The counter is created on demand before
    // incrementing it (see the file header remark about increment and missing keys).
    if ($cb->get('transaction:counter') === null) {
        $cb->set('transaction:counter', 0);
    }
    $id = $cb->increment('transaction:counter', 1);
    if ($id === false || $id === null) {
        // Without an id every failed call would share the key "transaction:".
        throw new TransactionException("Could not insert transaction document");
    }

    $state = 'initial';
    $transKey = "transaction:$id";
    $transDoc = compact('source', 'destination', 'amount', 'state');
    $cb->set($transKey, json_encode($transDoc));
    $transactionCas = $get($transKey, true);

    // If the transaction document could not be stored, give up before touching anything.
    if (!$transactionCas) {
        throw new TransactionException("Could not insert transaction document");
    }

    // Loads an account document and guarantees a usable 'transactions' list.
    // Throws with $failure when the document is missing or is not a JSON object/array.
    $load = function($key, $failure) use ($get) {
        $entry = $get($key);
        if (!is_array($entry) || !is_array($entry[0])) {
            throw new TransactionException($failure);
        }
        if (!isset($entry[0]['transactions']) || !is_array($entry[0]['transactions'])) {
            $entry[0]['transactions'] = array();
        }
        return $entry;
    };

    // Writes a new state into the transaction document via CAS.
    // When no CAS is passed in, the current one is fetched first.
    $switchState = function($newState, $failure, $cas = null) use (&$cb, &$transDoc, $transKey, $get) {
        $transDoc['state'] = $newState;
        if ($cas === null) {
            $cas = $get($transKey, true);
        }
        if (!$cb->cas($cas, $transKey, json_encode($transDoc))) {
            throw new TransactionException($failure);
        }
    };

    // Reverts one account document, but only if it carries this transaction's marker.
    // A missing document or missing marker means nothing was applied there.
    $revert = function($key, $delta, $failure) use (&$cb, $get, $transKey) {
        $entry = $get($key);
        if (!is_array($entry) || !is_array($entry[0])
            || !isset($entry[0]['transactions']) || !is_array($entry[0]['transactions'])) {
            return;
        }
        list($doc, $cas) = $entry;
        if (!in_array($transKey, $doc['transactions'], true)) {
            return;
        }
        $doc['points'] += $delta;
        // array_values keeps the list a JSON array after array_diff leaves gaps in the keys.
        $doc['transactions'] = array_values(array_diff($doc['transactions'], array($transKey)));
        if (!$cb->cas($cas, $key, json_encode($doc))) {
            throw new TransactionException($failure);
        }
    };

    try {
        // STEP 1: Switch transaction into pending state
        $switchState('pending', "Could not switch to pending state", $transactionCas);

        // STEP 2: Apply transaction to both documents.
        // Both documents are loaded before either is written so a missing one fails early.
        list($sourceDoc, $sourceCas) = $load($source, "Could not update source document");
        list($destDoc, $destCas) = $load($destination, "Could not update destination document");

        $sourceDoc['points'] -= $amount;
        // Append with []: the array union operator (+=) would silently drop the marker
        // whenever the list already has an element at index 0.
        $sourceDoc['transactions'][] = $transKey;
        $destDoc['points'] += $amount;
        $destDoc['transactions'][] = $transKey;

        if (!$cb->cas($sourceCas, $source, json_encode($sourceDoc))) {
            throw new TransactionException("Could not update source document");
        }

        if (!$cb->cas($destCas, $destination, json_encode($destDoc))) {
            throw new TransactionException("Could not update destination document");
        }

        // STEP 3: Switch transaction into committed state
        $switchState('committed', "Could not switch to committed state");

        // STEP 4: Remove transaction marker from both documents
        list($sourceDoc, $sourceCas) = $load($source, "Could not remove transaction from source document");
        list($destDoc, $destCas) = $load($destination, "Could not remove transaction from destination document");

        $sourceDoc['transactions'] = array_values(array_diff($sourceDoc['transactions'], array($transKey)));
        $destDoc['transactions'] = array_values(array_diff($destDoc['transactions'], array($transKey)));

        if (!$cb->cas($sourceCas, $source, json_encode($sourceDoc))) {
            throw new TransactionException("Could not remove transaction from source document");
        }

        if (!$cb->cas($destCas, $destination, json_encode($destDoc))) {
            throw new TransactionException("Could not remove transaction from destination document");
        }

        // STEP 5: Switch transaction into done state
        $switchState('done', "Could not switch to done state");

    } catch (Exception $e) {
        // Rollback: decide what to undo from the state that was actually stored,
        // not from the in-memory copy.
        $stored = $get($transKey);
        $storedState = null;
        $storedCas = null;
        if (is_array($stored) && is_array($stored[0])) {
            $transDoc = $stored[0];
            $storedCas = $stored[1];
            $storedState = isset($transDoc['state']) ? $transDoc['state'] : null;
        }

        switch ($storedState) {
            case 'committed':
                // Both documents were already updated: compensate with a new
                // transaction in the opposite direction.
                transfer($destination, $source, $amount, $cb);
                break;
            case 'pending':
                // STEP 1: switch transaction into cancelling state
                // (reuses the CAS obtained together with the stored document)
                $switchState('cancelling', "Could not switch into cancelling state", $storedCas);

                // STEP 2: revert changes if applied
                $revert($source, $amount, "Could not revert source document");
                $revert($destination, -$amount, "Could not revert destination document");

                // STEP 3: switch transaction into cancelled state
                $switchState('cancelled', "Could not switch into cancelled state");
                break;
        }

        // Report the failure, keeping the original exception as the previous one.
        // The code must be an int; passing null is deprecated on PHP 8.1+.
        throw new Exception("Transaction failed, rollback executed", 0, $e);
    }
}
 
// Demo: connect, seed two account documents and move 100 points from karen to dipti.
$cb = new Couchbase('localhost:8091');

// Starting balances keyed by account name; each account starts with no open transactions.
$accounts = array(
    'karen' => 500,
    'dipti' => 700
);

foreach ($accounts as $name => $points) {
    $document = array(
        'name' => $name,
        'points' => $points,
        'transactions' => array()
    );
    $cb->set($name, json_encode($document));
}

transfer('karen', 'dipti', 100, $cb);
 
?>

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.