Created
October 23, 2015 20:10
-
-
Save rhasson/0460a2b261cf5422ec8f to your computer and use it in GitHub Desktop.
csp concurrent puts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var gen = require('when/generator'), | |
when = require('when'), | |
_ = require('lodash'); | |
var csp = require('js-csp'), | |
pub = csp.operations.pub, | |
sub = csp.operations.pub.sub, | |
unsub = csp.operations.pub.unsub, | |
take = csp.take, | |
put = csp.put, | |
publisher = undefined; | |
var inbound = csp.chan(100), | |
outbound = csp.chan(100), | |
internal = { | |
calls: csp.chan(100) | |
}; | |
var processRequest = gen.lift(function*(params, route_type) { | |
var body = undefined; | |
try { | |
switch(route_type) { | |
case 'call': | |
body = yield voiceCallResponse(params); | |
break; | |
default: | |
body = undefined; | |
break; | |
} | |
console.log('returning body') | |
return body; | |
} catch(e) { | |
console.log('Error processing request - ', e.stack); | |
} | |
}); | |
var voiceCallResponse = gen.lift(function*(params) { | |
return yield getId(); | |
}); | |
function getId(id) { | |
return when('SUCCESS').delay(1000); | |
} | |
function getTopic(value) { | |
return value.route_type; | |
} | |
//Setup the publisher based on route_types | |
publisher = pub(inbound, getTopic); | |
//CSP loop to to process call events | |
//Setup a subscriber to the 'call' type and feed it into call_channel | |
sub(publisher, 'call', internal.calls); | |
csp.go(function* () { | |
console.log('Starting inbound call channel loop'); | |
var body; | |
var value = yield take(internal.calls); | |
while (value !== csp.CLOSED) { | |
if (value != undefined) { | |
body = yield processRequest(value.request.params, value.route_type); | |
console.log('BODY: ', body) | |
if (typeof body === 'string') { | |
//value.body = body; | |
console.log('STRING') | |
csp.putAsync(outbound, {req: value, body: body}); | |
} else if (typeof body === 'object') { | |
body.then(function(resp){ | |
console.log('GOOD: ', resp) | |
if (value != undefined) { | |
//value.body = resp; | |
csp.putAsync(outbound, {req: value, body: resp}); | |
} | |
}).catch(function() { | |
console.log('BAD') | |
}); | |
} else { | |
console.log('SHOULDNT GET HERE') | |
//value.body = undefined; | |
csp.putAsync(outbound, {req: value, body: undefined}); | |
} | |
} | |
value = yield take(internal.calls); | |
} | |
}); | |
module.exports = { | |
inbound: inbound, | |
outbound: outbound | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var restify = require('restify'), | |
server = restify.createServer(), | |
processor = require('./proc'), | |
csp = require('js-csp'); | |
server.use(restify.queryParser()); | |
server.use(restify.gzipResponse()); | |
server.use(restify.bodyParser()); | |
server.post('/actions', postHandlerVoice); | |
function postHandlerVoice(request, reply, next) { | |
csp.putAsync(processor.inbound, { | |
request: request, | |
reply: reply, | |
next: next, | |
route_type: 'call' | |
}); | |
} | |
server.listen(9000, function() { | |
console.log('Started Server '); | |
csp.go(function*() { | |
console.log('Starting outbound channel loop'); | |
var val = yield csp.take(processor.outbound); | |
while (val !== csp.CLOSED) { | |
console.log('OUTBOUND: ', Object.keys(val)) | |
if (val != undefined) { | |
val.req.reply.header('content-type', 'application/xml'); | |
val.req.reply.send(200, val.body, {'content-type': 'application/xml'}); | |
val.req.reply.end(); | |
val.req.next(); | |
} | |
val = yield csp.take(processor.outbound); | |
} | |
}); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment