Skip to content

Instantly share code, notes, and snippets.

@jhaynie
Created November 2, 2022 13:49
Show Gist options
  • Save jhaynie/a2a5d7efb1c4507a06f0e66544ce9ba8 to your computer and use it in GitHub Desktop.
Save jhaynie/a2a5d7efb1c4507a06f0e66544ce9ba8 to your computer and use it in GitHub Desktop.
Terrible hack to get Prisma to work with CockroachDB transaction retries
import { AsyncLocalStorage } from 'async_hooks';
import { Prisma, PrismaClient } from '@prisma/client';
/**
 * Async-context marker. Elsewhere in this file a store is set while an
 * interactive transaction callback runs, and an `undefined` store is read as
 * "not inside a transaction".
 */
const asyncLocalStorage = new AsyncLocalStorage();

/** Returns a promise that settles once `ms` milliseconds have elapsed. */
const sleep = (ms: number) =>
  new Promise((done) => setTimeout(done, ms));

/** Upper bound on the number of attempts made by the retry loops in this file. */
const maxRetryCount = 5;

/** Error codes that the retry wrapper treats as transient. */
const retryableErrorCodes = ['P1001', 'P2028', 'P2010', 'P2034', '40001'];

/** True when `code` is one of the codes listed in `retryableErrorCodes`. */
const isRetryable = (code: string) => retryableErrorCodes.some((known) => known === code);

/** Base unit (in milliseconds) for the randomized part of the backoff delay. */
const backoffMs = 100;

/**
 * Delay before retry number `c` (zero-based): a random value in
 * [0, backoffMs) scaled by 2^c, plus a fixed 50ms floor.
 */
const backoff = (c: number) => {
  const scale = Math.pow(2, c);
  const jitter = backoffMs * Math.random();
  return scale * jitter + 50;
};
/**
 * Recursively wraps `thing` in a Proxy so that every `then` / `catch` /
 * `finally` call reached through it receives `transaction` as an extra
 * trailing argument.
 *
 * NOTE(review): this presumably relies on Prisma's promise-like query objects
 * accepting the transaction as an undocumented extra argument — verify against
 * the Prisma version in use.
 *
 * @param thing       Value to wrap. Objects and functions are proxied; anything
 *                    else is returned unchanged.
 * @param transaction Opaque transaction descriptor forwarded to thenables.
 * @returns A proxy over `thing`, or `thing` itself when it cannot be proxied.
 */
function transactionProxy(thing: any, transaction: any): any {
  // Primitives, null and undefined are not valid Proxy targets: `new Proxy`
  // would throw a TypeError, so hand them back untouched.
  const proxiable = thing !== null && (typeof thing === 'object' || typeof thing === 'function');
  if (!proxiable) {
    return thing;
  }
  return new Proxy(thing, {
    get: (target: any, prop: any) => {
      if (typeof target[prop] !== 'function') {
        // Plain data: keep wrapping so nested members are still intercepted.
        return transactionProxy(target[prop], transaction);
      }
      return (...args: any[]) => {
        // Promise-style methods get the transaction appended; their result is
        // returned as-is (not re-wrapped).
        if (prop === 'then') return target[prop](args[0], args[1], transaction);
        if (prop === 'catch') return target[prop](args[0], transaction);
        if (prop === 'finally') return target[prop](args[0], transaction);
        // Any other method: call through with `target` as receiver and wrap
        // the result so chained calls stay bound to the transaction.
        return transactionProxy(target[prop](...args), transaction);
      };
    },
  });
}
const debugTestFailure = false; // set to true to inject a failure
const emptyObject = {};
/**
 * Runs `runner` inside an engine-level transaction, retrying it with the
 * `cockroach_restart` savepoint protocol whenever an attempt fails with
 * Prisma error code P2034.
 *
 * Flow: start the transaction on the engine, open the savepoint, then for each
 * attempt run the callback, release the savepoint and commit. On P2034 the
 * work is rolled back to the savepoint and retried after a randomized backoff.
 * Any other error — or running out of attempts — rolls the whole transaction
 * back and rethrows.
 *
 * NOTE(review): `engine.transaction('start' | 'commit' | 'rollback', ...)` is
 * an internal Prisma engine API; confirm its contract for the Prisma version
 * in use.
 *
 * @param prisma  Client whose calls are bound to the transaction via `transactionProxy`.
 * @param engine  Engine object exposing `transaction(action, headers, payload)`.
 * @param options Passed to the engine when starting the transaction; a truthy
 *                `injectFailure` turns on `inject_retry_errors_enabled` for the
 *                first attempt only.
 * @param runner  Callback executed with the transaction-bound client.
 * @returns Whatever `runner` resolves to on the attempt that commits.
 * @throws The runner's error for non-P2034 failures, or the last P2034 error
 *         once `maxRetryCount` attempts have been used up.
 */
const runTransaction = async (
  prisma: any,
  engine: any,
  options: any,
  runner: (tx: Prisma.TransactionClient) => Promise<any>,
) => {
  const info = await engine.transaction('start', emptyObject, options);
  const tx = transactionProxy(prisma, { id: info.id });
  try {
    await tx.$executeRawUnsafe('SAVEPOINT cockroach_restart');
    let lastRetryError: unknown;
    for (let c = 0; c < maxRetryCount; c++) {
      try {
        if (debugTestFailure || options?.injectFailure) {
          // Force a retry error on the first attempt only, so the retry path
          // is exercised once and the second attempt can succeed.
          if (c < 1) {
            await tx.$executeRawUnsafe("SET inject_retry_errors_enabled = 'true'");
          } else {
            await tx.$executeRawUnsafe("SET inject_retry_errors_enabled = 'false'");
          }
        }
        const res = await runner(tx);
        await tx.$executeRawUnsafe('RELEASE SAVEPOINT cockroach_restart');
        await engine.transaction('commit', emptyObject, info);
        return res;
      } catch (ex: any) {
        // Only P2034 is retried here; everything else aborts the transaction.
        if (ex?.code !== 'P2034') {
          throw ex;
        }
        lastRetryError = ex;
        // No point rewinding and waiting if there is no further attempt.
        if (c + 1 < maxRetryCount) {
          await tx.$executeRawUnsafe('ROLLBACK TO SAVEPOINT cockroach_restart');
          await sleep(backoff(c));
        }
      }
    }
    // Every attempt hit a retry error: surface it instead of silently
    // resolving with `undefined` and leaving the transaction open.
    throw lastRetryError;
  } catch (ex) {
    // Best-effort rollback; a failure here must not mask the original error.
    await engine.transaction('rollback', emptyObject, info).catch(() => null);
    throw ex;
  }
};
/** Shape of the async client methods that `retryFn` can wrap. */
type Fn = (...args: any[]) => Promise<any>;
/**
 * Wraps `fn` so that failures carrying a retryable error code are retried up
 * to `maxRetryCount` times with randomized exponential backoff.
 *
 * @param method Name of the wrapped method (currently unused by the wrapper).
 * @param fn     Async function to invoke; it receives the wrapper's arguments.
 * @param debug  Debug flag (currently unused by the wrapper).
 * @returns An async function with the same arguments as `fn`.
 * @throws The original error when it is not retryable or when
 *         `ex.meta.code` is '0A000'; `SQLRetryTimeoutException` once all
 *         attempts have failed with retryable errors.
 */
const retryFn = (method: string, fn: Fn, debug = true) => {
  return async (...args: any[]) => {
    const begin = Date.now();
    for (let c = 0; c < maxRetryCount; c++) {
      try {
        return await fn(...args);
      } catch (ex: any) {
        // `?.` guards against a thrown null/undefined, which would otherwise
        // replace the real failure with a TypeError.
        if (!isRetryable(ex?.code)) {
          throw ex;
        }
        // Listed as retryable by outer code, but this inner code is never retried.
        if (ex?.meta?.code === '0A000') {
          throw ex;
        }
        // Back off only when another attempt will actually follow.
        if (c + 1 < maxRetryCount) {
          await sleep(backoff(c));
        }
      }
    }
    throw new SQLRetryTimeoutException(args, Date.now() - begin);
  };
};
/** Method names whose calls are wrapped with retry logic outside a transaction. */
const wrappedMethods = [
  '$executeRawUnsafe',
  'create',
  'createMany',
  'update',
  'updateMany',
  'upsert',
  'delete',
  'deleteMany',
];
/**
 * Wraps `target` in a Proxy that hands out retrying versions of the methods
 * listed in `wrappedMethods`.
 *
 * @param target Object to wrap (the client itself or one of its model delegates).
 * @param debug  Forwarded to `retryFn`.
 * @returns A proxy over `target`.
 */
const createTableProxy = (target: any, debug = false): any => {
  return new Proxy(target, {
    get: (_target: any, method: string | symbol) => {
      const member = _target[method];
      // Wrap only when all of these hold:
      //  - the property is one of the retried method names,
      //  - the target really has a function there (otherwise `.bind` would throw),
      //  - we are NOT inside a transaction (no async-local store is set);
      //    inside a transaction the retry is handled at the transaction level.
      if (
        typeof method === 'string' &&
        typeof member === 'function' &&
        wrappedMethods.includes(method) &&
        asyncLocalStorage.getStore() === undefined
      ) {
        return retryFn(method, member.bind(_target), debug);
      }
      // Inside a transaction, or not a wrapped method: return the original member.
      return member;
    },
  });
};
/**
 * Creates and connects a `PrismaClient`, wraps it (and each model delegate)
 * with retrying proxies, and replaces `$transaction` with an implementation
 * that retries interactive transactions via `runTransaction`.
 *
 * @param optionsArg Options forwarded to the `PrismaClient` constructor.
 * @returns The proxied client.
 * @throws From the replaced `$transaction` when it is called with anything
 *         other than a function (batch transactions are not supported).
 */
export const createPrismaClient = async (optionsArg?: any) => {
  const prisma = new PrismaClient(optionsArg);
  const _prisma = prisma as any;
  await prisma.$connect();
  const debug = !!optionsArg?.log?.includes('query') || !!process.env.SM_PRISMA_DEBUG;
  // Model names with a lower-cased first letter, i.e. the delegate property
  // names on the client; names starting with '_' are skipped.
  const keys = Object.keys(Prisma.ModelName)
    .map((x) => x.charAt(0).toLowerCase() + x.substring(1))
    .filter((key) => !key.startsWith('_'));
  // Wrap the client itself to capture methods called directly on it.
  const proxy = createTableProxy(prisma, debug);
  // The proxy has no `set` trap, so these assignments land on the underlying client.
  for (const prop of keys) {
    proxy[prop] = createTableProxy(proxy[prop], debug);
  }
  // NOTE(review): `useMiddleware` is not defined or imported in the visible
  // source — confirm where it comes from.
  useMiddleware(proxy);
  _prisma.$transaction = (args: any, options: any) => {
    if (typeof args !== 'function') {
      throw new Error('$transaction batch unsupported. use interactive transaction instead');
    }
    // Run with a defined async-local store so the table proxies can tell they
    // are inside a transaction and skip their own per-call retry. `run`
    // returns the callback's promise, so no extra Promise wrapper is needed.
    return asyncLocalStorage.run(true, () =>
      runTransaction(prisma, _prisma._engine, options, args),
    );
  };
  return proxy;
};
/**
 * Raised when a retried call has used up all of its attempts.
 * `params` holds the value handed to the constructor (the retry wrapper in
 * this file passes the arguments of the failed call).
 */
export class SQLRetryTimeoutException extends Error {
  public params: any;
  /**
   * @param params   Context to attach to the error.
   * @param duration Elapsed time in milliseconds, included in the message.
   */
  constructor(params: any, duration: number) {
    super(`SQL Retry Timeout Exception after ${duration}ms`);
    // Without this, `name` stays 'Error' and logs cannot tell this failure
    // apart from a generic one.
    this.name = 'SQLRetryTimeoutException';
    this.params = params;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment