Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
very quick riak protobuf example in node.js
sys: require 'sys'
net: require 'net'
fs: require 'fs'
Buffer: require('buffer').Buffer
Schema: require('protobuf_for_node').Schema
schema: new Schema(fs.readFileSync('riak.desc'))
types: ['RpbErrorResp', 'RpbPingReq', 'RpbPingResp', 'RpbGetClientIdReq',
'RpbGetClientIdResp', 'RpbSetClientIdReq', 'RpbSetClientIdResp',
'RpbGetServerInfoReq', 'RpbGetServerInfoResp', 'RpbGetReq', 'RpbGetResp']
messages: {}
# loads a schema, assigns a riak code based on it's array position
load_schema: (name) ->
code: types.indexOf name
sch: schema["riak.$name"]
sch.riak_code: code
messages[code]: sch
sch
Riak: {
GetRequest: load_schema 'RpbGetReq'
GetResponse: load_schema 'RpbGetResp'
messages: messages
}
# sends protobuf bytes to riak
class Riak.Connection
constructor: (port, host) ->
@callbacks: []
@conn: net.createConnection port, host
riak: this
@conn.addListener 'data', (chunk) ->
if msg: riak.message.receive chunk
if cb: riak.callbacks.shift()
cb msg
riak.conn.emit 'message', msg
riak.reset()
@reset()
reset: ->
@message: new Riak.Message(@conn)
send: (type, data, callback) ->
@callbacks.push callback
@conn.write @prepare(type, data)
on: (name, cb) ->
@conn.addListener name, cb
end: ->
@conn.end()
prepare: (type, data) ->
buf: type.serialize data
len: buf.length + 1
msg: new Buffer(len + 4)
msg[0]: len >>> 24
msg[1]: len >>> 16
msg[2]: len >>> 8
msg[3]: len & 255
msg[4]: type.riak_code
buf.copy msg, 5, 0
msg
# Reads buffers from riak and parses them.
class Riak.Message
constructor: (conn) ->
@conn: conn
@type: null # the protobuf response type
@response: null # placeholder for the response buffer
@length: null # length of the response data
@read: 0 # how many bytes have been read into @response
# parse a received chunk. don't assume the whole thing comes
# in with just one chunk.
receive: (chunk, starting) ->
# is a response buffer created? if so, read for data
if @response
chunk_len: chunk.length
starting: or 0 # starting point on the chunk to read
chunk.copy @response, @read, starting, chunk_len
@read += chunk_len - starting
# are we there yet?
if @read == @length
@type.parse @response
else
null
else
@length: (chunk[0] << 24) +
(chunk[1] << 16) +
(chunk[2] << 8) +
chunk[3] - 1
@type: Riak.messages[chunk[4]]
@response: new Buffer(@length)
@receive chunk, 5
# bleh
conn: new Riak.Connection(8087)
# watches for any received messages
conn.on 'message', (msg) ->
sys.puts "REC: ${sys.inspect msg}"
conn.on 'connect', ->
conn.send Riak.GetRequest, {bucket: 'timeline', key: '2010723213'},
(msg) ->
sys.puts "CALLBACK!"
conn.send Riak.GetRequest, {bucket: 'timeline', key: '2010723212'}, (msg) ->
sys.puts 'ok done'
conn.end()
var Buffer, Riak, Schema, conn, fs, load_schema, messages, net, schema, sys, types;
sys = require('sys');
net = require('net');
fs = require('fs');
Buffer = require('buffer').Buffer;
Schema = require('protobuf_for_node').Schema;
schema = new Schema(fs.readFileSync('riak.desc'));
types = ['RpbErrorResp', 'RpbPingReq', 'RpbPingResp', 'RpbGetClientIdReq', 'RpbGetClientIdResp', 'RpbSetClientIdReq', 'RpbSetClientIdResp', 'RpbGetServerInfoReq', 'RpbGetServerInfoResp', 'RpbGetReq', 'RpbGetResp'];
messages = {};
load_schema = function(name) {
var code, sch;
code = types.indexOf(name);
sch = schema[("riak." + name)];
sch.riak_code = code;
messages[code] = sch;
return sch;
};
Riak = {
GetRequest: load_schema('RpbGetReq'),
GetResponse: load_schema('RpbGetResp'),
messages: messages
};
Riak.Connection = function(port, host) {
var riak;
this.callbacks = [];
this.conn = net.createConnection(port, host);
riak = this;
this.conn.addListener('data', function(chunk) {
var cb, msg;
if ((msg = riak.message.receive(chunk))) {
(cb = riak.callbacks.shift()) ? cb(msg) : null;
riak.conn.emit('message', msg);
return riak.reset();
}
});
this.reset();
return this;
};
Riak.Connection.prototype.reset = function() {
this.message = new Riak.Message(this.conn);
return this.message;
};
Riak.Connection.prototype.send = function(type, data, callback) {
this.callbacks.push(callback);
return this.conn.write(this.prepare(type, data));
};
Riak.Connection.prototype.on = function(name, cb) {
return this.conn.addListener(name, cb);
};
Riak.Connection.prototype.end = function() {
return this.conn.end();
};
Riak.Connection.prototype.prepare = function(type, data) {
var buf, len, msg;
buf = type.serialize(data);
len = buf.length + 1;
msg = new Buffer(len + 4);
msg[0] = len >>> 24;
msg[1] = len >>> 16;
msg[2] = len >>> 8;
msg[3] = len & 255;
msg[4] = type.riak_code;
buf.copy(msg, 5, 0);
return msg;
};
Riak.Message = function(conn) {
this.conn = conn;
this.type = null;
this.response = null;
this.length = null;
this.read = 0;
return this;
};
Riak.Message.prototype.receive = function(chunk, starting) {
var chunk_len;
if (this.response) {
chunk_len = chunk.length;
starting = starting || 0;
chunk.copy(this.response, this.read, starting, chunk_len);
this.read += chunk_len - starting;
if (this.read === this.length) {
return this.type.parse(this.response);
} else {
return null;
}
} else {
this.length = (chunk[0] << 24) + (chunk[1] << 16) + (chunk[2] << 8) + chunk[3] - 1;
this.type = Riak.messages[chunk[4]];
this.response = new Buffer(this.length);
return this.receive(chunk, 5);
}
};
conn = new Riak.Connection(8087);
conn.on('message', function(msg) {
return sys.puts(("REC: " + (sys.inspect(msg))));
});
conn.on('connect', function() {
return conn.send(Riak.GetRequest, {
bucket: 'timeline',
key: '2010723213'
}, function(msg) {
sys.puts("CALLBACK!");
return conn.send(Riak.GetRequest, {
bucket: 'timeline',
key: '2010723212'
}, function(msg) {
sys.puts('ok done');
return conn.end();
});
});
});
package riak;
message RpbErrorResp {
required bytes errmsg = 1;
required uint32 errcode = 2;
}
message RpbGetReq {
required bytes bucket = 1;
required bytes key = 2;
optional uint32 r = 3;
}
message RpbContent {
required bytes value = 1;
optional bytes content_type = 2; // the media type/format
optional bytes charset = 3;
optional bytes content_encoding = 4;
optional bytes vtag = 5;
repeated RpbLink links = 6; // links to other resources
optional uint32 last_mod = 7;
optional uint32 last_mod_usecs = 8;
repeated RpbPair usermeta = 9; // user metadata stored with the object
}
message RpbPair {
required bytes key = 1;
optional bytes value = 2;
}
message RpbLink {
optional bytes bucket = 1;
optional bytes key = 2;
optional bytes tag = 3;
}
message RpbGetResp {
repeated RpbContent content = 1;
optional bytes vclock = 2;
}
message RpbListBucketsResp {
repeated bytes buckets = 1;
}
@technoweenie

This comment has been minimized.

@frank06

This comment has been minimized.

Copy link

commented Jul 24, 2010

neat!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.