@jacwright
Last active December 13, 2021 03:28
Concurrency: Easy, Fast, Correct — Choose three
/**
 * Simplify concurrency with input gates and output gates, the strategy described in
 * https://blog.cloudflare.com/durable-objects-easy-fast-correct-choose-three/, by allowing certain actions to block
 * other actions or to block the responses of other actions.
 *
 * To "block" an action or response means to defer it until after the blocking actions have been processed. This
 * keeps the state of an object consistent and avoids race conditions that may be hard to discover.
 *
 * It is a good idea to pair this with a cache for blocking actions (e.g. in a storage mechanism) so that blocking
 * doesn't slow down the system.
 *
 * Definitions:
 * - "blockable" methods won't execute while blocked. They are deferred and run once unblocked.
 * - "blockableResponse" methods execute immediately but won't deliver a response until unblocked.
 * - "blocking" methods block blockable calls and responses until they complete.
 *
 * In the article linked above, `this.storage.get` and `this.storage.put` are blocking, `fetch` has a blockable
 * response, and `this.getUniqueNumber()` is a blockable method.
 *
 * Note that blockable methods also have blockable responses. If a blockable method returns while a storage operation
 * is in progress, its response is delivered after the storage operation completes. If the storage operation fails,
 * the response is the error.
 *
 * Usage - with decorators (recommended):
 *
 * ```ts
 * import { simplifiedConcurrency } from 'simplified-concurrency';
 *
 * const { blockable, blocking, blockableResponse } = simplifiedConcurrency();
 *
 * class MyDurableObject {
 *   storage: Storage;
 *
 *   constructor() {
 *     this.storage = new Storage();
 *   }
 *
 *   @blockable
 *   async getUniqueNumber() {
 *     await this.fetch("https://example.com/api1");
 *     let val = await this.storage.get("counter");
 *     this.storage.put("counter", val + 1);
 *     return val;
 *   }
 *
 *   @blockableResponse
 *   async fetch(input: RequestInfo, init?: RequestInit): Promise<Response> {
 *     return fetch(input, init);
 *   }
 * }
 *
 * // Simple storage with cache
 * class Storage {
 *   cache: Map<string, any>;
 *
 *   constructor() {
 *     this.cache = new Map();
 *   }
 *
 *   @blocking
 *   async get(key: string) {
 *     if (this.cache.has(key)) {
 *       return this.cache.get(key);
 *     } else {
 *       // If fetch is used in a blocking method, be sure to use the global fetch and not the provided
 *       // blockable-response fetch, which would never return because it is blocked by the method calling it.
 *       const response = await fetch(BACKEND_URL + key);
 *       return await response.json();
 *     }
 *   }
 *
 *   @blocking
 *   async put(key: string, value: any) {
 *     this.cache.set(key, value);
 *
 *     const response = await fetch(BACKEND_URL + key, {
 *       method: 'POST',
 *       headers: { 'Content-Type': 'application/json' },
 *       body: JSON.stringify(value)
 *     });
 *
 *     return await response.json();
 *   }
 * }
 * ```
 *
 * These APIs work in the browser and in Node.js because they rely only on plain JavaScript promises.
 */
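export function simplifiedConcurrency() {
  // IDs of the blocking calls currently in progress.
  const blockingCalls = new Set<number>();
  // Recently started blocking calls, keyed by ID; the output gate waits on these.
  const callsJustBlocked = new Map<number, Promise<any>>();
  // Deferred calls and responses, run in order once nothing is blocking.
  const deferred: Function[] = [];
  let call = 0;

  function reset() {
    // Reset for testing
    blockingCalls.clear();
    callsJustBlocked.clear();
    deferred.length = 0;
    call = 0;
  }
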
  /**
   * Wrap a function to make it blockable.
   * Also usable as a TypeScript decorator for methods which are blockable.
   * Examples:
   *
   * Wrapped function:
   * ```ts
   * // function
   * const increment = blockable(async (amount: number) => {
   *   const value = await getFromStorage('value'); // blocking call
   *   setToStorage('value', value + amount); // blocking call
   *   return value + amount; // won't return until blocking is finished
   * });
   *
   * // class
   * class Controller {
   *   constructor() {
   *     this.increment = blockable(this.increment);
   *   }
   *
   *   async increment(amount: number) {
   *     const value = await this.storage.get('value'); // blocking call
   *     this.storage.set('value', value + amount); // blocking call
   *     return value + amount; // won't return until blocking is finished
   *   }
   * }
   * ```
   *
   * Decorator:
   * ```ts
   * class Controller {
   *
   *   @blockable
   *   async increment(amount: number) {
   *     const value = await this.storage.get('value'); // blocking call
   *     this.storage.set('value', value + amount); // blocking call
   *     return value + amount; // won't return until blocking is finished
   *   }
   * }
   * ```
   */
  function blockable<T extends Function>(target: T): T;
  function blockable(target: any, propertyKey: string, descriptor: PropertyDescriptor): void;
  function blockable(target: any, propertyKey?: string, descriptor?: PropertyDescriptor) {
    return wrap(target, descriptor, func =>
      function(...args: any[]) {
        if (!blockingCalls.size) {
          // Nothing is blocking: run now, but hold the result behind the output gate.
          return func.apply(this, args).then(outputGate);
        } else {
          // Something is blocking: defer the whole call until it is unblocked.
          return new Promise((resolve, reject) => {
            deferred.push(() => func.apply(this, args).then(outputGate).then(resolve, reject));
          });
        }
      }
    );
  }

  /**
   * Wrap a function whose response is blockable (e.g. fetch).
   * Also usable as a TypeScript decorator for methods whose response should be blocked when needed.
   * Example:
   * ```ts
   * class RemoteAPI {
   *   @blockableResponse
   *   async sendEmail(options: EmailOptions) {
   *     ...
   *   }
   * }
   * ```
   */
  function blockableResponse<T extends Function>(target: T): T;
  function blockableResponse(target: any, propertyKey: string, descriptor: PropertyDescriptor): void;
  function blockableResponse(target: any, propertyKey?: string, descriptor?: PropertyDescriptor) {
    return wrap(target, descriptor, func =>
      function(...args: any[]) {
        return deferResponse(func.apply(this, args));
      }
    );
  }

  /**
   * Wrap a function which blocks.
   * Also usable as a TypeScript decorator for methods which block.
   * Examples:
   *
   * Wrapped function:
   * ```ts
   * // function
   * const getFromStorage = blocking(async (key: string) => {
   *   ...
   * });
   *
   * // class
   * class Storage {
   *   constructor() {
   *     this.get = blocking(this.get);
   *   }
   *
   *   async get(key: string) {
   *     ...
   *   }
   * }
   * ```
   *
   * Decorator:
   * ```ts
   * class Storage {
   *
   *   @blocking
   *   async get(key: string) {
   *     ...
   *   }
   * }
   * ```
   */
  function blocking<T extends Function>(target: T): T;
  function blocking(target: any, propertyKey: string, descriptor: PropertyDescriptor): void;
  function blocking(target: any, propertyKey?: string, descriptor?: PropertyDescriptor) {
    return wrap(target, descriptor, func =>
      function(...args: any[]) {
        return blockFor(func.apply(this, args));
      }
    );
  }

  /**
   * Block while waiting for the promise returned by the callback to settle.
   */
  async function blockWhile(callback: Function): Promise<any> {
    return blockFor(callback());
  }

  /**
   * Defers the response of this call if a blocking call is in progress.
   */
  function deferResponse<T>(promise: Promise<T>): Promise<T> {
    return promise.then(onFulfilled, onRejected);
  }

  /**
   * Mark a promise as a blocking call until it settles. Once no blocking calls remain, deferred calls and
   * responses are processed.
   */
  async function blockFor<T>(promise: Promise<T>) {
    const thisCall = ++call;
    blockingCalls.add(thisCall);
    callsJustBlocked.set(thisCall, promise);
    afterAll().then(() => callsJustBlocked.delete(thisCall));

    function finish() {
      // Already released by an output gate.
      if (!blockingCalls.has(thisCall)) return;
      blockingCalls.delete(thisCall);
      callsJustBlocked.delete(thisCall);
      if (!blockingCalls.size) afterAll().then(process);
    }

    let response: any;
    try {
      response = await promise;
    } catch (e) {
      finish();
      throw e;
    }
    finish();
    return response;
  }

  function onFulfilled(result: any) {
    if (blockingCalls.size) {
      return new Promise(resolve => deferred.push(() => resolve(result)));
    } else {
      return result;
    }
  }

  function onRejected(reason: any) {
    if (blockingCalls.size) {
      return new Promise((resolve, reject) => deferred.push(() => reject(reason)));
    } else {
      return Promise.reject(reason);
    }
  }

  // Process deferred calls and responses, pausing when blocked again.
  function process() {
    while (!blockingCalls.size && deferred.length) {
      deferred.shift()();
    }
  }

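  // Shared by the wrappers above: replaces the decorated method in place when a descriptor is given, otherwise
  // returns the wrapped function. The descriptor is undefined when a wrapper is called as a plain function.
  function wrap(target: any, descriptor: PropertyDescriptor | undefined, wrapper: (func: Function) => Function): any {
    const origFunc = descriptor && descriptor.value || target;
    if (typeof origFunc !== 'function') throw new TypeError('Blocking method wrappers can only be used on functions');
    const func = wrapper(origFunc);
    if (descriptor) {
      descriptor.value = func;
    } else {
      return func;
    }
  }
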
  // Don't let a blockable function return its result until all blocking calls it initiated are finished.
  function outputGate(result: any) {
    if (callsJustBlocked.size) {
      const promises = Array.from(callsJustBlocked.values());
      callsJustBlocked.forEach((_promise, id) => blockingCalls.delete(id));
      callsJustBlocked.clear();
      afterAll().then(process);
      return Promise.all(promises).then(() => result);
    } else {
      return result;
    }
  }

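  return {
    blockable,
    blockableResponse,
    blocking,
    // Blockable-response wrapper around the global fetch. The bare `fetch` shorthand would export the unwrapped
    // global fetch, which contradicts the usage notes in the header comment.
    fetch: blockableResponse((input: RequestInfo, init?: RequestInit) => fetch(input, init)),
    blockWhile,
    deferResponse,
    reset,
  };
}
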
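// Resolve on the next microtask tick.
function tick() {
  return Promise.resolve();
}

// Yield for 10 microtask ticks, giving promise continuations that are already queued a chance to run first.
async function afterAll() {
  for (let i = 0; i < 10; i++) await tick();
}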
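
The input gate described in the header comment can be illustrated in isolation. What follows is a minimal sketch under stated assumptions, not the implementation above: `createGate`, `sleep`, `slowGet`, and `demo` are hypothetical names, storage latency is simulated with `setTimeout`, and deferred calls are drained on a macrotask (`setTimeout(drain, 0)`) instead of after a fixed number of microtask ticks. Only the input gate is modeled; there is no output gate.

```typescript
// Hypothetical minimal input gate; a sketch of the idea, not the gist's implementation.
function createGate() {
  let blockingCount = 0; // blocking operations currently in flight
  const deferred: Array<() => void> = []; // calls waiting for the gate to open

  // Start deferred calls in order, stopping as soon as one of them blocks again.
  function drain(): void {
    while (blockingCount === 0 && deferred.length > 0) {
      deferred.shift()!();
    }
  }

  // Keep the gate closed until `promise` settles.
  function blockFor<T>(promise: Promise<T>): Promise<T> {
    blockingCount++;
    const done = () => {
      blockingCount--;
      // Drain on a macrotask so the caller awaiting `promise` resumes first.
      setTimeout(drain, 0);
    };
    promise.then(done, done);
    return promise;
  }

  // Run `fn` now if the gate is open, otherwise once it reopens.
  function blockable<T>(fn: () => Promise<T>): Promise<T> {
    if (blockingCount === 0) return fn();
    return new Promise<T>((resolve, reject) => {
      deferred.push(() => fn().then(resolve, reject));
    });
  }

  return { blockFor, blockable };
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Three concurrent read-modify-write calls against a simulated slow store.
async function demo(useGate: boolean): Promise<number[]> {
  const gate = createGate();
  const store = new Map<string, number>([['counter', 0]]);

  // The value is read immediately but delivered after a simulated 5 ms delay.
  function slowGet(key: string): Promise<number> {
    const snapshot = store.get(key) ?? 0;
    return gate.blockFor(sleep(5).then(() => snapshot));
  }

  async function getUniqueNumber(): Promise<number> {
    const val = await slowGet('counter');
    store.set('counter', val + 1);
    return val;
  }

  const run = () => (useGate ? gate.blockable(getUniqueNumber) : getUniqueNumber());
  return Promise.all([run(), run(), run()]);
}
```

With the gate, `demo(true)` resolves to `[0, 1, 2]` because each call starts only after the previous read and write have finished. Without it, `demo(false)` resolves to `[0, 0, 0]`: all three calls read the counter before any of them writes it back, which is the race the gates are meant to prevent.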