Skip to content

Instantly share code, notes, and snippets.

@daschl
Created July 21, 2012 12:57
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save daschl/3155762 to your computer and use it in GitHub Desktop.
Save daschl/3155762 to your computer and use it in GitHub Desktop.
Advanced Couchbase 2PC PHP Implementation
<?php
/**
* A more general PHP two-phase commit implementation.
*
* This assumes that we run on PHP 5.3 or later.
*/
/**
 * Raised when a two-phase commit cannot be completed.
 *
 * Adds no behaviour of its own; it only gives callers a dedicated type to
 * catch.
 */
class TransactionException extends Exception
{
}
/**
 * Wraps a loaded document so that its key, its CAS value and its payload
 * can be handled together.
 */
class CouchbaseDocument {
    /**
     * Key under which the document is stored.
     */
    public $key = null;

    /**
     * CAS value that belongs to the loaded revision of the document.
     */
    public $cas = null;

    /**
     * Payload of the document (the decoded JSON when a string was given).
     */
    public $values = null;

    /**
     * Populates the document from an array that may contain the entries
     * `key`, `cas` and `value`. Entries that are missing or null leave the
     * matching property at null. A string `value` is decoded as JSON into
     * an associative array; any other `value` is stored unchanged.
     */
    public function __construct($data) {
        foreach(array('key', 'cas') as $field) {
            if(isset($data[$field])) {
                $this->$field = $data[$field];
            }
        }

        if(!isset($data['value'])) {
            return;
        }

        $payload = $data['value'];
        $this->values = is_string($payload) ? json_decode($payload, true) : $payload;
    }
}
/**
 * Couchbase client that adds multi-document transactions based on a
 * two-phase commit.
 *
 * You can pass an array of documents to commit(); either all of them are
 * stored, or the touched documents are rolled back and a
 * TransactionException is thrown that you can catch and act upon.
 *
 * It also adds some convenience methods that make the actual implementation
 * easier to read.
 */
class MyCouchbase extends Couchbase {
    /**
     * Keys of the documents created by a transaction, grouped by transaction
     * key. They are deleted again when that transaction is rolled back.
     */
    protected $_deleteOnRollback = array();

    /**
     * Pre-update snapshots of the documents touched by a transaction,
     * grouped by transaction key and then by document key.
     */
    protected $_tmpCache = array();

    /**
     * Stores all given documents inside one two-phase-commit transaction.
     *
     * @param array $docs An array of documents with the key as the key
     *                    and a json_encodable value as the document.
     * @param array $options An array of options that can be passed on.
     *  - `full`: If set to true, it assumes that the passed docs represent
     *    the full payload and not just a subset. Defaults to false, in which
     *    case the passed values are merged into the stored document.
     *  - `create`: If true, the documents will be created if they don't
     *    exist (but all together). Defaults to true.
     * @return boolean True when all documents were stored, or when $docs is
     *                 empty and there was nothing to do.
     * @throws TransactionException If the transaction document cannot be
     *         inserted, or if any later step fails. In the latter case a
     *         rollback is attempted first and the original failure is
     *         attached as the previous exception.
     */
    public function commit($docs, $options = array()) {
        $defaults = array('full' => false, 'create' => true);
        $config = $options + $defaults;
        if(empty($docs)) {
            return true;
        }

        // STEP 0: Convert arrays to CouchbaseDocument for easier handling
        foreach($docs as $key => $document) {
            if(is_array($document)) {
                $docs[$key] = new CouchbaseDocument(array(
                    'key' => $key,
                    'value' => $document
                ));
            }
        }

        // STEP 1: Insert Transaction Document
        $transactionId = $this->increment('transaction:counter', 1, true);
        $transactionKey = 'transaction:' . $transactionId;
        $transactionDocument = array(
            'id' => $transactionId,
            'docs' => array_keys($docs),
            'state' => 'initial'
        );
        $this->set($transactionKey, json_encode($transactionDocument));
        $transaction = $this->_get($transactionKey);
        if(!$transaction) {
            throw new TransactionException("Could not insert transaction document");
        }

        // Initialise the bookkeeping before anything can fail, so that the
        // rollback never iterates over an undefined entry.
        $this->_deleteOnRollback[$transactionKey] = array();
        $this->_tmpCache[$transactionKey] = array();

        try {
            // STEP 1.1: Put Transaction in pending mode
            $transaction = $this->_setTransactionState($transaction, 'pending');

            // STEP 2: Insert documents if configured to do so.
            if($config['create']) {
                foreach($docs as $key => $document) {
                    if($this->get($key) === null) {
                        $document->values['transactions'] = array($transaction->key);
                        $this->set($key, json_encode($document->values));
                        $this->_deleteOnRollback[$transactionKey][] = $key;
                    }
                }
            }

            // STEP 3: Read all current documents
            $current = array();
            foreach(array_keys($docs) as $key) {
                $result = $this->_get($key);
                if(!$result) {
                    throw new TransactionException("Could not read document before update.");
                }
                // Merging needs an array on both sides. Fail here, before the
                // document is snapshotted, so the rollback leaves it alone.
                if($config['full'] !== true && !is_array($result->values)) {
                    throw new TransactionException("Could not merge into a document that is not a JSON object.");
                }
                $current[$key] = $result;
                // Objects are handles: keep a clone, otherwise the update in
                // STEP 4 would also modify the snapshot used for rollback.
                $this->_tmpCache[$transactionKey][$key] = clone $result;
            }

            // STEP 4: Update/Merge them via CAS
            foreach($docs as $key => $document) {
                $target = $current[$key];
                if($config['full'] === true) {
                    $target->values = $document->values;
                } else {
                    $target->values = $document->values + $target->values;
                }
                if(!isset($target->values['transactions']) || !is_array($target->values['transactions'])) {
                    $target->values['transactions'] = array();
                }
                // Append explicitly: the array union operator would drop the
                // marker whenever the list already has an element at index 0.
                if(!in_array($transaction->key, $target->values['transactions'], true)) {
                    $target->values['transactions'][] = $transaction->key;
                }
                if(!$this->cas($target->cas, $target->key, json_encode($target->values))) {
                    throw new TransactionException("Could not update the document.");
                }
            }

            // STEP 5: Switch transaction to committed state
            $transaction = $this->_setTransactionState($transaction, 'committed');

            // STEP 6: Remove transaction from documents
            foreach(array_keys($docs) as $key) {
                $document = $this->_get($key);
                if(!$document || !is_array($document->values)) {
                    throw new TransactionException("Could not remove transaction from document.");
                }
                $markers = array();
                if(isset($document->values['transactions']) && is_array($document->values['transactions'])) {
                    $markers = $document->values['transactions'];
                }
                // array_diff() keeps the original keys; reindex so that the
                // list is still encoded as a JSON array and not as an object.
                $document->values['transactions'] = array_values(array_diff($markers, array($transactionKey)));
                if(!$this->cas($document->cas, $document->key, json_encode($document->values))) {
                    throw new TransactionException("Could not remove transaction from document.");
                }
            }

            // STEP 7: Put transaction state to "done"
            $this->_setTransactionState($transaction, 'done');
        } catch(Exception $e) {
            $rollbackError = null;
            try {
                $this->_rollback($transactionKey);
            } catch(Exception $failure) {
                $rollbackError = $failure;
            }
            $this->_forgetTransaction($transactionKey);
            if($rollbackError !== null) {
                throw new TransactionException(
                    "Transaction failed and rollback did not complete: " . $rollbackError->getMessage(),
                    0,
                    $e
                );
            }
            throw new TransactionException("Transaction failed, recovered state.", 0, $e);
        }

        $this->_forgetTransaction($transactionKey);
        return true;
    }

    /**
     * Writes a new state into the transaction document via CAS and reloads
     * the document so that the caller holds a fresh CAS value.
     *
     * @param CouchbaseDocument $transaction The loaded transaction document.
     * @param string $state The state to store.
     * @return CouchbaseDocument The reloaded transaction document.
     * @throws TransactionException If the CAS write or the reload fails.
     */
    protected function _setTransactionState($transaction, $state) {
        $transaction->values['state'] = $state;
        $stored = $this->cas($transaction->cas, $transaction->key, json_encode($transaction->values));
        $reloaded = $this->_get($transaction->key);
        if(!$stored || !$reloaded) {
            throw new TransactionException("Could not switch transaction to state '" . $state . "'.");
        }
        return $reloaded;
    }

    /**
     * Reverts the documents touched by a transaction, depending on the state
     * the transaction document is currently in, and marks it as cancelled.
     *
     * @param string $transactionKey The key of the transaction document.
     * @throws TransactionException If the transaction document cannot be
     *         read or its state cannot be switched.
     */
    protected function _rollback($transactionKey) {
        // Read current transaction state
        $transaction = $this->_get($transactionKey);
        if(!$transaction) {
            throw new TransactionException("Could not read transaction document for rollback.");
        }
        $state = isset($transaction->values['state']) ? $transaction->values['state'] : null;

        // Documents can only have been touched in these two states
        if($state === 'pending' || $state === 'committed') {
            $transaction = $this->_setTransactionState($transaction, 'cancelling');
            foreach($this->_tmpCache[$transactionKey] as $document) {
                $this->set($document->key, json_encode($document->values));
            }
            foreach($this->_deleteOnRollback[$transactionKey] as $key) {
                $this->delete($key);
            }
        }

        $this->_setTransactionState($transaction, 'cancelled');
    }

    /**
     * Releases the per-transaction bookkeeping so that it does not pile up
     * on a long-lived connection object.
     *
     * @param string $transactionKey The key of the transaction document.
     */
    protected function _forgetTransaction($transactionKey) {
        unset($this->_tmpCache[$transactionKey], $this->_deleteOnRollback[$transactionKey]);
    }

    /**
     * Helper method that loads a document through getDelayed() and wraps the
     * data handed to the callback in a CouchbaseDocument for easier
     * management.
     *
     * @param string $key The key of the document to load.
     * @return CouchbaseDocument|null The wrapped document, or null when the
     *         callback was never invoked.
     */
    protected function _get($key) {
        $return = null;
        $this->getDelayed(array($key), true, function($conn, $data) use(&$return) {
            $return = new CouchbaseDocument($data);
        });
        return $return;
    }

    /**
     * Overrides the initial implementation to provide an additional third
     * param to create the document if it doesn't exist yet.
     *
     * @param string $key The key of the counter document.
     * @param integer $offset The amount to increment by.
     * @param boolean $create If true, a missing counter is first stored
     *        with the value 0.
     * @return mixed Whatever the parent increment() returns.
     */
    public function increment($key, $offset, $create = false) {
        if($create && $this->get($key) === null) {
            $this->set($key, 0);
        }
        return parent::increment($key, $offset);
    }
}
?>
<?php
// Example usage: store both account documents in a single transaction.
$accounts = array(
    'michael' => array('balance' => 800),
    'matt' => array('balance' => 80)
);
$client = new MyCouchbase('localhost:8091');
$client->commit($accounts);
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment