Skip to content

Instantly share code, notes, and snippets.

@technoweenie
Created July 24, 2010 07:06
Show Gist options
  • Save technoweenie/488488 to your computer and use it in GitHub Desktop.
Save technoweenie/488488 to your computer and use it in GitHub Desktop.
very quick riak protobuf example in node.js
sys: require 'sys'
net: require 'net'
fs: require 'fs'
Buffer: require('buffer').Buffer
Schema: require('protobuf_for_node').Schema
# Load the protobuf descriptor from 'riak.desc'; message types are looked up
# on this object by their "riak.<Name>" key (see load_schema).
schema: new Schema(fs.readFileSync('riak.desc'))
# Message names, ordered so that each name's array index can be used as its
# riak message code (load_schema uses types.indexOf for this).
types: ['RpbErrorResp', 'RpbPingReq', 'RpbPingResp', 'RpbGetClientIdReq',
'RpbGetClientIdResp', 'RpbSetClientIdReq', 'RpbSetClientIdResp',
'RpbGetServerInfoReq', 'RpbGetServerInfoResp', 'RpbGetReq', 'RpbGetResp']
# message code -> schema type; filled in by load_schema
messages: {}
# Loads the "riak.<name>" type from the schema, assigns it a riak code based
# on its position in the `types` array, registers it in `messages` under that
# code, and returns it.
# NOTE(review): a name missing from `types` yields code -1 (indexOf) — confirm
# callers only pass listed names.
load_schema: (name) ->
code: types.indexOf name
sch: schema["riak.$name"]
sch.riak_code: code
messages[code]: sch
sch
# Namespace object: the two message types this example uses, plus the
# code -> type table that Riak.Message reads when decoding a response.
Riak: {
GetRequest: load_schema 'RpbGetReq'
GetResponse: load_schema 'RpbGetResp'
messages: messages
}
# sends protobuf bytes to riak
# Wraps a net connection: frames outgoing requests and passes each decoded
# response to the oldest callback queued by send.
class Riak.Connection
# port and host are passed straight to net.createConnection.
constructor: (port, host) ->
# callbacks waiting for a response, oldest first
@callbacks: []
@conn: net.createConnection port, host
# reference to this connection for use inside the listener below
riak: this
@conn.addListener 'data', (chunk) ->
# receive returns the parsed message once it is complete, otherwise null
if msg: riak.message.receive chunk
# the callback may be undefined when send was called without one
if cb: riak.callbacks.shift()
cb msg
# also broadcast the message to anyone listening via on 'message'
riak.conn.emit 'message', msg
# start a fresh Riak.Message for the next response
riak.reset()
@reset()
# Replaces @message with a new, empty Riak.Message.
reset: ->
@message: new Riak.Message(@conn)
# Queues callback, then writes the framed request for (type, data).
send: (type, data, callback) ->
@callbacks.push callback
@conn.write @prepare(type, data)
# Registers cb as a listener for event `name` on the underlying connection.
on: (name, cb) ->
@conn.addListener name, cb
# Ends the underlying connection.
end: ->
@conn.end()
# Builds the bytes for one request: a 4-byte big-endian length, a 1-byte
# message code, then the serialized payload.
prepare: (type, data) ->
buf: type.serialize data
# the length field counts the message code byte plus the payload
len: buf.length + 1
# 4 more bytes for the length field itself
msg: new Buffer(len + 4)
msg[0]: len >>> 24
msg[1]: len >>> 16
msg[2]: len >>> 8
msg[3]: len & 255
msg[4]: type.riak_code
# payload starts at offset 5, right after the header
buf.copy msg, 5, 0
msg
# Reads buffers from riak and parses them.
# One instance accumulates the bytes of a single response.
class Riak.Message
constructor: (conn) ->
@conn: conn
@type: null # the protobuf response type
@response: null # placeholder for the response buffer
@length: null # length of the response data
@read: 0 # how many bytes have been read into @response
# parse a received chunk. don't assume the whole thing comes
# in with just one chunk.
# Returns the parsed message once @read reaches @length, otherwise null.
receive: (chunk, starting) ->
# is a response buffer created? if so, read for data
if @response
chunk_len: chunk.length
starting: or 0 # starting point on the chunk to read
chunk.copy @response, @read, starting, chunk_len
@read += chunk_len - starting
# are we there yet?
# NOTE(review): if a chunk holds more bytes than the message still needs,
# @read ends up greater than @length and this equality never succeeds —
# confirm whether several responses can arrive in one chunk.
if @read == @length
@type.parse @response
else
null
else
# first chunk: bytes 0-3 are the big-endian length; subtracting 1 removes
# the message code byte (chunk[4]) so @length is the payload size only.
# NOTE(review): this reads chunk[0..4] unconditionally, so it assumes the
# first chunk holds at least the 5 header bytes.
@length: (chunk[0] << 24) +
(chunk[1] << 16) +
(chunk[2] << 8) +
chunk[3] - 1
@type: Riak.messages[chunk[4]]
@response: new Buffer(@length)
# re-enter with the buffer in place; payload begins at offset 5
@receive chunk, 5
# Example driver: connect on port 8087, fetch two keys from the 'timeline'
# bucket one after the other, then close the connection.
conn: new Riak.Connection(8087)
# watches for any received messages
conn.on 'message', (msg) ->
sys.puts "REC: ${sys.inspect msg}"
# once connected, send the first request; the second is sent from the first
# request's callback, so only one request is in flight at a time
conn.on 'connect', ->
conn.send Riak.GetRequest, {bucket: 'timeline', key: '2010723213'},
(msg) ->
sys.puts "CALLBACK!"
conn.send Riak.GetRequest, {bucket: 'timeline', key: '2010723212'}, (msg) ->
sys.puts 'ok done'
conn.end()
var Buffer, Riak, Schema, conn, fs, load_schema, messages, net, schema, sys, types;
sys = require('sys');
net = require('net');
fs = require('fs');
Buffer = require('buffer').Buffer;
Schema = require('protobuf_for_node').Schema;
// Protobuf descriptor read from 'riak.desc'; message types are looked up on it
// by their "riak.<Name>" key.
schema = new Schema(fs.readFileSync('riak.desc'));

// Message names ordered so that each name's index can serve as its riak
// message code (load_schema relies on types.indexOf).
types = [
  'RpbErrorResp',
  'RpbPingReq',
  'RpbPingResp',
  'RpbGetClientIdReq',
  'RpbGetClientIdResp',
  'RpbSetClientIdReq',
  'RpbSetClientIdResp',
  'RpbGetServerInfoReq',
  'RpbGetServerInfoResp',
  'RpbGetReq',
  'RpbGetResp'
];

// message code -> schema type; populated by load_schema.
messages = {};
/**
 * Looks up the protobuf type "riak.<name>" in the loaded schema, tags it with
 * its riak message code (the name's index in `types`) and registers it in
 * `messages` under that code.
 *
 * @param {string} name - Message name without the "riak." package prefix.
 * @returns {Object} The schema type, with `riak_code` set.
 * @throws {Error} If `name` is not listed in `types` or is missing from the
 *   schema. Without these checks an unlisted name would be registered under
 *   code -1 and a missing type would fail with an unrelated TypeError.
 */
load_schema = function(name) {
  var code, sch;
  code = types.indexOf(name);
  if (code === -1) {
    throw new Error("Unknown riak message type: " + name);
  }
  sch = schema["riak." + name];
  if (!sch) {
    throw new Error("riak." + name + " is not defined in the loaded schema");
  }
  sch.riak_code = code;
  messages[code] = sch;
  return sch;
};
// Namespace object: the two message types this example uses, plus the
// code -> type table consulted when decoding responses.
Riak = (function() {
  var getRequest = load_schema('RpbGetReq');
  var getResponse = load_schema('RpbGetResp');
  return {
    GetRequest: getRequest,
    GetResponse: getResponse,
    messages: messages
  };
})();
/**
 * Wraps a net connection to riak. Each decoded response is passed to the
 * oldest queued callback (if one was supplied) and is also emitted on the
 * underlying connection as a 'message' event.
 *
 * @param {number} port - Passed to net.createConnection.
 * @param {string} [host] - Passed to net.createConnection.
 */
Riak.Connection = function(port, host) {
  var self = this;
  self.callbacks = [];
  self.conn = net.createConnection(port, host);
  self.conn.addListener('data', function(chunk) {
    var callback;
    // receive() yields the parsed message once complete, otherwise a falsy value.
    var parsed = self.message.receive(chunk);
    if (!parsed) {
      return;
    }
    callback = self.callbacks.shift();
    if (callback) {
      callback(parsed);
    }
    self.conn.emit('message', parsed);
    // Start a fresh message for the next response.
    return self.reset();
  });
  self.reset();
  return self;
};
/**
 * Replaces the in-progress message with a new, empty Riak.Message bound to
 * this connection's socket.
 *
 * @returns {Riak.Message} The newly created message.
 */
Riak.Connection.prototype.reset = function() {
  var fresh = new Riak.Message(this.conn);
  this.message = fresh;
  return this.message;
};
/**
 * Sends one request and queues the callback for its response.
 *
 * @param {Object} type - Schema type carrying `serialize` and `riak_code`.
 * @param {Object} data - Message fields to serialize.
 * @param {Function} [callback] - Called with the decoded response.
 * @returns {*} Whatever the underlying connection's write() returns.
 */
Riak.Connection.prototype.send = function(type, data, callback) {
  // Build the packet before queueing the callback: if prepare() throws, nothing
  // is written, so no callback may be queued either. Otherwise the stale entry
  // would be paired with the next response and every later callback would
  // receive the wrong message.
  var packet = this.prepare(type, data);
  this.callbacks.push(callback);
  return this.conn.write(packet);
};
/**
 * Registers a listener on the underlying connection.
 *
 * @param {string} name - Event name.
 * @param {Function} cb - Listener.
 * @returns {*} Whatever the connection's addListener() returns.
 */
Riak.Connection.prototype.on = function(name, cb) {
  var socket = this.conn;
  return socket.addListener(name, cb);
};
/**
 * Ends the underlying connection.
 *
 * @returns {*} Whatever the connection's end() returns.
 */
Riak.Connection.prototype.end = function() {
  var socket = this.conn;
  return socket.end();
};
/**
 * Builds the bytes for one request: a 4-byte big-endian length, a 1-byte
 * message code, then the serialized payload.
 *
 * @param {Object} type - Schema type carrying `serialize` and `riak_code`.
 * @param {Object} data - Message fields to serialize.
 * @returns {Buffer} The framed request.
 */
Riak.Connection.prototype.prepare = function(type, data) {
  var HEADER_SIZE = 5;
  var body = type.serialize(data);
  // The length field counts the message-code byte plus the payload.
  var size = body.length + 1;
  var packet = new Buffer(size + 4);
  var i;
  // Write the length most-significant byte first.
  for (i = 0; i < 4; i++) {
    packet[i] = (size >>> (8 * (3 - i))) & 255;
  }
  packet[4] = type.riak_code;
  body.copy(packet, HEADER_SIZE, 0);
  return packet;
};
/**
 * Accumulates the bytes of a single response.
 *
 * @param {Object} conn - The connection the response is read from.
 */
Riak.Message = function(conn) {
  var message = this;
  message.conn = conn;
  message.type = null;      // schema type used to parse the response
  message.response = null;  // payload buffer, allocated once the header is seen
  message.length = null;    // payload size in bytes
  message.read = 0;         // bytes copied into the payload buffer so far
  return message;
};
/**
 * Feeds one received chunk into this message.
 *
 * The first chunk must begin with the 5-byte header: a 4-byte big-endian
 * length (counting the message-code byte plus the payload) followed by the
 * 1-byte message code. Later chunks are treated as payload, read from offset
 * `starting`.
 *
 * @param {Buffer} chunk - Bytes received from the connection.
 * @param {number} [starting=0] - Offset in `chunk` where payload bytes begin.
 *   Ignored for the first chunk, whose payload always begins at offset 5.
 * @returns {*} The result of `this.type.parse(payload)` once the whole payload
 *   has been read, otherwise null.
 */
Riak.Message.prototype.receive = function(chunk, starting) {
  var available, wanted, count;
  if (!this.response) {
    // `>>> 0` keeps the assembled 32-bit length unsigned; `- 1` drops the
    // message-code byte so this.length is the payload size only.
    this.length = (((chunk[0] << 24) | (chunk[1] << 16) | (chunk[2] << 8) | chunk[3]) >>> 0) - 1;
    this.type = Riak.messages[chunk[4]];
    this.response = new Buffer(this.length);
    starting = 5;
  }
  starting = starting || 0;
  available = Math.max(chunk.length - starting, 0);
  wanted = this.length - this.read;
  // Never take more than the payload still needs. Counting surplus bytes would
  // push this.read past this.length, so the completion check below would never
  // succeed and the response would be lost.
  // NOTE: bytes beyond the end of this message are not consumed here.
  count = Math.min(available, wanted);
  if (count > 0) {
    chunk.copy(this.response, this.read, starting, starting + count);
  }
  this.read += count;
  if (this.read === this.length) {
    return this.type.parse(this.response);
  }
  return null;
};
// Example driver: connect on port 8087, fetch two keys from the 'timeline'
// bucket one after the other, then close the connection.
conn = new Riak.Connection(8087);

// Log every decoded message.
conn.on('message', function(msg) {
  var dump = sys.inspect(msg);
  return sys.puts("REC: " + dump);
});

// The second request is sent from the first request's callback, so only one
// request is in flight at a time.
conn.on('connect', function() {
  var firstQuery = { bucket: 'timeline', key: '2010723213' };
  var secondQuery = { bucket: 'timeline', key: '2010723212' };

  var onSecondResponse = function(msg) {
    sys.puts('ok done');
    return conn.end();
  };

  var onFirstResponse = function(msg) {
    sys.puts("CALLBACK!");
    return conn.send(Riak.GetRequest, secondQuery, onSecondResponse);
  };

  return conn.send(Riak.GetRequest, firstQuery, onFirstResponse);
});
package riak;
// Error reply: an error message plus a numeric error code, both required.
message RpbErrorResp {
required bytes errmsg = 1;
required uint32 errcode = 2;
}
// Get request: names the bucket and key to fetch.
message RpbGetReq {
required bytes bucket = 1;
required bytes key = 2;
// NOTE(review): presumably the read quorum — confirm against the Riak docs.
optional uint32 r = 3;
}
// One stored value together with its metadata; only `value` is required.
message RpbContent {
required bytes value = 1;
optional bytes content_type = 2; // the media type/format
optional bytes charset = 3;
optional bytes content_encoding = 4;
optional bytes vtag = 5;
repeated RpbLink links = 6; // links to other resources
// NOTE(review): presumably a last-modified timestamp split into seconds and
// microseconds — confirm units against the Riak docs.
optional uint32 last_mod = 7;
optional uint32 last_mod_usecs = 8;
repeated RpbPair usermeta = 9; // user metadata stored with the object
}
// Key/value pair with an optional value; used for RpbContent.usermeta.
message RpbPair {
required bytes key = 1;
optional bytes value = 2;
}
// Link entry used by RpbContent.links: a bucket, key and tag, all optional.
message RpbLink {
optional bytes bucket = 1;
optional bytes key = 2;
optional bytes tag = 3;
}
// Get response: zero or more content entries plus an optional vclock.
// NOTE(review): more than one entry presumably means sibling values — confirm.
message RpbGetResp {
repeated RpbContent content = 1;
optional bytes vclock = 2;
}
// List-buckets response: a repeated list of bucket names.
message RpbListBucketsResp {
repeated bytes buckets = 1;
}
@frank06
Copy link

frank06 commented Jul 24, 2010

neat!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment