public
Last active

Simple Couchbase pooling, still searching for a method to close existing connections.

  • Download Gist
couchPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
var Config = require( '../config' )
, couchbase = require( 'couchbase' )
, couchPool = {
base: {
'hosts': Config.couchbase.hosts,
'port': 8091,
'username': Config.couchbase.username,
'password': Config.couchbase.password
},
minimalConnections: 5,
buckets: [ 'default' ],
connections: {},
setup: function(cb) {
var connection = 0;
return couchPool.buckets.forEach(function(bucketName){
var i;
for( i = 0; i < couchPool.minimalConnections; i++ ) {
couchPool.addConnection(bucketName, function(){
connection++;
if( connection == ( couchPool.buckets.length * couchPool.minimalConnections ) ) {
cb(connection)
}
}, false)
}
return;
})
},
addConnection: function(bucketName, cb, used) {
console.log( 'addConnection: ' + bucketName )
try {
couchbase.connect({
'hosts': couchPool.base.hosts,
'port': couchPool.base.port,
'username': couchPool.base.username,
'password': couchPool.base.password,
'bucket': bucketName,
'debug': false
}, function(err, bucket){
if( err )
return console.log(err);
 
if( !( bucketName in couchPool.connections ) )
couchPool.connections[( bucketName )] = [];
var k = couchPool.connections[( bucketName )].push({
active: used,
bucket: bucket,
died: false
})
 
return cb(bucket, (k-1))
})
}
catch( e ) {
console.log( '[couchpool] error ' + e)
 
// Retry in next tick
process.nextTick(function(){
couchPool.addConnection(bucketName, cb, used)
})
 
return;
}
},
removeConnection: function(bucketName, k){
if( ( bucketName in couchPool.connections )
&& couchPool.connections[( bucketName )][( k )] ) {
console.log( '[couchpool] free up connection' )
couchPool.connections[( bucketName )][( k )].active = false;
}
},
free: function(bucketName, k) {
console.log( '[couchpool] Free up connection ' + bucketName )
couchPool.removeConnection(bucketName, k)
if( !( bucketName in couchPool.connections ) )
return console.log( '[Couchpool] Bucket ' + bucketName + ' doesn\'t exist' );
 
var activeConnections = 0;
couchPool.connections[( bucketName )].every(function(connection, k){
if( connection.active === true
&& connection.died === false ) {
activeConnections++;
if( activeConnections > couchPool.minimalConnections ) {
// connection.bucket.close() -- currently no close method supported
}
}
return true;
})
},
get: function(bucketName, cb) {
if( !( bucketName in couchPool.connections ) )
return console.log( '[Couchpool] Bucket ' + bucketName + ' doesn\'t exist' );
 
var found = false, closeHandler = function(connection, bucketName, k){
return cb(connection, function(){
couchPool.free(bucketName, k)
})
};
couchPool.connections[( bucketName )].every(function(connection, k){
if( connection.active === false
&& connection.died === false ) {
// Set used
couchPool.connections[( bucketName )][( k )].active = true;
closeHandler(connection.bucket, bucketName, k)
 
found = true;
return false;
}
return true;
})
 
// Found something?
if( !found ) {
console.log( 'no free connection, request a new one' )
return couchPool.addConnection( bucketName, function(c, k){
closeHandler(c, bucketName, k)
}, true )
}
else
return;
}
};
 
couchPool.setup(function(c){
console.log( 'Buckets: ' + c )
 
// Run crazy code
var i = 0, ids = [], runDate = Math.floor(new Date().getTime()/1000);
for( i = 0; i <= 300; i++ )
ids.push(i)
 
console.log(ids)
ids.every(function(i){
console.log( 'request bucket' )
couchPool.get( 'default', function(bucket, done){
console.log( 'add item: ' + i )
bucket.add( 'invoices::test::' + runDate + '::#' + i, { 'test': 'test', 'id': i }, function(err, result){
if( err )
console.log(err)
console.log(result)
 
return done()
})
})
return true;
})
})

Well, I didn't need to close the connections - I keep all open as they are used all the time. The server is quite loaded at all times.

I think I'll abandon this approach, though. There's no reliable way to ensure that you will not accidentally overload one connection. And if you do, there's no going back - node just hangs and consumes CPU.

Haha, I'll keep with the modified cradle for now.
Couchbase lib doesn't take async calls in account and that's a shame.

By the way, the Java SDK seems much more stable and faster.
I implemented a quick and dirty REST proxy as a Java app -- 200 lines of code using Jetty -- and now I can feed it as many docs as I want and it saves them 4 times faster.

The drawback is that you need to run another process to proxy all Couchbase communication. I need only gets() and cas() though

The Java SDK lacks multiple get, which is sad

Mmm, that doesn't sound like a bad idea. Would you like to share it with us? :)

@mladenmarkov That got our attention, as we're seriously struggling.. care to share your Java code? We're not too familiar with Java.

Nvm. We now use node-memcache with baseview :)

@DeviaVir I'll create a GitHub project tomorrow
@renedx I already tried node-memcache earlier today, but it wouldn't connect to my Couchbase 2.0 Server

@mladenmarkov It didn't connect here first because of the firewall blocking it. Now it just works and fast :)
The port given should be the default memcache port. You can define a memcache port on each bucket in the Couchbase Server. We also run 2.0.

@DeviaVir, here's the Couchbase Java Proxy, if you're still interested. I'm using it in production and it works great. It's even faster than node-memcache and memcached. That is with CAS, which I need to use.

https://github.com/mladenmarkov/couchbase-proxy

@trondn told me he's working on the problem, so hopefully it should be fixed soon. I'd rather use a node-only solution than have to manage another process for proxying Couchbase calls.
http://www.couchbase.com/issues/browse/JSCBC-14

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.