Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
WIP port of Ripple's Riak::TestServer to riak-js
$ node ts.js
10 Jan 15:01:23 - prepared
10 Jan 15:01:24 - started
PUT /riak/airlines/KLM
10 Jan 15:01:24 - saved
10 Jan 15:01:24 - cleared
GET /riak/airlines/KLM
10 Jan 15:01:24 - not found! it works
sys = require 'sys'
{spawn} = require 'child_process'
fs = require 'fs'
path = require 'path'
EventEmitter = require('events').EventEmitter
Utils = require './utils'
# Directory handed to the Erlang VM through the "-pa" entry of the default VM arguments below.
erlangPath = path.normalize("#{__dirname}/../erl_src")
# Default value of the `tempDir` option: a `.riaktest` directory under the current working directory.
tempPath = path.normalize("#{process.cwd()}/.riaktest")
# Ported from the Ruby riak-client
# Runs a throwaway Riak node as a child process so tests can talk to a real
# server and wipe its data between examples.
#
# Lifecycle:
#   prepare(cb) - build a private directory tree holding a patched copy of the
#                 `riak` launcher script plus generated vm.args and app.config
#   start(cb)   - spawn `riak console` and wait for the Erlang shell prompt
#   clear(cb)   - reset the test backend through the Erlang shell
#   stop()      - ask the VM to shut down
#
# The Erlang shell prompt is surfaced as an 'erlangPrompt' event. A non-empty
# listener list for that event means an operation is still waiting on the
# shell, so start/stop/clear refuse to run until it is empty again.
#
# Options (deep-merged over TestServer.defaults):
#   binDir    - directory containing the real `riak` launcher (required)
#   tempDir   - root of the generated directory tree
#   appConfig - object rendered into app.config by toErlangConfig
#   vmArgs    - object rendered into vm.args, one "key value" pair per line
#   debug     - when truthy, pipe the console's stdout/stderr to sys.debug
#
# NOTE(review): the appConfig/vmArgs nesting is required by the constructor and
# the write* methods; the section names other than riak_core (riak_kv, luwak,
# sasl) and the 127.0.0.1 addresses were restored by inference - confirm
# against the Riak release under test.
class TestServer extends EventEmitter
  @defaults =
    appConfig:
      riak_core:
        web_ip: "127.0.0.1"
        web_port: 9000
        handoff_port: 9001
        ring_creation_size: 64
      riak_kv:
        storage_backend: {atom: "riak_kv_test_backend"}
        pb_ip: "127.0.0.1"
        pb_port: 9002
        js_vm_count: 8
        js_max_vm_mem: 8
        js_thread_stack: 16
        riak_kv_stat: true
      luwak:
        enabled: false
      sasl:
        errlog_type: {atom: "error"}
    vmArgs:
      "-name": "riaktest#{Math.floor(Math.random()*100000000000)}@127.0.0.1"
      "-setcookie": "riak-js-test"
      "+K": true
      "+A": 64
      "-smp": "enable"
      "-env ERL_MAX_PORTS": 4096
      "-pa": erlangPath
    tempDir: tempPath

  # options - see the class comment; `binDir` has no default and must be given.
  constructor: (options) ->
    super()
    @options = Utils.mixin true, {}, TestServer.defaults, options
    unless @options.binDir
      throw new Error("TestServer requires the binDir option (directory of the riak script)")
    @options.appConfig.riak_core.ring_state_dir = "#{@options.tempDir}/data/ring"
    @options.binDir = path.normalize(@options.binDir)
    # Matches the shell prompt, which embeds the node name followed by a counter.
    @erlangPrompt = new RegExp("^.#{@options.vmArgs['-name']}.\\d+>", "m")

  # Builds the temp directory tree and writes the launcher script, vm.args and
  # app.config. Calls `callback` (if given) once everything is on disk; when
  # the server is already prepared the callback fires immediately.
  # Write errors are thrown, matching writeRiakScript.
  prepare: (callback) ->
    if @prepared
      callback() if callback
      return
    @createTempDirectories =>
      @riakScript = "#{@temp_bin}/riak"
      @writeRiakScript =>
        @writeVmArgs (err) =>
          throw err if err
          @writeAppConfig (err) =>
            throw err if err
            @prepared = true
            callback() if callback

  # Spawns `riak console` and calls `callback` after the first shell prompt.
  # Does nothing unless prepared, not yet started, and no shell operation is
  # pending.
  start: (callback) ->
    if @prepared and not @started and @listeners('erlangPrompt').length is 0
      setStarted = =>
        @started = true
        callback() if callback
      @once 'erlangPrompt', setStarted
      @console = spawn(@riakScript, ["console"])
      # Wrapped so registerStop runs with this server as `this`, not the child.
      @console.on 'exit', => @registerStop()
      # do the work of what we get from expect() in Ruby
      @console.stdout.on 'data', (data) =>
        unless data.toString().search(@erlangPrompt) is -1
          @emit 'erlangPrompt'
      if @options.debug
        @console.stderr.on 'data', sys.debug
        @console.stdout.on 'data', sys.debug
      # Install the process-exit hook once, not on every (re)start.
      unless @exitHookInstalled
        @exitHookInstalled = true
        process.on 'exit', =>
          # NOTE(review): handler body reconstructed - kill a node that is
          # still running so it does not outlive the test process.
          @console.kill('SIGKILL') if @console

  # Asks the Erlang VM to shut down; bookkeeping happens in registerStop when
  # the child process exits.
  stop: ->
    if @started and @listeners('erlangPrompt').length is 0
      @console.stdin.write("init:stop().\n", "ascii")

  # Resets the test backend and calls `callback` once the shell is idle again.
  # Sends "ok." first to obtain a fresh prompt, then the reset command, then
  # waits for the prompt that follows it. @started is false while the reset is
  # in flight.
  clear: (callback) ->
    if @started and @listeners('erlangPrompt').length is 0
      setStarted = =>
        @started = true
        callback() if callback
      sendReset = =>
        @once 'erlangPrompt', setStarted
        @started = false
        @console.stdin.write("riak_kv_test_backend:reset().\n", "ascii")
      @once 'erlangPrompt', sendReset
      @console.stdin.write("ok.\n", "ascii")

  # Bookkeeping after the console process has exited.
  registerStop: ->
    delete @console
    @started = false
    # A waiter left over from an interrupted start/clear would otherwise block
    # every later start() through the listener-count guard.
    @removeAllListeners 'erlangPrompt'

  # Removes @options.tempDir with `rm -rf`, recreates it together with its
  # subdirectories (parents first), and records each path as @temp_<name>.
  # mkdir errors are thrown; the exit status of `rm` is ignored.
  createTempDirectories: (callback) ->
    subdirs = for dir in ['bin', 'etc', 'log', 'data', 'data/ring', 'pipe']
      this["temp_#{dir}"] = path.normalize("#{@options.tempDir}/#{dir}")
    subdirs.unshift @options.tempDir
    createNext = (err) =>
      throw err if err
      if subdirs.length is 0
        callback() if callback
      else
        fs.mkdir subdirs.shift(), 0700, createNext
    rmrf = spawn("rm", ["-rf", @options.tempDir])
    # Drop the exit code so it is not mistaken for an error argument.
    rmrf.on 'exit', -> createNext()

  # Copies the `riak` launcher from binDir to @riakScript, rewriting its
  # RUNNER_* / PIPE_DIR variables to point into the temp tree. The script is
  # patched as a whole so an assignment split across read chunks is still
  # found, and `callback` fires only after the output file has been closed.
  writeRiakScript: (callback) ->
    outScript = fs.createWriteStream @riakScript, {encoding: 'utf8', mode: 0700}
    inScript = fs.createReadStream "#{@options.binDir}/riak", encoding: 'utf8'
    inScript.on 'error', (err) ->
      sys.debug "error reading from #{inScript.path}:\n#{sys.inspect(err, true, null)}"
      throw err
    outScript.on 'error', (err) ->
      sys.debug "error writing to #{outScript.path} script:\n#{sys.inspect(err, true, null)}"
      throw err
    # Replaces the rest of the first line containing "<name>=". A replacer
    # function keeps any "$" in the value literal.
    setVar = (text, name, value) ->
      text.replace new RegExp("(#{name}=)(.*)$", "m"), (match, prefix) -> prefix + value
    chunks = []
    inScript.on 'data', (data) ->
      chunks.push(if Buffer.isBuffer(data) then data.toString('utf8') else data)
    inScript.on 'end', =>
      script = chunks.join ''
      script = setVar script, 'RUNNER_SCRIPT_DIR', @temp_bin
      script = setVar script, 'RUNNER_ETC_DIR', @temp_etc
      script = setVar script, 'RUNNER_USER', ''
      script = setVar script, 'RUNNER_LOG_DIR', @temp_log
      script = setVar script, 'PIPE_DIR', @temp_pipe
      baseDir = path.normalize(@options.binDir + '/..')
      script = script.replace "RUNNER_BASE_DIR=${RUNNER_SCRIPT_DIR%/*}", -> "RUNNER_BASE_DIR=#{baseDir}"
      outScript.on 'close', -> callback() if callback
      outScript.write script
      outScript.end()

  # Writes @options.vmArgs to etc/vm.args, one "option value" pair per line.
  # `callback` is handed straight to fs.writeFile and receives its error.
  writeVmArgs: (callback) ->
    vmArgs = for own option, value of @options.vmArgs
      "#{option} #{value}"
    vmArgs = vmArgs.join("\n")
    fs.writeFile("#{@temp_etc}/vm.args", vmArgs, callback)

  # Writes @options.appConfig to etc/app.config as an Erlang term followed by
  # the terminating period. `callback` receives fs.writeFile's error.
  writeAppConfig: (callback) ->
    appConfig = @toErlangConfig(@options.appConfig) + "."
    fs.writeFile("#{@temp_etc}/app.config", appConfig, callback)

  # Converts an object into app.config-compatible Erlang terms
  #
  # object - plain object; each own key becomes a {key, value} tuple
  # depth  - nesting level, used only for indentation
  #
  # Values of the form {atom: "name"} are emitted bare, strings are
  # double-quoted, nested objects recurse, anything else is stringified.
  # Returns the bracketed list as a string.
  toErlangConfig: (object, depth = 1) ->
    # Array-join builds the padding so a zero width needs no special case.
    indent = (levels) -> new Array(levels + 1).join ' '
    padding = indent depth
    parentPadding = indent(depth - 1)
    values = for own key, value of object
      printable =
        if value?.atom?
          value.atom
        else if typeof value is 'string'
          # Escape backslashes and quotes so the Erlang string stays well-formed.
          "\"#{value.replace(/(["\\])/g, '\\$1')}\""
        else if value instanceof Object
          @toErlangConfig(value, depth + 1)
        else
          String(value)
      "{#{key}, #{printable}}"
    "[\n#{padding}#{values.join(",\n#{padding}")}\n#{parentPadding}]"

  # Node v0.2.6 doesn't have EventEmitter.once
  #
  # Registers `listener` for a single emission of `type`. The wrapper takes
  # its arguments through a splat, so it does not depend on how `arguments`
  # resolves inside a bound function.
  once: (type, listener) ->
    wrapper = (args...) =>
      @removeListener(type, wrapper)
      listener.apply(this, args)
    @on type, wrapper
module.exports = TestServer
// Demo driver: boots a test Riak node, stores one object, clears the backend,
// and checks that the object is gone.
// NOTE(review): the prepare/start/save/clear nesting, the exit-handler body and
// the final ts.stop() were reconstructed from the logged output
// (prepared, started, PUT, saved, cleared, GET, not found) - confirm against
// the original script.
var sys = require('sys')
var TestServer = require("./lib/test_server")
var ts = new TestServer({binDir: "/Users/sean/Development/riak/rel/riak/bin", debug: false})
var db = require('riak-js').getClient({port: 9000})

process.on("exit", function(){
  ts.stop()
})

ts.prepare(function(){
  sys.log("prepared")
  ts.start(function(){
    sys.log("started")
    db.save('airlines', 'KLM',
      {fleet: 111, country: 'NL'},
      { links:
        [{ bucket: 'flights', key: 'KLM-8098', tag: 'cargo' },
         { bucket: 'flights', key: 'KLM-1196', tag: 'passenger' }]
      }, function(err){
        if(err) throw err;
        sys.log("saved")
        ts.clear(function(){
          sys.log("cleared")
          db.get("airlines", "KLM", function(err){
            if(err && err.notFound){
              sys.log("not found! it works")
            }
            // Shut the node down so the script can exit.
            ts.stop()
          })
        })
      })
  })
})
Copy link

technoweenie commented Jan 7, 2011

  • You can replace () -> with just ->
  • Function calls with no arguments need the parens (see prepare())
  • does @defaults: at the top need to be @defaults = ?

Copy link

seancribbs commented Jan 7, 2011

@technoweenie thanks

Copy link

seancribbs commented Jan 7, 2011

Wrapping my head around the lack of blocking calls hurts. Is my cascade of event listeners too crazy?

Copy link

frank06 commented Jan 8, 2011

From what I see, and assuming it actually works :), that code is very good – but you probably already know that. Proper usage of coffeescript and node idioms. I think I'd go about the same way dealing with erlangPrompt (and you're taking good care of removing listeners where appropriate)... in any case shouldn't be too far off. Not sure what you mean by cascade of event listeners; the usage of lower-level spawn and read/write stream? that's just how it works so nothing out of normal there.

Copy link

seancribbs commented Jan 9, 2011

@frank06 I meant that I'm simulating blocking operations by adding/removing listeners and avoiding critical sections by checking the listener list. Seems hackish to me...

Still, thanks for your comments. It's almost working, will try to finish Monday.

Copy link

indexzero commented Jan 10, 2011

Is there any sample usage that could help me wrap my head around this? I've never really used CoffeeScript much except for reading / understanding other people's libraries >.<

Seems like a lot of your event bookkeeping could be simplified by using EventEmitter.once instead of using EventEmitter.on and removing the listeners yourself. Interesting approach, I'll let you know if I see any other optimizations.

Copy link

seancribbs commented Jan 10, 2011

@indexzero Thanks for the tip about once, as you can see I'm a node newb.

The idea of the code is to run Riak as a subprocess with an in-memory backend that can be easily cleared at the end of a test suite, either after each example or after a section of examples. It's a port of Ripple's Riak::TestServer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment