Skip to content

Instantly share code, notes, and snippets.

@kriskowal
Last active January 19, 2021 06:03
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kriskowal/abcba770b5dba526a1af801d692fd94e to your computer and use it in GitHub Desktop.
Save kriskowal/abcba770b5dba526a1af801d692fd94e to your computer and use it in GitHub Desktop.
/**
 * Lazily produces the integers 0, 1, ..., n - 1 in ascending order.
 *
 * @param {number} n - Exclusive upper bound; nothing is yielded when n <= 0.
 * @yields {number} The next integer in the sequence.
 */
function* count(n) {
  let current = 0;
  while (current < n) {
    yield current;
    current += 1;
  }
}
/**
 * Returns a promise that fulfills (with no value) once a timer of `ms`
 * milliseconds has fired.
 */
const delay = (ms) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });
/**
 * Visits every value of a sync or async iterable one at a time, waiting for
 * the callback's result to settle before pulling the next value.
 *
 * @param values - Iterable or async iterable to drain.
 * @param callback - Called with each value; may return a promise.
 * @returns {Promise<void>} Settles once the source is exhausted, or rejects
 *   with the first error raised by the source or the callback.
 */
const asyncForEach = async (values, callback) => {
  for await (const item of values) {
    const outcome = callback(item);
    await outcome;
  }
};
/**
 * Demo: logs 0..9 strictly in order, pausing a random time (under 100 ms)
 * before each number.
 */
const demoAsyncForEach = async () => {
  console.log('demo serial async for each');
  const logAfterPause = async (n) => {
    await delay(Math.random() * 100);
    console.log(n);
  };
  await asyncForEach(count(10), logAfterPause);
};
/**
 * Starts `limit` workers by calling `callback(index)` for index 0..limit-1
 * and waits for all of them.
 *
 * The workers are produced by a lazy generator that Promise.all consumes, so
 * an exception thrown synchronously by `callback` becomes a rejection of the
 * returned promise.
 *
 * @param {number} limit - Number of workers to start.
 * @param callback - Receives the worker index; may return a promise.
 * @returns {Promise<Array>} Worker results, in worker-index order.
 */
const parallel = (limit, callback) => {
  const launches = (function* () {
    for (const index of count(limit)) {
      yield callback(index);
    }
  })();
  return Promise.all(launches);
};
/**
 * Runs `limit` concurrent asyncForEach loops over the same `values` object.
 * When `values` is a single-pass iterator the loops share it, so each value
 * is handed to exactly one loop.
 *
 * @returns {Promise<Array>} Settles when every loop has finished.
 */
const parallelForEach = async (limit, values, callback) => {
  const drain = () => asyncForEach(values, callback);
  return parallel(limit, drain);
};
/**
 * Demo: logs 0..19 using five concurrent loops, each pausing a random time
 * (under 100 ms) before logging, so the numbers may appear out of order.
 */
const demoParallelForEach = async () => {
  console.log('demo parallel async for each');
  const logAfterPause = async (n) => {
    await delay(Math.random() * 100);
    console.log(n);
  };
  await parallelForEach(5, count(20), logAfterPause);
};
/**
 * Folds a sync or async iterable into a single value, awaiting the callback
 * between items.
 *
 * @param zero - Initial accumulator, returned unchanged for an empty source.
 * @param values - Iterable or async iterable to fold.
 * @param callback - `(accumulator, value)`; may return a promise.
 * @returns {Promise} The final accumulator.
 */
const asyncReduce = async (zero, values, callback) => {
  let accumulator = zero;
  for await (const item of values) {
    accumulator = await callback(accumulator, item);
  }
  return accumulator;
};
/**
 * Reduces `values` with `limit` concurrent reducers, then reduces their
 * partial results into one value.
 *
 * Every reducer, and the final pass, starts from `zero`, so `zero` is folded
 * in once per reducer plus once more at the end.
 *
 * @returns {Promise} The combined result.
 */
const parallelReduce = async (limit, zero, values, callback) => {
  const reduceShare = () => asyncReduce(zero, values, callback);
  const partials = await parallel(limit, reduceShare);
  return asyncReduce(zero, partials, callback);
};
/**
 * Demo: sums 0..9 with ten concurrent reducers, logging each addition and
 * finally the total.
 */
const demoParallelReduce = async () => {
  console.log('demo parallel reduce');
  const sum = await parallelReduce(10, 0, count(10), async (a, b) => {
    // Adding to the zero seed needs no work, so skip the pause and the log
    // line. Strict equality: the accumulator is always a number here.
    if (a === 0) {
      return b;
    }
    await delay(Math.random() * 100);
    console.log(a, '+', b, '=', a + b);
    return a + b;
  });
  console.log(sum);
};
/**
 * Creates a promise together with the functions that settle it.
 *
 * @returns {{promise: Promise, resolve: Function, reject: Function}}
 */
const defer = () => {
  // Keys are declared up front so the result keeps the order
  // promise, resolve, reject.
  const deferred = { promise: undefined, resolve: undefined, reject: undefined };
  deferred.promise = new Promise((fulfill, fail) => {
    deferred.resolve = fulfill;
    deferred.reject = fail;
  });
  return deferred;
};
/**
 * Creates an unbounded asynchronous FIFO queue built from a chain of
 * deferred nodes: each node resolves to `{ value, promise }`, where
 * `promise` is the following node.
 *
 * - `put(value)` never blocks.
 * - `get()` returns a promise for the next value; calls are served in order
 *   and may be made before the matching `put`.
 */
const queue = () => {
  const first = defer();
  // Promise for the oldest node that no get() has claimed yet.
  let head = first.promise;
  // Resolver for the newest node, the one the next put() will fill.
  let settleTail = first.resolve;
  return {
    put(value) {
      const following = defer();
      settleTail({ value, promise: following.promise });
      settleTail = following.resolve;
    },
    get() {
      const node = head;
      const value = node.then((link) => link.value);
      head = node.then((link) => link.promise);
      return value;
    },
  };
};
/**
 * Builds one end of a duplex stream from two queues. It exposes the
 * async-iterator methods (`next`, `return`, `throw`): each call sends a
 * message on `up` and returns the next message from `down`.
 *
 * @param up - Queue this end writes to (needs `put`).
 * @param down - Queue this end reads from (needs `get`).
 * @returns An object that is its own async iterator.
 */
const stream = (up, down) => ({
  next(value) {
    up.put({ value, done: false });
    return down.get();
  },
  return(value) {
    up.put({ value, done: true });
    return down.get();
  },
  throw(error) {
    const rejection = Promise.reject(error);
    // The other end may not be reading yet. Without a handler the queued
    // rejection would be reported as unhandled (fatal by default in Node).
    // The stored promise stays rejected for whoever takes it from the queue.
    rejection.catch(() => {});
    up.put(rejection);
    return down.get();
  },
  [Symbol.asyncIterator]() {
    return this;
  },
});
/**
 * Creates two connected stream ends backed by a pair of queues: what one end
 * sends is what the other end receives.
 *
 * @returns {[object, object]} `[input, output]`.
 */
const pipe = () => {
  const [syn, ack] = [queue(), queue()];
  return [stream(syn, ack), stream(ack, syn)];
};
/**
 * Demo: a producer pushes 0..9 into a pipe while a consumer reads them,
 * pausing a random time (under 100 ms) after each one. The producer waits
 * for each `next` to be acknowledged before sending the following token.
 */
const demoPipe = async () => {
  console.log('demo pipe');
  const producer = async (output) => {
    // `const` keeps the loop variable local. Assigning to an undeclared
    // `token` throws in strict mode and in ES modules.
    for (const token of count(10)) {
      console.log(token, '->');
      await output.next(token);
    }
    // Signal end-of-stream; the acknowledgement is not awaited.
    output.return();
  };
  const consumer = async (input) => {
    for await (const token of input) {
      console.log('->', token);
      await delay(Math.random() * 100);
    }
  };
  const [input, output] = pipe();
  await Promise.all([producer(output), consumer(input)]);
};
// pump pulls from one stream and pushes to another.
// The pump slows down for output back-pressure.
/**
 * @param output - Destination with `next`, `return` and `throw` methods.
 * @param input - Source with a `next` method (an async or sync iterator).
 * @returns {Promise} Settles with the result of `output.return(value)` when
 *   the input finishes, or of `output.throw(error)` when pulling or pushing
 *   fails.
 */
const pump = async (output, input) => {
  try {
    for (;;) {
      const result = await input.next();
      // The iterator protocol requires an object. A primitive result would
      // otherwise end the pump silently or make it spin forever.
      if (Object(result) !== result) {
        throw new TypeError('Iterator result is not an object');
      }
      const { value, done } = result;
      if (done) {
        return output.return(value);
      }
      // Waiting for the push to be acknowledged is what applies
      // back-pressure to the input.
      await output.next(value);
    }
  } catch (error) {
    return output.throw(error);
  }
};
/**
 * Demo: an async generator produces 0..9, `pump` moves the tokens into a
 * pipe, and a consumer reads them from the other end, pausing a random time
 * (under 100 ms) after each one.
 */
const demoPump = async () => {
  console.log('demo pump');
  async function* producer() {
    // `const` keeps the loop variable local. Assigning to an undeclared
    // `token` throws in strict mode and in ES modules.
    for (const token of count(10)) {
      console.log(token, '->');
      // Wait on whatever the caller passes to next() before continuing.
      await (yield token);
    }
  }
  const consumer = async (input) => {
    for await (const token of input) {
      console.log('->', token);
      await delay(Math.random() * 100);
    }
  };
  const [input, output] = pipe();
  await Promise.all([pump(output, producer()), consumer(input)]);
};
/**
 * Flattens a (sync or async) iterable of (sync or async) iterables into one
 * async sequence. Each inner source is drained completely before the next
 * one is requested.
 */
async function* asyncFlatten(streams) {
  for await (const inner of streams) {
    for await (const item of inner) {
      yield item;
    }
  }
}
/**
 * Demo: flattens two counters of three numbers each and logs 0, 1, 2 twice.
 */
const demoAsyncFlatten = async () => {
  console.log('demo async flatten');
  const sources = [count(3), count(3)];
  for await (const item of asyncFlatten(sources)) {
    console.log(item);
  }
};
/**
 * Lazily maps a sync or async iterable through `callback`, yielding each
 * awaited result. After every yield, the value the consumer passes to
 * `next()` is awaited before the next item is pulled.
 */
async function* asyncMap(values, callback) {
  for await (const item of values) {
    const mapped = await callback(item);
    const reply = yield mapped;
    await reply;
  }
}
/**
 * Demo: maps each of three numbers to a fresh counter of three numbers, then
 * flattens the resulting stream of streams, logging 0, 1, 2 three times.
 */
const demoAsyncFlattenMap = async () => {
  // The banner names this demo specifically; it used to repeat the
  // demoAsyncFlatten banner, so the two demos looked the same in the log.
  console.log('demo async flatten map');
  const streams = asyncMap(count(3), () => count(3));
  for await (const value of asyncFlatten(streams)) {
    console.log(value);
  }
};
/**
 * Maps `values` through `callback` using `limit` concurrent workers and
 * returns the reading end of a pipe that carries the results. Results are
 * pushed as each callback finishes, so they may arrive out of source order.
 *
 * All workers iterate the same `values` object, so pass a single-pass
 * iterator (for example a generator) for each item to be mapped once.
 *
 * @param {number} limit - Number of concurrent workers.
 * @param values - Shared source of items.
 * @param callback - Maps one item; may return a promise.
 * @returns The stream end to read mapped results from.
 */
const parallelMap = (limit, values, callback) => {
  const [input, output] = pipe();
  // Workers only push results. Awaiting each push waits for the reader's
  // acknowledgement, which is what provides back-pressure.
  const worker = async () => {
    for await (const value of values) {
      await input.next(await callback(value));
    }
  };
  // Close the pipe exactly once, after every worker has finished. Before,
  // the first worker to find the source exhausted ended the stream while
  // the others still held results.
  parallel(limit, worker).then(
    () => input.return(),
    (error) => input.throw(error),
  );
  return output;
};
/**
 * Demo: maps 0..2 to streams in parallel (seven workers), where each stream
 * yields `(m + 1) * 10 + n` for m in 0..4, then logs the flattened values.
 * Random pauses (under 100 ms) precede every mapping step.
 */
const demoParallelMap = async () => {
  console.log('demo parallel map');
  const pause = () => delay(Math.random() * 100);
  const streams = parallelMap(7, count(3), async (n) => {
    await pause();
    return asyncMap(count(5), async (m) => {
      await pause();
      return (m + 1) * 10 + n;
    });
  });
  for await (const item of asyncFlatten(streams)) {
    console.log(item);
  }
};
/**
 * Like a plain timer delay, but cancellable: the returned promise fulfills
 * after `ms` milliseconds, or rejects with the cancellation error if
 * `context.cancelled` rejects first (the timer is then cleared).
 *
 * @param context - Object whose `cancelled` promise rejects on cancellation.
 * @param {number} ms - Delay in milliseconds.
 */
const delayWithContext = (context, ms) => {
  const deferred = defer();
  const timer = setTimeout(deferred.resolve, ms);
  context.cancelled.catch((reason) => {
    deferred.reject(reason);
    clearTimeout(timer);
  });
  return deferred.promise;
};
// A promise that never settles; the root context can never be cancelled.
const never = new Promise(() => {});

/**
 * Root of the context tree. A context carries a `cancelled` promise, which
 * rejects with the cancellation reason, and a `deadline` in epoch
 * milliseconds. Child contexts inherit from their parent through the
 * prototype chain and are frozen.
 */
const background = Object.freeze({
  cancelled: never,
  deadline: Infinity,
  // Returns a frozen child that inherits from this context and overrides
  // the given properties.
  with(child) {
    return Object.freeze({
      __proto__: this,
      ...child,
    });
  },
  // Child context that is cancelled `timeout` milliseconds from now.
  withTimeout(timeout) {
    const deadline = Date.now() + timeout;
    return this.withTimeoutAndDeadline(timeout, deadline);
  },
  // Child context that is cancelled at the absolute time `deadline`.
  withDeadline(deadline) {
    const timeout = deadline - Date.now();
    return this.withTimeoutAndDeadline(timeout, deadline);
  },
  withTimeoutAndDeadline(timeout, deadline) {
    // A parent that expires sooner already covers the request.
    if (deadline > this.deadline) {
      return this;
    }
    const { cancel, context } = this.withCancel();
    delayWithContext(this, timeout).then(
      () => cancel(new Error('Timed out')),
      // The delay rejects when this context is cancelled first. withCancel
      // already forwards that to the child, so nothing is left to do here;
      // without this handler the rejection would be reported as unhandled.
      () => {},
    );
    return context.with({ deadline });
  },
  // Returns a child context plus the `cancel(reason)` function that rejects
  // its `cancelled` promise. The child is also cancelled when this context is.
  withCancel() {
    const { promise, reject } = defer();
    // Cancelling a context nobody is observing must not surface as an
    // unhandled rejection; observers still receive the rejection.
    promise.catch(() => {});
    const context = this.with({ cancelled: promise });
    this.cancelled.catch(reject);
    return { cancel: reject, context };
  },
});
/**
 * Wraps an iterator so that every `next`, `return` and `throw` call races
 * against `context.cancelled`: once the context is cancelled, calls reject
 * with the cancellation reason instead of waiting on the wrapped stream.
 *
 * @param context - Object exposing a `cancelled` promise.
 * @param stream - Iterator to wrap.
 * @returns An object that is its own async iterator.
 */
const streamWithContext = (context, stream) => {
  // `call` is invoked after `context.cancelled` is read, matching the
  // left-to-right evaluation of the race's array literal.
  const unlessCancelled = (call) => Promise.race([context.cancelled, call()]);
  return {
    next(value) {
      return unlessCancelled(() => stream.next(value));
    },
    return(value) {
      return unlessCancelled(() => stream.return(value));
    },
    throw(error) {
      return unlessCancelled(() => stream.throw(error));
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
};
/**
 * Demo: ten concurrent workers log numbers from a 1000-item counter, each
 * after a random cancellable pause (under one second), under a context that
 * times out after one second. The error raised by the timeout is caught and
 * its message logged.
 */
const demoStreamTimeout = async () => {
  console.log('streaming with a timeout');
  const context = background.withTimeout(1000);
  const numbers = streamWithContext(context, count(1000));
  const logAfterPause = async (n) => {
    await delayWithContext(context, Math.random() * 1000);
    console.log(n);
  };
  try {
    await parallelForEach(10, numbers, logAfterPause);
  } catch (error) {
    console.log(error.message);
  }
};
// Entry point: run every demo in sequence, each one finishing before the
// next begins.
(async () => {
  const demos = [
    demoAsyncForEach,
    demoParallelForEach,
    demoParallelReduce,
    demoAsyncFlatten,
    demoAsyncFlattenMap,
    demoParallelMap,
    demoPipe,
    demoPump,
    demoStreamTimeout,
  ];
  for (const demo of demos) {
    await demo();
  }
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment