Skip to content

Instantly share code, notes, and snippets.

@qti3e
Created May 20, 2021 17:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save qti3e/99c57dfeefca2e88b4746d19e860c45f to your computer and use it in GitHub Desktop.
Save qti3e/99c57dfeefca2e88b4746d19e860c45f to your computer and use it in GitHub Desktop.
Go concurrency in JavaScript
import { Go, Chan, Lock } from "./go";
const chan = Chan<number>();
const sumPromise = Go(async ($: WaitFn) => {
let sum = 0;
for await (const msg of chan.receive()) {
console.log('Received', msg);
sum += msg;
}
return sum;
});
Go(async ($: WaitFn) => {
for await (const i of $.range(1000)) {
console.log('Send', i);
chan.send(i);
}
chan.close();
});
const sum = await sumPromise;
console.log('Sum =', sum);
import { Go, Chan, Lock } from "./go";
Go(async ($: WaitFn) => {
for await (const i of $.range(5)) {
console.log('F1', i);
}
});
Go(async ($: WaitFn) => {
for await (const i of $.range(10)) {
console.log('F2', i);
}
});
import { once } from './once';
// The codes in this file allows us to have a concurrency model like the Go lang
// there is an exported function called `Go` which works somewhat like the `go`
// keyword in the Golang.
//
// ```ts
// Go(async ($: WaitFn) => {
// for await (const i of $.range(5)) {
// console.log('F1', i);
// }
// });
//
// Go(async ($: WaitFn) => {
// for await (const i of $.range(10)) {
// console.log('F2', i);
// }
// });
// ```
//
// Both of these functions will run concurrently.
//
// The current implementation uses a global job task scheduler, and it is
// designed to block the main thread for 100 consecutive ms at most (if used
// correctly of course.), calling the WaitFn ($) will return a promise that
// gets resolved once the function is allowed to continue execution, using the
// same idea there are some utility functions that are using the waitFn in their
// implementation including `waitFn.range()` and `waitFn.iter()`, also Channels
// and Locks are implemented on the same idea.
export interface WaitFn {
/**
* Returns a promise that get resolved once the function is allowed to
* continue execution.
*/
(): Promise<void>;
/**
* Returns an async iterable that yields numbers from 0 to n (exclusive).
* @param n The end number.
*/
range(n: number): AsyncIterable<number>;
/**
* Returns an async iterable over the given iterable.
* @param iterable The iterable.
*/
iter<T>(iterable: Iterable<T>): AsyncIterable<T>;
}
/**
* Call this function to release the lock, it will throw if called
* more than one time.
*/
export type LockReleaseCb = () => void;
/**
* A lock can be used to lock a specific resource.
*/
export interface Lock {
/**
* Retains the lock asynchronously.
*/
retain(): Promise<LockReleaseCb>;
/**
* Tries to retain the lock synchronously if the lock is already retained by
* someone else it simply returns undefined.
*/
retainSync(): LockReleaseCb | undefined;
}
/**
* A channel can be used to send messages from/to routines.
*
* # Example
*
* ```ts
* const chan = Chan<number>();
* const sumPromise = Go(async ($: WaitFn) => {
* let sum = 0;
* for await (const msg of chan.receive()) {
* console.log('Received', msg);
* sum += msg;
* }
* return sum;
* });
*
* Go(async ($: WaitFn) => {
* for await (const i of $.range(1000)) {
* console.log('Send', i);
* chan.send(i);
* }
* chan.close();
* });
*
* const sum = await sumPromise;
* console.log('Sum =', sum);
* ```
*/
export interface Chan<T> {
/**
* Indicates if the channel is closed, you cannot send messages in a closed
* channel.
*/
readonly isClosed: boolean;
/**
* Closes the channel.
*/
close(): void;
/**
* Send the given data thought the channel, only the currently active
* receivers will receive the message (Receivers that were registered
* before calling this function.)
* @param data The data which you want to send.
*/
send(data: T): void;
/**
* Returns an async iterable over the message of this channel.
*/
receive(): AsyncIterable<T>;
/**
* Returns the first message, if there us no message before channel being
* closed returns an undefined.
*/
first(): Promise<T | undefined>;
}
/**
* A function that can be executed using the Go function.
*/
export type ConcurrentFunction<Args extends any[] = [], R = void> = (
$: WaitFn,
...args: Args
) => Promise<R>;
/* tslint:disable:no-namespace */
namespace Concurrency {
// The implementation.
interface Resolvable extends Promise<void> {
resolve(): void;
}
function createResolvable(): Resolvable {
let resolve: () => void;
const promise: Resolvable = new Promise(r => (resolve = r)) as Resolvable;
Object.assign(promise, { resolve: resolve! });
return promise;
}
// Max time in ms that each execution round blocks the main thread.
const LIMIT = 100;
// A task in the tasks queue, it is usually the resolve cb of a promise,
// returned by waitFn().
type Task = () => void;
// The task queue.
const tasks: Task[] = [];
let started = 0;
// Executes a set of tasks from the queue for about 100ms.
function tick() {
started = Date.now();
while (tasks.length) {
tasks.shift()!();
if (Date.now() - started > LIMIT) {
return;
}
}
}
declare function setTimeout(cb: () => void, t: number): any;
// Starts the timer.
function startTimer() {
setTimeout(tick, 0);
}
// Pushes the given task in the tasks queue.
function enqueue(task: Task) {
if (tasks.length === 0) {
startTimer();
}
tasks.push(task);
}
function returnResolved() {
return tasks.length === 0 && Date.now() - started <= LIMIT;
}
// Implementation of the `WaitFn`.
function waitFn() {
if (returnResolved()) {
return Promise.resolve();
}
return new Promise<void>(resolve => {
enqueue(resolve);
});
}
namespace waitFn {
export async function* range(n: number): AsyncIterable<number> {
for (let i = 0; i < n; ++i) {
await waitFn();
yield i;
}
}
export async function* iter<T>(iterable: Iterable<T>): AsyncIterable<T> {
for (const data of iterable) {
yield data;
await waitFn();
}
}
}
// Implementation of the `Go` function.
export function go<Args extends any[] = [], R = void>(
fn: ConcurrentFunction<Args, R>,
...args: Args
): Promise<R> {
let resolve: (data: R) => void;
fn(waitFn, ...args).then(data => {
resolve(data);
});
return new Promise<R>(r => (resolve = r));
}
// Channel implementation.
class Channel<T> implements Chan<T> {
private static FINISH = Symbol('finish');
private instances: (T | typeof Channel.FINISH)[][] = [];
private newMessageNotify: Resolvable = createResolvable();
isClosed = false;
send(data: T): void {
if (this.isClosed) {
throw new Error('Cannot send new data to a closed channel.');
}
for (const instance of this.instances) {
instance.push(data);
}
this.newMessageNotify.resolve();
this.newMessageNotify = createResolvable();
}
close() {
this.isClosed = true;
this.newMessageNotify.resolve();
}
async *receive(): AsyncIterable<T> {
const instance: (T | typeof Channel.FINISH)[] = [];
this.instances.push(instance);
while (!this.isClosed || instance.length) {
await this.newMessageNotify;
if (instance.length) {
const data = instance.shift()!;
if (data === Channel.FINISH) {
return;
}
yield data as T;
}
}
}
async first(): Promise<T | undefined> {
const iter = this.receive();
const instance = this.instances[this.instances.length - 1];
for await (const x of iter) {
const instanceIndex = this.instances.indexOf(instance);
if (instanceIndex >= 0) {
instance[0] = Channel.FINISH;
this.instances.splice(instanceIndex, 1);
}
return x;
}
}
}
export function chan<T>(): Chan<T> {
return new Channel();
}
// Lock implementation.
class ZLock implements Lock {
private isLocked = false;
private lockChangeNotify: Resolvable = createResolvable();
retainSync(): LockReleaseCb | undefined {
if (!this.isLocked) {
this.isLocked = true;
return once(() => {
this.lockChangeNotify.resolve();
this.lockChangeNotify = createResolvable();
this.isLocked = false;
});
}
}
async retain(): Promise<LockReleaseCb> {
while (true) {
const release = this.retainSync();
if (release) {
return release;
}
await this.lockChangeNotify;
}
}
}
export function lock(): Lock {
return new ZLock();
}
}
/* tslint:enable:no-namespace */
export const Go = Concurrency.go;
export const Chan = Concurrency.chan;
export const Lock = Concurrency.lock;
export function once<A extends any[], R>(
fn: (...args: A) => R
): (...args: A) => R {
let called = false;
return (...args): R => {
if (called) {
throw new Error('This function can only be called once.');
}
called = true;
return fn(...args);
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment