Skip to content

Instantly share code, notes, and snippets.

@leidegre
Created August 21, 2018 18:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leidegre/5dea27d9caa841526d17d44372e16481 to your computer and use it in GitHub Desktop.
Save leidegre/5dea27d9caa841526d17d44372e16481 to your computer and use it in GitHub Desktop.
Go channel abstraction for JavaScript
const { default: Channel } = require("./channel");
// Base delay in milliseconds passed to setTimeout; the tests stagger their
// timers at multiples of this value (1x, 2x, 3x, 4x).
const timeout = 100;
/**
 * Unbuffered channel, single value: the send issued right away must not
 * complete until the receiver scheduled after `timeout` ms takes the value.
 */
async function unbuffered() {
  const events = ["start"];
  const channel = new Channel();

  // Receiver side: only shows up after the timer fires.
  const receiveLater = async () => {
    events.push("timeout");
    await channel.receive();
    events.push("received");
  };
  setTimeout(receiveLater, timeout);

  // Sender side: parks here until the receiver above has taken the value.
  events.push("send");
  await channel.send(1);

  assertEqual(["start", "send", "timeout", "received"], events);
}
/**
 * Unbuffered channel, three values: every send has to wait for its matching
 * receiver, and the receivers arrive one `timeout` apart.
 */
async function unbuffered2() {
  const events = ["start"];
  const channel = new Channel();

  // Schedules one receiver that logs `label`, takes a value and logs it.
  const receiveAfter = (label, delay) =>
    setTimeout(async () => {
      events.push(label);
      const value = await channel.receive();
      events.push("received " + value);
    }, delay);

  receiveAfter("timeout 1", timeout);
  receiveAfter("timeout 2", 2 * timeout);
  receiveAfter("timeout 3", 3 * timeout);

  // Sends run strictly one after another; each await parks until received.
  const sends = [["send 1", 1], ["send 2", 2], ["send 3", 3]];
  for (const [label, value] of sends) {
    events.push(label);
    await channel.send(value);
  }

  const expected = [
    "start",
    "send 1",
    "timeout 1",
    "received 1",
    "send 2",
    "timeout 2",
    "received 2",
    "send 3",
    "timeout 3",
    "received 3"
  ];
  assertEqual(expected, events);
}
/**
 * Buffered channel (capacity 2): the first two sends complete immediately,
 * the third parks until the first receiver frees a slot.
 */
async function buffered() {
  const events = ["start"];
  const channel = new Channel(2);

  // Schedules one receiver that logs `label`, takes a value and logs it.
  const receiveAfter = (label, delay) =>
    setTimeout(async () => {
      events.push(label);
      const value = await channel.receive();
      events.push("received " + value);
    }, delay);

  receiveAfter("timeout 1", timeout);
  receiveAfter("timeout 2", 2 * timeout);
  receiveAfter("timeout 3", 3 * timeout);

  const sends = [["send 1", 1], ["send 2", 2], ["send 3", 3]];
  for (const [label, value] of sends) {
    events.push(label);
    await channel.send(value);
  }
  events.push("send done");

  // Give the remaining receivers time to drain the buffer.
  await new Promise(done => setTimeout(done, 4 * timeout));
  events.push("receive done");

  const expected = [
    "start",
    "send 1",
    "send 2",
    "send 3",
    "timeout 1",
    "received 1",
    "send done",
    "timeout 2",
    "received 2",
    "timeout 3",
    "received 3",
    "receive done"
  ];
  assertEqual(expected, events);
}
/**
 * Closing: a parked receiver is released with `undefined` when the channel
 * is closed, and a later send on the closed channel returns without parking.
 */
async function close() {
  const events = ["start"];
  const channel = new Channel();

  const receiver = async () => {
    events.push("receive");
    const value = await channel.receive();
    events.push("received " + value);
  };
  const closer = () => {
    events.push("close");
    channel.close();
  };
  const sender = async () => {
    events.push("send 1");
    await channel.send(1);
    events.push("send done");
  };

  setTimeout(receiver, timeout);
  setTimeout(closer, 2 * timeout);
  setTimeout(sender, 3 * timeout);

  // Let all three timers (and their continuations) finish first.
  await new Promise(done => setTimeout(done, 4 * timeout));
  events.push("done");

  const expected = [
    "start",
    "receive",
    "close",
    "received undefined",
    "send 1",
    "send done",
    "done"
  ];
  assertEqual(expected, events);
}
/**
 * Receiver first: a receive that is already parked gets the value of a send
 * that arrives later, and the receiver resumes before the sender does.
 */
async function receive() {
  const events = ["start"];
  const channel = new Channel();

  const receiver = async () => {
    events.push("receive");
    const value = await channel.receive();
    events.push("received " + value);
  };
  const sender = async () => {
    events.push("send 1");
    await channel.send(1);
    events.push("send done");
  };

  setTimeout(receiver, timeout);
  setTimeout(sender, 2 * timeout);

  // Let both timers (and their continuations) finish first.
  await new Promise(done => setTimeout(done, 3 * timeout));
  events.push("done");

  const expected = [
    "start",
    "receive",
    "send 1",
    "received 1",
    "send done",
    "done"
  ];
  assertEqual(expected, events);
}
/**
 * Starts every test case at once (they interleave on the event loop) and
 * resolves when all of them have finished.
 */
async function main() {
  const cases = [unbuffered, unbuffered2, buffered, close, receive];
  const running = cases.map(start => start());
  await Promise.all(running);
}
/**
 * Compares two values by their JSON serialization and logs the outcome,
 * prefixed with the name of the calling function when it can be determined.
 *
 * A mismatch is reported with console.error, a match with console.debug;
 * the function never throws.
 *
 * @param expected the value the test expects
 * @param actual the value the test produced
 * @returns true when both serializations are identical, false otherwise
 */
function assertEqual(expected, actual) {
  const want = JSON.stringify(expected);
  const got = JSON.stringify(actual);

  // `arguments.callee.caller` is a legacy lookup: it throws in strict mode
  // and yields null when there is no reportable caller. Never let the
  // reporting helper itself crash because of that.
  let label;
  try {
    label = arguments.callee.caller.name;
  } catch (err) {
    label = "<unknown>";
  }

  const ok = want === got;
  if (ok) {
    console.debug(label, ":", "ok");
  } else {
    console.error(label, ":", want, "does not equal", got);
  }
  return ok;
}
// Run the suite. An unexpected rejection is logged and turned into a
// non-zero exit code instead of being left as an unhandled floating promise.
main().catch(err => {
  console.error(err);
  process.exitCode = 1;
});
"use strict";
// Flag the exports object with `__esModule: true` (presumably emitted by the
// compiler that produced this CommonJS output from ES module syntax).
Object.defineProperty(exports, "__esModule", { value: true });
/**
 * A Go-style channel: a FIFO queue on which `send` and `receive` park
 * (via promises) until the other side is ready.
 *
 * see https://golang.org/ref/spec#Channel_types
 */
class Channel {
  /**
   * The capacity, in number of elements, sets the size of the buffer in the
   * channel. If the capacity is zero or absent, the channel is unbuffered
   * and a send completes only once a receiver has taken the value.
   *
   * @param cap number of values that may be queued without parking the sender
   */
  constructor(cap = 0) {
    // Resolve callbacks of parked senders / receivers, oldest first.
    this._producers = [];
    this._consumers = [];
    // Values that were sent but not yet received.
    this._buffer = [];
    this._closed = false;
    this._cap = cap;
  }

  /**
   * The number of elements queued (unread) in the channel buffer.
   */
  get size() {
    return this._buffer.length;
  }

  /**
   * Queues `value` and wakes the oldest parked receiver, if any. When the
   * buffer then holds more values than the capacity allows, the returned
   * promise stays pending until a receiver takes a value or the channel is
   * closed.
   *
   * Sending on a closed channel is a silent no-op.
   *
   * @param value the value to send; must not be `undefined`, because
   *   `receive` uses `undefined` to signal a closed channel
   * @throws {TypeError} (as a rejected promise) when `value` is `undefined`
   */
  async send(value) {
    if (value === undefined) {
      throw new TypeError("value cannot be undefined");
    }
    if (this._closed) {
      return;
    }
    this._buffer.push(value);
    // unblock a consumer
    const consumer = this._consumers.shift();
    if (consumer !== undefined) {
      consumer();
    }
    if (this._buffer.length > this._cap) {
      // over capacity: block the producer until a receive (or close) frees it
      await new Promise(resolve => {
        this._producers.push(resolve);
      });
    }
  }

  /**
   * Takes the oldest queued value, parking until one is available.
   *
   * @returns the received value, or `undefined` when the channel is closed
   *   (values still queued at that point are not delivered)
   */
  async receive() {
    // Loop rather than wait once: between being woken and actually running,
    // another receive() may already have taken the value, in which case this
    // consumer has to park again instead of returning a bogus `undefined`.
    // A closed channel never parks, otherwise nothing could ever wake us.
    while (!this._closed && this._buffer.length === 0) {
      await new Promise(resolve => {
        this._consumers.push(resolve);
      });
    }
    if (this._closed) {
      return undefined;
    }
    const value = this._buffer.shift();
    // unblock a producer
    const producer = this._producers.shift();
    if (producer !== undefined) {
      producer();
    }
    return value;
  }

  /**
   * Close the channel and release every parked receiver and sender.
   * Released receivers resolve with `undefined`.
   * note: this is not a Promise-based asynchronous API
   */
  close() {
    this._closed = true;
    // unblock!
    while (0 < this._consumers.length) {
      const consumer = this._consumers.shift();
      consumer();
    }
    while (0 < this._producers.length) {
      const producer = this._producers.shift();
      producer();
    }
  }
}
// Expose Channel as the module's `default` export property.
exports.default = Channel;
/**
 * Runs `f` as an independent, concurrent task: the call is deferred with
 * setImmediate, and a rejection of the returned promise is logged before the
 * process is terminated with exit code 1.
 */
function go(f: () => Promise<any>) {
  // Any failure inside a detached task is fatal for the whole process.
  const abort = (err: unknown) => {
    console.error(err);
    process.exit(1);
  };
  setImmediate(() => f().catch(abort));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment