One simple example of an implementation for a transaction/audit journal and restoring lost/deleted objects for a known key.
var riak = require('nodiak').getClient('http', '127.0.0.1', '10018');
var versionedSave = function versionedSave(robj, callback) {
// get the object currently stored as the HEAD.
riak.bucket('head').object.get(robj.key, function(err, head_robj) {
// immediately call callback on any error other than 404.
if(err && err.status_code !== 404) callback(err);
// if object not found, just save the object.
else if(err && err.status_code === 404) {
// a new object has no version history yet, so no manifest metadata is set.
robj.save(callback);
}
// else save a version of the current object and then save the new object with an updated manifest.
else {
var version_id = generateVersionIdentifier();
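writeVersion(head_robj, version_id, function(err, result) {
// stop here if the version could not be written, so the old HEAD is not overwritten without a copy.
if(err) return callback(err);
// copy the manifest from the pre-existing object and append the new version id.
robj.setMeta('manifest', updateManifest(head_robj.getMeta('manifest'), version_id));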
robj.save(callback);
});
}
});
};
var versionedDelete = function versionedDelete(key, callback) {
// get an instance of an empty object.
var robj = riak.bucket('head').object.new(key, {});
// set a tombstone in case setting empty data to the object is valid.
robj.setMeta('tombstone', true);
// do a versioned save to keep the manifest intact.
versionedSave(robj, callback);
};
var getVersions = function getVersions(key, limit_to, callback) {
// get the object by key.
riak.bucket('head').object.get(key, function(err, robj) {
// immediately call callback on any error.
if(err) callback(err);
else {
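// treat a null or 0 limit as "no limit".
limit_to = (limit_to === null || limit_to === 0) ? undefined : limit_to;
// parse the manifest list from the object metadata.
// split's limit keeps the first entries, which are the oldest versions in the manifest.
var versions = robj.getMeta('manifest').split(',', limit_to);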
// prepend key to each version id.
for(var i = 0, length = versions.length; i < length; i++) {
versions[i] = key + '_' + versions[i];
}
// bulk get all version RObjects and send through callback.
riak.bucket('versions').objects.get(versions, callback);
}
});
};
var restoreVersion = function restoreVersion(robj, callback) {
// split the base key off the version key (assumes the base key itself contains no '_').
robj.key = robj.key.split('_', 1)[0];
// swap object's bucket reference to point to head bucket.
robj.bucket = riak.bucket('head');
// do a versioned save of the version to restore so that we keep the manifest consistent after restore.
versionedSave(robj, callback);
};
var writeVersion = function writeVersion(robj, version_id, callback) {
// note: temp is a reference, not a copy, so the caller's object is re-keyed too.
var temp = robj;
// append version to key.
temp.key = temp.key + '_' + version_id;
// swap object's bucket reference to point to versions bucket.
temp.bucket = riak.bucket('versions');
// save object.
temp.save(callback);
};
var updateManifest = function updateManifest(manifest, item) {
// if manifest is empty just set it to the item.
if(manifest === null || manifest === undefined || manifest == '') {
manifest = item;
}
else { // put a comma in front for splitting on later.
manifest += ',' + item;
}
return manifest;
};
var generateVersionIdentifier = function generateVersionIdentifier() {
// simply returns the ISO standard formatted 'now' as a timestamp.
return new Date().toISOString();
};

##Driver

Customers often request “versioning” or “restore/undo” functionality, as a means of data recovery from user error (as opposed to recovery from hardware failure or data loss errors, at which Riak already excels). If an object is changed or deleted by the end user, but later needs to be restored, is there a way to either look at a transaction log to undo the damage, or to explicitly restore the object to a previous version?

##Overview

This solution provides one simple example of an implementation for a transaction/audit journal and restoring lost/deleted objects for a known key. A separate global transaction/audit journal could also be built to provide an index to get an object key in the case where you don't know it beforehand, but that use case is outside the scope of this document.

The algorithm used here depends only on Riak's key/value functionality for two reasons. First, to show that a solution to the problem can be simply and efficiently implemented with a key/value pattern. Second, relying on KV maintains Riak's horizontal scaling profile wherein if you want to increase performance and/or capacity, you just add more nodes to your cluster.

We'll use two separate buckets to organize the HEAD (current version) and previous VERSIONS. A manifest, stored in each object's metadata as a comma-separated list of version ids, maintains an index of the history of versions stored in the VERSIONS bucket. The object in HEAD always contains the complete manifest representing the full journal of operations on the object, and each versioned object contains the manifest as it stood at the time that version was created.

There are a variety of changes one could make to the process described here: cull the manifest from the versioned objects, offload the manifest to its own object (at the expense of an additional read/write), compact the manifest by whatever criteria suit the use case, or improve the detail of the manifest by also logging the type of operation (delete, update, insert).
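
As one illustration of the last two ideas, the sketch below tags each manifest entry with an operation type and compacts the manifest by keeping only the newest entries. The `'<version id>|<operation>'` entry format and the function names `addManifestEntry`, `parseManifest`, and `compactManifest` are assumptions made for this example and are not part of the gist's code.

```javascript
// Assumed entry format: '<version id>|<operation>'. ISO timestamps and the
// operation names contain neither '|' nor ',', so both separators stay unambiguous.
var addManifestEntry = function addManifestEntry(manifest, version_id, operation) {
  var entry = version_id + '|' + operation;
  // an empty or missing manifest starts with the entry itself.
  return manifest ? manifest + ',' + entry : entry;
};

// turn the comma-separated manifest back into a list of { version_id, operation }.
var parseManifest = function parseManifest(manifest) {
  if(!manifest) return [];
  return manifest.split(',').map(function(entry) {
    var parts = entry.split('|');
    return { version_id: parts[0], operation: parts[1] };
  });
};

// one possible compaction criterion: keep only the newest `max` entries.
var compactManifest = function compactManifest(manifest, max) {
  if(!manifest || max <= 0) return '';
  return manifest.split(',').slice(-max).join(',');
};
```

Note that compacting the manifest only drops index entries. The corresponding objects would still exist in the VERSIONS bucket and would need to be deleted separately if the space matters.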

You could also increase the concurrency of the asynchronous control flow in the example code to improve performance if you aren't concerned with guaranteeing a certain order of operations. For example, you could save the versioned object and the new object in parallel rather than serially.
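
A minimal sketch of that parallel variant follows. The `parallel` helper and the `fakeSave` stand-in are hypothetical and exist only so the example runs without a Riak node; in real code each task would wrap a `save()` call like the ones in the example code.

```javascript
// Hypothetical helper: run callback-style tasks concurrently and call `done`
// exactly once, either on the first error or after every task has finished.
var parallel = function parallel(tasks, done) {
  var pending = tasks.length;
  var results = new Array(tasks.length);
  var finished = false;

  if(pending === 0) return done(null, results);

  tasks.forEach(function(task, index) {
    task(function(err, result) {
      if(finished) return; // ignore callbacks that arrive after an error.
      if(err) {
        finished = true;
        return done(err);
      }
      results[index] = result;
      pending -= 1;
      if(pending === 0) {
        finished = true;
        done(null, results);
      }
    });
  });
};

// stand-in for an object's save() so the sketch runs without Riak.
var fakeSave = function fakeSave(label) {
  return function(callback) {
    setImmediate(function() { callback(null, label + ' saved'); });
  };
};

// write the version copy and the new HEAD object at the same time.
parallel([fakeSave('version'), fakeSave('head')], function(err, results) {
  if(err) console.error('save failed:', err);
  else console.log(results);
});
```

The trade-off is that the new manifest has to be computed before either write starts, and if the version write fails while the HEAD write succeeds, the previous HEAD is overwritten without a stored copy.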

##Details

#####Dependencies

The example code is written in JavaScript and depends on Node.js and the nodiak Riak client library.

npm install nodiak

#####Setup

Make sure to adjust your .getClient() call to match your environment.

var riak = require('nodiak').getClient('http', '127.0.0.1', '10018');

#####INSERT/UPDATE

  1. if object exists in HEAD:
    1. generate version id.
    2. append version id to pre-existing object's key.
    3. write current HEAD object to VERSIONS bucket under updated key.
    4. copy manifest from pre-existing object to the new object.
    5. add the generated version id to the manifest.
    6. write new object to HEAD bucket.
  2. if object doesn't exist:
    1. write object to HEAD bucket w/ an empty manifest.
var versionedSave = function versionedSave(robj, callback) {
  // get the object currently stored as the HEAD.
  riak.bucket('head').object.get(robj.key, function(err, head_robj) {
    // immediately call callback on any error other than 404.
    if(err && err.status_code !== 404) callback(err);
    // if object not found, just save the object; it has no version history yet.
    else if(err && err.status_code === 404) {
      robj.save(callback);
    }
    // else save a version of the current object and then save the new object with an updated manifest.
    else {
      var version_id = generateVersionIdentifier();
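      writeVersion(head_robj, version_id, function(err, result) {
        // stop here if the version could not be written, so the old HEAD is not overwritten without a copy.
        if(err) return callback(err);
        // copy the manifest from the pre-existing object and append the new version id.
        robj.setMeta('manifest', updateManifest(head_robj.getMeta('manifest'), version_id));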
        robj.save(callback);
      });
    }
  });
};
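
To see what the INSERT/UPDATE steps leave behind in each bucket, here is a rough in-memory walkthrough. It is only a sketch: the plain objects `head` and `versions` stand in for the two Riak buckets, `versionedSaveInMemory` is a hypothetical name, and the timestamp is passed in so the result is deterministic.

```javascript
// in-memory stand-ins for the HEAD and VERSIONS buckets.
var head = {};
var versions = {};

// `now` is a parameter only to make the example repeatable.
var versionedSaveInMemory = function versionedSaveInMemory(key, data, now) {
  var previous = head[key];
  var manifest = '';

  if(previous !== undefined) {
    var version_id = now.toISOString();
    // steps 1.1 to 1.3: copy the current HEAD into VERSIONS under '<key>_<version id>'.
    versions[key + '_' + version_id] = previous;
    // steps 1.4 and 1.5: carry the manifest forward and append the new version id.
    manifest = previous.manifest ? previous.manifest + ',' + version_id : version_id;
  }

  // step 1.6 (or 2.1 for a brand new key): write the new object to HEAD.
  head[key] = { data: data, manifest: manifest };
  return head[key];
};

versionedSaveInMemory('user1', { name: 'Ann' }, new Date(0));
versionedSaveInMemory('user1', { name: 'Anne' }, new Date(1000));
versionedSaveInMemory('user1', { name: 'Annie' }, new Date(2000));

console.log(head.user1.manifest);
// prints "1970-01-01T00:00:01.000Z,1970-01-01T00:00:02.000Z"
console.log(Object.keys(versions));
```

After the three saves, HEAD holds the newest data with the full manifest, and each entry in `versions` carries the manifest as it stood when that version was the HEAD.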

#####DELETE

  1. get an empty object.
  2. set a tombstone on the empty object.
  3. do a versioned INSERT/UPDATE of the empty object into HEAD.
var versionedDelete = function versionedDelete(key, callback) {
  // get an instance of an empty object.
  var robj = riak.bucket('head').object.new(key, {});
  
  // set a tombstone in case setting empty data to the object is valid.
  robj.setMeta('tombstone', true);

  // do a versioned save to keep the manifest intact.
  versionedSave(robj, callback);
};
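
Because a versioned delete leaves an object in HEAD instead of removing the key, anything that reads from HEAD has to check for the tombstone itself. The helper below is a small sketch of that check; `isTombstoned` is a hypothetical name. Riak's HTTP interface carries user metadata as headers, so the flag may come back as the string `'true'`. Whether nodiak converts it back to a boolean is not verified here, so the sketch accepts both forms.

```javascript
// returns true when the 'tombstone' metadata value marks the object as deleted.
// accepts both the boolean set locally and the string form a round trip may return.
var isTombstoned = function isTombstoned(tombstone_meta) {
  return tombstone_meta === true || tombstone_meta === 'true';
};

// intended use inside a read callback (not executed here):
//   if(isTombstoned(robj.getMeta('tombstone'))) { /* treat the key as deleted */ }
```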

#####GET VERSIONS

  1. get the head object.
  2. read the version manifest from the object.
  3. get versioned objects (w/ optional limit set for the number of previous versions to get).
var getVersions = function getVersions(key, limit_to, callback) {
  // get the object by key.
  riak.bucket('head').object.get(key, function(err, robj) {
    // immediately call callback on any error.
    if(err) callback(err);
    else {
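      // treat a null or 0 limit as "no limit".
      limit_to = (limit_to === null || limit_to === 0) ? undefined : limit_to;
      // parse the manifest list from the object metadata.
      // split's limit keeps the first entries, which are the oldest versions in the manifest.
      var versions = robj.getMeta('manifest').split(',', limit_to);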

      // prepend key to each version id.
      for(var i = 0, length = versions.length; i < length; i++) {
        versions[i] = key + '_' + versions[i];
      }

      // bulk get all version RObjects and send through callback.
      riak.bucket('versions').objects.get(versions, callback);
    }
  });
};
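
Since new version ids are appended to the end of the manifest, a limit applied through `split` selects the oldest versions. If the most recent versions are wanted instead, the key list could be built along these lines. This is a sketch only: `recentVersionKeys` is a hypothetical helper, and it assumes `limit_to` is a non-negative number or omitted.

```javascript
// build VERSIONS bucket keys for the newest `limit_to` entries, newest first.
var recentVersionKeys = function recentVersionKeys(key, manifest, limit_to) {
  // an object that was never updated has no manifest and therefore no versions.
  if(!manifest) return [];

  var ids = manifest.split(',');

  // a missing, null, or zero limit means "all versions".
  if(limit_to) ids = ids.slice(-limit_to);

  // reverse so the newest version comes first, then prefix each id with the base key.
  return ids.reverse().map(function(id) {
    return key + '_' + id;
  });
};
```

The resulting array could then be handed to the same bulk get on the `versions` bucket that `getVersions` uses.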

#####RESTORE

  1. do a versioned INSERT/UPDATE of the version you want to become head.
var restoreVersion = function restoreVersion(robj, callback) {
    // split the base key off the version key (assumes the base key itself contains no '_').
    robj.key = robj.key.split('_', 1)[0];
    
    // swap object's bucket reference to point to head bucket. 
    robj.bucket = riak.bucket('head');
    
    // do a versioned save of the previously versioned object to keep the manifest consistent.
    versionedSave(robj, callback);
};
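
Splitting on the first `_` only works when the base key contains no underscore. One alternative, sketched here under the assumption that version ids are ISO timestamps (which never contain `_`), is to split on the last underscore instead. `parseVersionKey` is a hypothetical helper, not part of the example code.

```javascript
// split '<base key>_<version id>' at the last underscore.
// returns null when the key does not look like a version key.
var parseVersionKey = function parseVersionKey(version_key) {
  var at = version_key.lastIndexOf('_');
  if(at === -1) return null;
  return {
    key: version_key.slice(0, at),
    version_id: version_key.slice(at + 1)
  };
};
```

With this, a version key such as `user_42_2013-02-27T10:00:00.000Z` yields the base key `user_42` rather than `user`.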

#####Auxiliary Functions

var writeVersion = function writeVersion(robj, version_id, callback) {
  // note: temp is a reference, not a copy, so the caller's object is re-keyed too.
  var temp = robj;
  
  // append version to key.    
  temp.key = temp.key + '_' + version_id;
  
  // swap object's bucket reference to point to versions bucket.
  temp.bucket = riak.bucket('versions');
  
  // save object.
  temp.save(callback);
};

var updateManifest = function updateManifest(manifest, item) {
  // if manifest is empty just set it to the item.
  if(manifest === null || manifest === undefined || manifest == '') {
    manifest = item;
  }
  else { // put a comma in front for splitting on later.
    manifest += ',' + item;
  }
  return manifest;
};

var generateVersionIdentifier = function generateVersionIdentifier() {
  // simply returns the ISO standard formatted 'now' as a timestamp.
  return new Date().toISOString();
};
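
ISO timestamps have millisecond resolution, so two saves of the same key within one millisecond would produce the same version id and the second version write would overwrite the first. One way to reduce that risk, shown here as a sketch and not as part of the original code, is to append a short random suffix. The name `generateUniqueVersionIdentifier` and the `-` separator are assumptions; the suffix is hex, so it introduces neither `_` nor `,`, which the key and manifest formats rely on.

```javascript
var crypto = require('crypto');

// timestamp plus 8 random hex characters, e.g. '<ISO timestamp>-<8 hex chars>'.
// `now` is optional and only exists to make the function easy to test.
var generateUniqueVersionIdentifier = function generateUniqueVersionIdentifier(now) {
  var timestamp = (now || new Date()).toISOString();
  var suffix = crypto.randomBytes(4).toString('hex');
  return timestamp + '-' + suffix;
};
```

Ids generated this way still sort chronologically by their timestamp prefix, but the order of two ids created in the same millisecond is arbitrary.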