Skip to content

Instantly share code, notes, and snippets.

@rhasson
Created October 23, 2015 20:10
Show Gist options
  • Save rhasson/0460a2b261cf5422ec8f to your computer and use it in GitHub Desktop.
Save rhasson/0460a2b261cf5422ec8f to your computer and use it in GitHub Desktop.
csp concurrent puts
var gen = require('when/generator'),
when = require('when'),
_ = require('lodash');
var csp = require('js-csp'),
pub = csp.operations.pub,
sub = csp.operations.pub.sub,
unsub = csp.operations.pub.unsub,
take = csp.take,
put = csp.put,
publisher = undefined;
var inbound = csp.chan(100),
outbound = csp.chan(100),
internal = {
calls: csp.chan(100)
};
var processRequest = gen.lift(function*(params, route_type) {
var body = undefined;
try {
switch(route_type) {
case 'call':
body = yield voiceCallResponse(params);
break;
default:
body = undefined;
break;
}
console.log('returning body')
return body;
} catch(e) {
console.log('Error processing request - ', e.stack);
}
});
var voiceCallResponse = gen.lift(function*(params) {
return yield getId();
});
function getId(id) {
return when('SUCCESS').delay(1000);
}
function getTopic(value) {
return value.route_type;
}
//Setup the publisher based on route_types
publisher = pub(inbound, getTopic);
//CSP loop to to process call events
//Setup a subscriber to the 'call' type and feed it into call_channel
sub(publisher, 'call', internal.calls);
csp.go(function* () {
console.log('Starting inbound call channel loop');
var body;
var value = yield take(internal.calls);
while (value !== csp.CLOSED) {
if (value != undefined) {
body = yield processRequest(value.request.params, value.route_type);
console.log('BODY: ', body)
if (typeof body === 'string') {
//value.body = body;
console.log('STRING')
csp.putAsync(outbound, {req: value, body: body});
} else if (typeof body === 'object') {
body.then(function(resp){
console.log('GOOD: ', resp)
if (value != undefined) {
//value.body = resp;
csp.putAsync(outbound, {req: value, body: resp});
}
}).catch(function() {
console.log('BAD')
});
} else {
console.log('SHOULDNT GET HERE')
//value.body = undefined;
csp.putAsync(outbound, {req: value, body: undefined});
}
}
value = yield take(internal.calls);
}
});
module.exports = {
inbound: inbound,
outbound: outbound
}
var restify = require('restify'),
server = restify.createServer(),
processor = require('./proc'),
csp = require('js-csp');
server.use(restify.queryParser());
server.use(restify.gzipResponse());
server.use(restify.bodyParser());
server.post('/actions', postHandlerVoice);
function postHandlerVoice(request, reply, next) {
csp.putAsync(processor.inbound, {
request: request,
reply: reply,
next: next,
route_type: 'call'
});
}
server.listen(9000, function() {
console.log('Started Server ');
csp.go(function*() {
console.log('Starting outbound channel loop');
var val = yield csp.take(processor.outbound);
while (val !== csp.CLOSED) {
console.log('OUTBOUND: ', Object.keys(val))
if (val != undefined) {
val.req.reply.header('content-type', 'application/xml');
val.req.reply.send(200, val.body, {'content-type': 'application/xml'});
val.req.reply.end();
val.req.next();
}
val = yield csp.take(processor.outbound);
}
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment