Skip to content

Instantly share code, notes, and snippets.

@getify
Last active January 9, 2024 09:55
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save getify/e0d04f1f5aa24b1947ae to your computer and use it in GitHub Desktop.
Save getify/e0d04f1f5aa24b1947ae to your computer and use it in GitHub Desktop.
examples using go-style CSP API emulation on top of asynquence-flavored CSP
var ASQ = require("asynquence-contrib");
// Demo: three goroutines are handed to a single `runner(..)` step and trade
// values over the channel `ch` that each generator receives.
ASQ()
.runner(
// "a": puts 1, then takes from `ASQ.csp.timeout(1000)` (presumably a 1000ms
// delay -- confirm against the library docs), then logs the next value taken.
ASQ.csp.go(function*(ch){
yield ASQ.csp.put(ch,1);
yield ASQ.csp.take( ASQ.csp.timeout(1000) );
console.log( "a", yield ASQ.csp.take( ch ) );
}),
// "b": waits on a 500 timeout, logs the value it takes, then puts 2.
ASQ.csp.go(function*(ch) {
yield ASQ.csp.take( ASQ.csp.timeout(500) );
console.log( "b", yield ASQ.csp.take( ch ) );
yield ASQ.csp.put(ch,2);
}),
// "c": waits on a 100 timeout, logs the value it takes, then puts 3.
ASQ.csp.go(function*(ch) {
yield ASQ.csp.take( ASQ.csp.timeout(100) );
console.log( "c", yield ASQ.csp.take( ch ) );
yield ASQ.csp.put(ch,3);
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// c 1
// b 3
// a 2
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo: one goroutine puts a value and closes the channel right away; the
// other only takes after waiting on a 1000 timeout. The sample output that
// follows this script shows the value 5 still being received.
ASQ()
.runner(
// producer: put 5, then close the channel
ASQ.csp.go(function*(ch) {
yield ASQ.csp.put(ch, 5);
ch.close();
}),
// consumer: wait on the timeout first, then take and log one value
ASQ.csp.go(function*(ch) {
yield ASQ.csp.take(ASQ.csp.timeout(1000));
console.log(yield ASQ.csp.take(ch));
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// 5
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo: the consumer issues its take before anything has been put; the
// producer only puts after waiting on a 1000 timeout, then closes the channel.
ASQ()
.runner(
// consumer: take one value and log it
ASQ.csp.go(function*(ch) {
console.log(yield ASQ.csp.take(ch));
}),
// producer: wait on the timeout, put 5, then close the channel
ASQ.csp.go(function*(ch) {
yield ASQ.csp.take(ASQ.csp.timeout(1000));
yield ASQ.csp.put(ch, 5);
ch.close();
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// 5
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo: a message passed to `ASQ(..)` is already on the channel when the
// goroutines start, so the consumer logs it before the value put by the
// producer (see the sample output after this script).
ASQ( "Hello world" ) // <-- prime the goroutines' default channel with initial message(s)
.runner(
// producer: put 5, then close the channel
ASQ.csp.go(function*(ch) {
yield ASQ.csp.put(ch, 5);
ch.close();
}),
// consumer: two rounds of "wait on a 1000 timeout, then take and log"
ASQ.csp.go(function*(ch) {
yield ASQ.csp.take(ASQ.csp.timeout(1000));
console.log(yield ASQ.csp.take(ch));
yield ASQ.csp.take(ASQ.csp.timeout(1000));
console.log(yield ASQ.csp.take(ch));
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// Hello world
// 5
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo: the channel is primed with two messages but only one is taken before
// the channel is closed; the sample output after this script shows the
// leftover message arriving in the success step's `arguments`.
ASQ( "Hello", "World" ) // <-- prime the goroutines' default channel with initial message(s)
.runner(
ASQ.csp.go(function*(ch){
// try to add third message onto the channel
yield ASQ.csp.put(ch,42);
// `42` never gets onto the channel, so is discarded
}),
ASQ.csp.go(function*(ch){
// take only 1 message off the channel
var v = yield ASQ.csp.take(ch);
console.log(v);
// close the channel (leaving messages untaken)
ch.close();
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// Hello
// all done: { '0': 'World' } <-- untaken messages from the closed channel are passed along as individual `arguments`
var ASQ = require("asynquence-contrib");
// Demo: the first goroutine creates two extra channels with `ASQ.csp.chan(3)`,
// hands them to the other two goroutines over the default channel, and then
// feeds each of them ten values before closing them.
ASQ()
.runner(
ASQ.csp.go(function*(ch){
// make two extra channels
var ch2 = ASQ.csp.chan(3); // use buffering
var ch3 = ASQ.csp.chan(3); // use buffering
// use the default channel `ch` to pass
// the extra channels themselves along :)
yield ASQ.csp.put(ch,ch2);
yield ASQ.csp.put(ch,ch3);
// done with the default channel now
ch.close();
// alternate puts: i onto ch2 and i*2 onto ch3, for i = 0..9
for (var i=0; i<10; i++) {
yield ASQ.csp.put(ch2,i);
yield ASQ.csp.put(ch3,i*2);
}
ch2.close();
ch3.close();
}),
ASQ.csp.go(function*(ch){
// receive the extra channel
var ch2 = yield ASQ.csp.take(ch);
var v;
// drain the received channel until a take yields the CLOSED sentinel
while ((v = yield ASQ.csp.take(ch2)) !== ASQ.csp.CLOSED) {
console.log("ch2:",v);
}
}),
ASQ.csp.go(function*(ch){
// receive the extra channel
var ch3 = yield ASQ.csp.take(ch);
var v;
// drain the received channel until a take yields the CLOSED sentinel
while ((v = yield ASQ.csp.take(ch3)) !== ASQ.csp.CLOSED) {
console.log("ch3:",v);
}
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// ch2: 0
// ch2: 1
// ch2: 2
// ch2: 3
// ch2: 4
// ch3: 0
// ch3: 2
// ch3: 4
// ch3: 6
// ch2: 5
// ch2: 6
// ch2: 7
// ch2: 8
// ch3: 8
// ch3: 10
// ch3: 12
// ch3: 14
// ch3: 16
// ch3: 18
// ch2: 9
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo: two goroutines keep taking a value from the shared channel, logging
// it, pausing, and putting a value back (2 and 3 respectively), until a third
// goroutine closes the channel after waiting on a 5000 timeout.
ASQ()
.runner(
// logs each value taken, waits on a 300 timeout, then puts 2
ASQ.csp.go(function*(ch) {
var v;
while((v = yield ASQ.csp.take(ch)) !== ASQ.csp.CLOSED) {
console.log(v);
yield ASQ.csp.take(ASQ.csp.timeout(300));
yield ASQ.csp.put(ch, 2);
}
}),
// seeds the exchange by putting 1, then logs each value taken, waits on a
// 200 timeout, and puts 3
ASQ.csp.go(function*(ch) {
var v;
yield ASQ.csp.put(ch, 1);
while((v = yield ASQ.csp.take(ch)) !== ASQ.csp.CLOSED) {
console.log(v);
yield ASQ.csp.take(ASQ.csp.timeout(200));
yield ASQ.csp.put(ch, 3);
}
}),
// closes the channel after the 5000 timeout, which ends both loops above
ASQ.csp.go(function*(ch) {
yield ASQ.csp.take(ASQ.csp.timeout(5000));
ch.close();
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// 1
// 2
// 3
// 2
// 3
// 2
// 3
// 2
// 3
// 2
// 3
// 2
// 3
// 2
// 3
// 2
// 3
// 2
// 3
// 2
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo: three producers repeatedly put their own constant (1, 2, 3) onto the
// shared channel at different intervals, while one consumer logs ten values
// and then closes the channel.
ASQ()
.runner(
// each producer loops for as long as its put yields a truthy result
// (presumably falsy once the channel is closed -- confirm in the library docs)
ASQ.csp.go(function*(ch){
while (yield ASQ.csp.put(ch, 1)) { yield ASQ.csp.take(ASQ.csp.timeout(250)); }
}),
ASQ.csp.go(function*(ch) {
while (yield ASQ.csp.put(ch, 2)) { yield ASQ.csp.take(ASQ.csp.timeout(300)); }
}),
ASQ.csp.go(function*(ch) {
while (yield ASQ.csp.put(ch, 3)) { yield ASQ.csp.take(ASQ.csp.timeout(900)); }
}),
// consumer: take and log exactly ten values, then close the channel
ASQ.csp.go(function*(ch) {
for (var i=0; i<10; i++) {
console.log(yield ASQ.csp.take(ch));
}
ch.close();
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// 1
// 2
// 3
// 1
// 2
// 1
// 2
// 1
// 3
// 2
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo: sets `buffer_size` on the default channel to 13 and puts 15 values;
// a second goroutine takes them in batches of five, and a third closes the
// channel after waiting on a 1000 timeout.
ASQ()
.runner(
ASQ.csp.go(function*(ch) {
// change the default buffer size on the default channel
ch.buffer_size = 13;
// log after each put completes; in the sample output "put 13" only appears
// after the consumer has started taking values
for(var x=0; x<15; x++) {
yield ASQ.csp.put(ch, x);
console.log('put ' + x);
}
}),
ASQ.csp.go(function*(ch) {
// `ch.closed` is only re-checked between batches of five takes
while(!ch.closed) {
yield ASQ.csp.take(ASQ.csp.timeout(200));
// NOTE(review): the inner loop does not compare against ASQ.csp.CLOSED; the
// five `{}` lines at the end of the sample output look like that sentinel
// being logged for the final batch -- confirm
for(var i=0; i<5; i++) {
console.log(yield ASQ.csp.take(ch));
}
}
}),
// closes the channel after the 1000 timeout
ASQ.csp.go(function*(ch) {
yield ASQ.csp.take(ASQ.csp.timeout(1000));
ch.close();
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// put 0
// put 1
// put 2
// put 3
// put 4
// put 5
// put 6
// put 7
// put 8
// put 9
// put 10
// put 11
// put 12
// 0
// 1
// 2
// 3
// 4
// put 13
// put 14
// 5
// 6
// 7
// 8
// 9
// 10
// 11
// 12
// 13
// 14
// {}
// {}
// {}
// {}
// {}
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo ("ping-pong"): the main goroutine starts two `player` goroutines via
// `table.go(..)`, puts a ball object on the table channel, waits on a 1000
// timeout, and then closes the table.
ASQ()
.runner(
ASQ.csp.go(function*(table){
// `player` is the generator declared after this statement; the array
// presumably supplies its extra argument(s) (`name`) -- confirm in the docs
table.go(player, ["ping"]);
table.go(player, ["pong"]);
// serve: the ball carries a hit counter that the players increment
yield ASQ.csp.put(table, {hits: 0});
yield ASQ.csp.take(ASQ.csp.timeout(1000));
table.close();
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
/**
 * Goroutine body for one ping-pong player.
 *
 * Keeps taking the ball from `table`, bumps its `hits` counter, logs
 * "<name> <hits>", waits on a 100 timeout, and puts the ball back. Once a
 * take yields `ASQ.csp.CLOSED`, it logs "<name>: table's gone" and finishes.
 *
 * @param {Object} table - channel the ball travels over
 * @param {string} name - label used in the log output
 */
function* player(table, name) {
	var ball;
	// the loop condition doubles as the "is the table still open?" check
	while ((ball = yield ASQ.csp.take(table)) !== ASQ.csp.CLOSED) {
		ball.hits += 1;
		console.log(name + " " + ball.hits);
		yield ASQ.csp.take(ASQ.csp.timeout(100));
		yield ASQ.csp.put(table, ball);
	}
	console.log(name + ": table's gone");
}
// ping 1
// pong 2
// ping 3
// pong 4
// ping 5
// pong 6
// ping 7
// pong 8
// ping 9
// pong 10
// ping: table's gone
// pong: table's gone
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo (browser): continuously display the mouse position over the `#ui1`
// element, reading `mousemove` events from the channel that `listen(..)`
// (declared after this statement) returns.
ASQ()
.runner(
ASQ.csp.go(function*(x){
	var el = document.querySelector('#ui1');
	var ch = listen(el, 'mousemove');
	var e, posX, posY;
	// runs for the life of the page: nothing in this demo closes `ch`
	while(true) {
		e = yield ASQ.csp.take(ch);
		// Prefer layerX/layerY, falling back to client coordinates only when
		// they are missing. A plain `||` would also discard a legitimate 0.
		posX = (e.layerX != null) ? e.layerX : e.clientX;
		posY = (e.layerY != null) ? e.layerY : e.clientY;
		// the readout is plain text, so no HTML parsing is needed
		el.textContent = posX + ', ' + posY;
	}
})
)
// success step: logs whatever arguments it is called with
.val(function(){
	console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
	console.log(err.stack || err);
});
/**
 * Bridge DOM events into a CSP channel.
 *
 * Creates a channel with `ASQ.csp.chan()`, registers a `type` listener on
 * `el` that forwards every event object to that channel via
 * `ASQ.csp.putAsync(..)`, and returns the channel.
 *
 * @param {Object} el - event target exposing `addEventListener`
 * @param {string} type - event name to subscribe to
 * @returns {Object} the channel that receives the event objects
 */
function listen(el, type) {
	var events = ASQ.csp.chan();

	function forward(evt) {
		ASQ.csp.putAsync(events, evt);
	}

	el.addEventListener(type, forward);
	return events;
}
var ASQ = require("asynquence-contrib");
// Demo: a single goroutine produces random numbers while `listen(ch)`
// (declared after this statement) consumes them from outside any goroutine.
ASQ()
.runner(
ASQ.csp.go(function*(ch){
// start the outside consumer, which also schedules the channel's close
listen(ch);
// keep putting random numbers for as long as the put yields a truthy result
// (presumably falsy once the channel is closed -- confirm in the library docs)
while (yield ASQ.csp.put(ch,Math.random())) {}
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
/**
 * Consume values from `ch` outside of a goroutine, one take every 500ms.
 *
 * Schedules the channel to be closed after 5000ms, then repeatedly calls
 * `ASQ.csp.takeAsync(ch)`: each value is logged and the next take is
 * scheduled 500ms later, until `ASQ.csp.CLOSED` is received, at which point
 * "can't get anymore" is logged and the loop stops.
 *
 * @param {Object} ch - channel to read from; must expose `close()`
 */
function listen(ch) {
	// Call close() as a method of `ch`. Handing `ch.close` itself to
	// setTimeout would invoke it without its receiver, which breaks for any
	// channel whose close() relies on `this`.
	setTimeout(function closeChannel(){
		ch.close();
	},5000);
	(function iter(){
		ASQ.csp.takeAsync(ch)
		.val(function(v){
			if (v !== ASQ.csp.CLOSED) {
				console.log(v);
				// pace the consumer: next take in 500ms
				setTimeout(iter,500);
			}
			else {
				console.log("can't get anymore");
			}
		})
		.or(function(err){
			console.log(err);
		});
	})();
}
// 0.2430584009271115
// 0.5205253872554749
// 0.5858484928030521
// 0.30812705494463444
// 0.4534017143305391
// 0.057873843470588326
// 0.4264718643389642
// 0.8639854174107313
// 0.6830783057957888
// 0.06948345596902072
// all done: {}
// can't get anymore
var ASQ = require("asynquence-contrib");
// Demo: a single goroutine produces random numbers while `listen(ch)`
// (declared after this statement) consumes them through a reactive sequence.
ASQ()
.runner(
ASQ.csp.go(function*(ch){
// start the outside consumer, which also schedules the channel's close
listen(ch);
// keep putting random numbers for as long as the put yields a truthy result
// (presumably falsy once the channel is closed -- confirm in the library docs)
while (yield ASQ.csp.put(ch,Math.random())) {}
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
/**
 * Consume values from `ch` through a reactive sequence (`ASQ.react`).
 *
 * Schedules the channel to be closed after 5000ms. Inside the reactive
 * setup, `ASQ.csp.takeAsync(ch)` is called repeatedly (every 500ms while the
 * channel is open) and each result, including the final `ASQ.csp.CLOSED`,
 * is pushed into the sequence with `proceed(v)`. The sequence logs every
 * regular value and calls `rsq.stop()` when it sees `ASQ.csp.CLOSED`.
 *
 * @param {Object} ch - channel to read from; must expose `close()`
 */
function listen(ch) {
	// Call close() as a method of `ch`. Handing `ch.close` itself to
	// setTimeout would invoke it without its receiver, which breaks for any
	// channel whose close() relies on `this`.
	setTimeout(function closeChannel(){
		ch.close();
	},5000);
	var rsq = ASQ.react(function(proceed,stop){
		(function iter(){
			ASQ.csp.takeAsync(ch)
			.val(function(v){
				// only schedule another take while the channel is still open
				if (v !== ASQ.csp.CLOSED) {
					setTimeout(iter,500);
				}
				// forward every result, CLOSED included, to the sequence below
				proceed(v);
			});
		})();
		// presumably registers a teardown handler that runs on `rsq.stop()`;
		// the sample output shows this message once the channel has closed
		stop(function(){
			console.log("can't get anymore");
		});
	})
	.val(function(v){
		if (v !== ASQ.csp.CLOSED) {
			console.log(v);
		}
		else {
			rsq.stop();
		}
	})
	.or(function(err){
		console.log(err);
	});
}
// 0.20062327338382602
// 0.1929178275167942
// 0.2259157495573163
// 0.0409365005325526
// 0.8685692832805216
// 0.2227405107114464
// 0.23395077511668205
// 0.9894116870127618
// 0.5614675949327648
// 0.8783516574185342
// all done: {}
// can't get anymore
var ASQ = require("asynquence-contrib");
// Demo: `ASQ.csp.takem(..)` used with try/catch. One goroutine puts a string
// and then an Error object; per the sample output after this script, the
// second takem lands in the `catch` block rather than returning the Error.
ASQ()
.runner(
// producer: a regular value followed by an Error value
ASQ.csp.go(function*(ch){
yield ASQ.csp.put(ch,"val 1");
yield ASQ.csp.put(ch,new Error("oops 2"));
}),
// consumer: logs the first message; the second take is expected to throw
ASQ.csp.go(function*(ch){
try {
var v = yield ASQ.csp.takem(ch);
console.log(v);
v = yield ASQ.csp.takem(ch);
}
catch (err) {
console.log(err.toString());
}
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// val 1
// Error: oops 2
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo: the same string-then-Error producer, but consumed from outside a
// goroutine by `listen(ch)` (declared after this statement) using
// `ASQ.csp.takemAsync(..)`.
ASQ()
.runner(
ASQ.csp.go(function*(ch){
// start the outside consumer before producing anything
listen(ch);
yield ASQ.csp.put(ch,"val 1");
yield ASQ.csp.put(ch,new Error("oops 2"));
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
/**
 * Take two messages from `ch` with `ASQ.csp.takemAsync(..)`.
 *
 * The first message is logged. A second take is then chained on; in the
 * accompanying demo that message is an Error, so the chain's error handler
 * logs `err.toString()` and the inner success step is never reached.
 *
 * @param {Object} ch - channel to read from
 */
function listen(ch) {
	function logMessage(msg) {
		console.log(msg);
	}

	function takeSecond() {
		return ASQ.csp.takemAsync(ch)
		.val(function(){
			// never gets here
		});
	}

	function logError(err) {
		console.log(err.toString());
	}

	ASQ.csp.takemAsync(ch)
	.val(logMessage)
	.seq(takeSecond)
	.or(logError);
}
// val 1
// all done: {}
// Error: oops 2
var ASQ = require("asynquence-contrib");
// Demo: `ASQ.csp.alts(..)` choosing among two takes and one put. Three helper
// goroutines each create a channel and hand it to the fourth goroutine over
// the default channel; the fourth then multiplexes over all three.
ASQ()
.runner(
// producer for ch2: puts 0..4, then closes ch2
ASQ.csp.go(function*(ch){
var ch2 = ASQ.csp.chan();
yield ASQ.csp.put(ch,ch2);
for (var i=0; i<5; i++) {
yield ASQ.csp.put(ch2,i);
}
ch2.close();
}),
// producer for ch3: puts 0,2,4,6,8, then closes ch3
ASQ.csp.go(function*(ch){
var ch3 = ASQ.csp.chan();
yield ASQ.csp.put(ch,ch3);
for (var i=0; i<5; i++) {
yield ASQ.csp.put(ch3,i*2);
}
ch3.close();
}),
// consumer of ch4: logs every value until ch4 is closed
ASQ.csp.go(function*(ch){
var ch4 = ASQ.csp.chan(), v;
yield ASQ.csp.put(ch,ch4);
while ((v = yield ASQ.csp.take(ch4)) !== ASQ.csp.CLOSED) {
console.log("ch4:",v);
}
console.log("ch4 closed")
}),
// multiplexer: collects the three channels, then loops over alts
ASQ.csp.go(function*(ch){
// NOTE(review): the local names assume the channels arrive in the order the
// goroutines above were listed -- confirm the library guarantees that order
var ch2 = yield ASQ.csp.take(ch);
var ch3 = yield ASQ.csp.take(ch);
var ch4 = yield ASQ.csp.take(ch);
ch.close();
var r, ch4val = 10;
while (true) {
// a bare channel requests a take; a [channel,value] pair requests a put
r = yield ASQ.csp.alts([
/*take*/ch2,
/*take*/ch3,
/*put*/ [ch4,ch4val]
]);
// `r.channel` identifies which operation completed; `r.value` is its result
if (r.value === ASQ.csp.CLOSED) {
if (r.channel === ch2) {
console.log("ch2 closed");
}
else if (r.channel === ch3) {
console.log("ch3 closed");
}
}
else {
if (r.channel === ch2) {
console.log("ch2:",r.value);
}
else if (r.channel === ch3) {
console.log("ch3:",r.value);
}
else if (r.channel === ch4) {
// a put that reports `true` advances the next value offered to ch4
if (r.value === true) {
ch4val += 10;
}
}
}
// stop once both producer channels have been closed
if (ch2.closed && ch3.closed) break;
}
ch4.close();
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// ch2: 0
// ch2: 1
// ch2: 2
// ch2: 3
// ch2: 4
// ch2 closed
// ch3: 0
// ch3: 2
// ch3: 4
// ch3: 6
// ch3: 8
// ch3 closed
// ch4 closed
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo: the `ASQ.csp.alts(..)` multiplexer again, but with timeouts added so
// the producers and the multiplexer run at different paces. The sample output
// after this script shows the ch4 put succeeding here.
ASQ()
.runner(
// producer for ch2: puts 0..4, waiting on a 100 timeout before odd values
ASQ.csp.go(function*(ch){
var ch2 = ASQ.csp.chan();
yield ASQ.csp.put(ch,ch2);
for (var i=0; i<5; i++) {
if (i % 2 === 1) yield ASQ.csp.take(ASQ.csp.timeout(100));
yield ASQ.csp.put(ch2,i);
}
ch2.close();
}),
// producer for ch3: puts 0,2,4,6,8, waiting on a 100 timeout when i is even
ASQ.csp.go(function*(ch){
var ch3 = ASQ.csp.chan();
yield ASQ.csp.put(ch,ch3);
for (var i=0; i<5; i++) {
if (i % 2 === 0) yield ASQ.csp.take(ASQ.csp.timeout(100));
yield ASQ.csp.put(ch3,i*2);
}
ch3.close();
}),
// consumer of ch4: logs every value until ch4 is closed
ASQ.csp.go(function*(ch){
var ch4 = ASQ.csp.chan(), v;
yield ASQ.csp.put(ch,ch4);
while ((v = yield ASQ.csp.take(ch4)) !== ASQ.csp.CLOSED) {
console.log("ch4:",v);
}
console.log("ch4 closed")
}),
// multiplexer: collects the three channels, then loops over alts
ASQ.csp.go(function*(ch){
// NOTE(review): the local names assume the channels arrive in the order the
// goroutines above were listed -- confirm the library guarantees that order
var ch2 = yield ASQ.csp.take(ch);
var ch3 = yield ASQ.csp.take(ch);
var ch4 = yield ASQ.csp.take(ch);
ch.close();
var r, ch4val = 10;
while (true) {
// wait on a 50 timeout before each round of alts
yield ASQ.csp.take(ASQ.csp.timeout(50));
// a bare channel requests a take; a [channel,value] pair requests a put
r = yield ASQ.csp.alts([
/*take*/ch2,
/*take*/ch3,
/*put*/ [ch4,ch4val]
]);
// `r.channel` identifies which operation completed; `r.value` is its result
if (r.value === ASQ.csp.CLOSED) {
if (r.channel === ch2) {
console.log("ch2 closed");
}
else if (r.channel === ch3) {
console.log("ch3 closed");
}
}
else {
if (r.channel === ch2) {
console.log("ch2:",r.value);
}
else if (r.channel === ch3) {
console.log("ch3:",r.value);
}
else if (r.channel === ch4) {
// a put that reports `true` advances the next value offered to ch4
if (r.value === true) {
ch4val += 10;
}
}
}
// stop once both producer channels have been closed
if (ch2.closed && ch3.closed) break;
}
ch4.close();
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// ch2: 0
// ch3: 0
// ch2: 1
// ch2: 2
// ch3: 2
// ch2: 3
// ch2: 4
// ch3: 4
// ch3: 6
// ch4: 10
// ch3: 8
// ch4: 20
// ch4 closed
// all done: {}
var ASQ = require("asynquence-contrib");
// Demo: goroutines A, B and C each add a sub-goroutine with `ch.go(..)` at a
// different moment, while D only waits. Per the sample output after this
// script, "all done" is logged only after every goroutine, including the
// added ones, has finished.
ASQ()
.runner(
// A: adds its sub-goroutine after a 100 timeout, ends after 300 more
ASQ.csp.go(function*(ch){
console.log("A:start");
yield ASQ.csp.take(ASQ.csp.timeout(100));
// add another goroutine
ch.go(function*(ch){
console.log("A:sub");
});
yield ASQ.csp.take(ASQ.csp.timeout(300));
console.log("A:end");
}),
// B: adds its sub-goroutine after a 2000 timeout, ends after 1000 more
ASQ.csp.go(function*(ch){
console.log("B:start");
yield ASQ.csp.take(ASQ.csp.timeout(2000));
// add another goroutine
ch.go(function*(ch){
console.log("B:sub");
});
yield ASQ.csp.take(ASQ.csp.timeout(1000));
console.log("B:end");
}),
// C: adds its sub-goroutine immediately, ends after a 5000 timeout
ASQ.csp.go(function*(ch){
console.log("C:start");
// add another goroutine
ch.go(function*(ch){
console.log("C:sub");
});
yield ASQ.csp.take(ASQ.csp.timeout(5000));
console.log("C:end");
}),
// D: adds nothing; just waits on a 6000 timeout
ASQ.csp.go(function*(ch){
console.log("D:start");
yield ASQ.csp.take(ASQ.csp.timeout(6000));
console.log("D:end");
})
)
// success step: logs whatever arguments it is called with
.val(function(){
console.log("all done:", arguments);
})
// error step: logs the stack when the error has one, otherwise the error itself
.or(function(err){
console.log(err.stack || err);
});
// A:start
// B:start
// C:start
// D:start
// C:sub
// A:sub
// A:end
// B:sub
// B:end
// C:end
// D:end
// all done: {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment