Skip to content

Instantly share code, notes, and snippets.

@akramarev
Last active June 2, 2022 03:47
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akramarev/c6d4b747d5c3a8c255ac99c7638238a6 to your computer and use it in GitHub Desktop.
Save akramarev/c6d4b747d5c3a8c255ac99c7638238a6 to your computer and use it in GitHub Desktop.
[Distributed Data Loader] GraphQL data loaders enhanced by distributed cache #AmplitudeEngineeringHandbook #public
import DataLoader from 'dataloader';
import logger from '../../../logger';
/** Separator placed between the segments of composite Redis keys (ASCII "unit separator", 0x1F). */
export const REDIS_KEY_DELIMITER = '\u001F';
/**
 * Marker stored in Redis for an entry that exists but whose value is null
 * (ASCII NUL, 0x00), so a cached null can be told apart from a cache miss.
 */
export const REDIS_NULL_VALUE = '\u0000';
/** Literal type of the null marker above. */
export type RedisNullValueType = typeof REDIS_NULL_VALUE;
/**
 * Options accepted by distributed data loaders: every standard DataLoader
 * option (using string cache keys) plus an optional time-to-live.
 */
export type DistributedDataLoaderOptions<K, V> =
  DataLoader.Options<K, V, string> & {
    /**
     * Lifetime of distributed-cache entries, in seconds. When omitted (or 0)
     * the loader's default expiration is used instead.
     */
    expiration?: number;
  };
/**
 * Fully-resolved loader options: the DataLoader options with `expiration`
 * and `cacheKeyFn` made mandatory once the constructor has filled in defaults.
 */
export type OptionsEx<K, V> =
  DataLoader.Options<K, V, string> & {
    /** Time-to-live of distributed-cache entries, in seconds. */
    expiration: number;
    /** Turns a key into the string used by DataLoader and as the Redis entry name. */
    cacheKeyFn: (key: K) => string;
  };
/**
 * DataLoader wrapper that adds a shared (distributed) cache tier in front of
 * the batch function.
 *
 * A `load` goes through the in-process DataLoader cache (unless disabled via
 * options), then the distributed cache (`rGet`), and only the remaining misses
 * reach `batchLoadFn`. Successful results of `batchLoadFn` are written back
 * with `rSet`. Subclasses provide the storage primitives (`rGet`, `rSet`,
 * `rDel`, `rDelAll`).
 *
 * Null values are cached as well, encoded as `REDIS_NULL_VALUE`. Errors
 * returned by `batchLoadFn` are never written to the distributed cache.
 */
export abstract class DistributedDataLoader<K, V>
  implements DataLoader<K, V, string>
{
  /** Fallback TTL for distributed-cache entries, in seconds (1 day). */
  protected readonly defaultExpiration = 60 * 60 * 24;

  /** Constructor options with `expiration` and `cacheKeyFn` defaults applied. */
  protected readonly options: OptionsEx<K, V>;

  /** In-process DataLoader whose batch function consults the distributed cache first. */
  protected readonly loader: DataLoader<K, V>;

  /**
   * @param keySpace - Namespace under which subclasses store their entries.
   * @param batchLoadFn - Loads values for keys missing from the distributed cache.
   * @param options - DataLoader options plus an optional `expiration` in seconds.
   */
  protected constructor(
    protected keySpace: string,
    protected batchLoadFn: DataLoader.BatchLoadFn<K, V>,
    options?: DistributedDataLoaderOptions<K, V>,
  ) {
    this.options = {
      ...options,
      // `||` rather than `??` on purpose: a TTL of 0 would make Redis reject
      // SETEX / immediately delete on EXPIRE, so 0 also falls back to the default.
      expiration: options?.expiration || this.defaultExpiration,
      cacheKeyFn: options?.cacheKeyFn ?? ((key: K) => JSON.stringify(key)),
    };
    // create a new loader with augmented batchLoadFn
    this.loader = new DataLoader(
      this.wrapBatchLoadFn(this.batchLoadFn),
      this.options,
    );
  }

  /**
   * Primes `key` with `value`, replacing any entry already in the in-process
   * cache. Errors are primed in-process only. Other values are also written to
   * the distributed cache, fire-and-forget: a failed write is logged, not thrown.
   */
  prime(key: K, value: Error | V): this {
    if (!(value instanceof Error)) {
      // eslint-disable-next-line promise/prefer-await-to-then
      this.rSet([[key, value]]).catch((err: Error) => logger.error(err));
    }
    this.loader.clear(key).prime(key, value);
    return this;
  }

  /**
   * Removes `key` from both caches. The distributed delete is fire-and-forget
   * (failures are logged).
   */
  clear(key: K): this {
    // eslint-disable-next-line promise/prefer-await-to-then
    this.rDel([key]).catch((err: Error) => logger.error(err));
    this.loader.clear(key);
    return this;
  }

  /**
   * Removes every entry of this key space from both caches. The distributed
   * delete is fire-and-forget (failures are logged).
   */
  clearAll(): this {
    // eslint-disable-next-line promise/prefer-await-to-then
    this.rDelAll().catch((err: Error) => logger.error(err));
    this.loader.clearAll();
    return this;
  }

  async load(key: K): Promise<V> {
    return this.loader.load(key);
  }

  async loadMany(keys: ArrayLike<K>): Promise<Array<Error | V>> {
    return this.loader.loadMany(keys);
  }

  /**
   * Wraps `batchLoadFn` so that each batch is served from the distributed
   * cache where possible. Only the misses are passed to `batchLoadFn`, and its
   * non-error results are written back to the cache.
   *
   * @throws TypeError (rejecting the whole batch, as DataLoader itself does)
   *   when `batchLoadFn` returns a different number of results than keys.
   */
  protected wrapBatchLoadFn(
    batchLoadFn: DataLoader.BatchLoadFn<K, V>,
  ): DataLoader.BatchLoadFn<K, V> {
    return async (keys: ReadonlyArray<K>): Promise<Array<V | Error>> => {
      const result = new Array<V | Error>(keys.length);
      const missedKeys: K[] = [];
      const missedPositions: number[] = [];

      // query distributed cache first
      const rRecords = await this.rGet(keys);
      rRecords.forEach((rRecord, idx) => {
        if (rRecord === null) {
          // cache miss: remember the key and the slot its value belongs in
          missedKeys.push(keys[idx]);
          missedPositions.push(idx);
          return;
        }
        // cache hit: turn the stored null marker back into a real null
        result[idx] = (rRecord === REDIS_NULL_VALUE ? null : rRecord) as V;
      });

      // Return the decoded `result`, never the raw records, so cached nulls
      // are not leaked as the REDIS_NULL_VALUE marker string.
      if (missedKeys.length === 0) {
        return result;
      }

      // query primary data source
      const dbRecords = Array.from(await batchLoadFn(missedKeys));
      // DataLoader can only validate the wrapper's output, so enforce the
      // batch-function contract for the inner call here.
      if (dbRecords.length !== missedKeys.length) {
        throw new TypeError(
          `batchLoadFn must return an array of the same length as its keys: expected ${missedKeys.length}, got ${dbRecords.length}.`,
        );
      }

      // Fill the gaps and collect cacheable pairs. Keys and records share the
      // same index, which keeps each value paired with its own key even when
      // some records are errors.
      const tuples: Array<[K, V]> = [];
      dbRecords.forEach((record, idx) => {
        result[missedPositions[idx]] = record;
        if (!(record instanceof Error)) {
          tuples.push([missedKeys[idx], record]);
        }
      });

      // Populate the cache (errors are skipped). Redis rejects empty writes,
      // and a failed write must not discard data that was already loaded.
      if (tuples.length > 0) {
        // eslint-disable-next-line promise/prefer-await-to-then
        await this.rSet(tuples).catch((err: Error) => logger.error(err));
      }

      return result;
    };
  }

  /** Encodes a value for storage: null becomes `REDIS_NULL_VALUE`, anything else JSON. */
  protected rSerialize(value: V): string | RedisNullValueType {
    if (value === null) {
      return REDIS_NULL_VALUE;
    }
    return JSON.stringify(value);
  }

  /**
   * Decodes a stored string. Returns null for a miss (no stored value) and
   * passes the null marker through unchanged. Anything else is parsed as JSON.
   */
  protected rParse(value: string | null): V | null | RedisNullValueType {
    if (value === null) {
      return null;
    } else if (value === REDIS_NULL_VALUE) {
      return REDIS_NULL_VALUE;
    }
    return JSON.parse(value) as V;
  }

  /**
   * Reads one entry per key, in key order. Each entry is null for a miss,
   * `REDIS_NULL_VALUE` for a cached null, or otherwise the cached value.
   */
  protected abstract rGet(
    keys: ReadonlyArray<K>,
  ): Promise<Array<V | null | RedisNullValueType>>;

  /** Writes the given key/value pairs to the distributed cache. */
  protected abstract rSet(tuples: Array<[K, V]>): Promise<void>;

  /** Deletes the given keys from the distributed cache. */
  protected abstract rDel(keys: ReadonlyArray<K>): Promise<void>;

  /** Deletes every entry of this loader's key space from the distributed cache. */
  protected abstract rDelAll(): Promise<void>;
}
import { RedisIface } from 'amplitude/lib/redis';
import DataLoader from 'dataloader';
import {
DistributedDataLoader,
REDIS_KEY_DELIMITER,
DistributedDataLoaderOptions,
} from './DistributedDataLoader';
import { HSDataLoader } from './HSDataLoader';
import { KVDataLoader } from './KVDataLoader';
/** Builds distributed data loaders that share one Redis client and key-space prefix. */
export class DistributedDataLoaderFactory {
  /**
   * @param redis - Redis client handed to every loader created by this factory.
   * @param keySpacePrefix - Leading segment of every loader's key space. It is
   *   always joined with REDIS_KEY_DELIMITER, so an empty prefix still yields a
   *   key space that starts with the delimiter.
   */
  constructor(private redis: RedisIface, private keySpacePrefix: string = '') {}

  /**
   * Creates a distributed data loader backed by a single Redis hash per key
   * space, with one hash field per cached entry. The TTL applies to the whole
   * hash, not to individual entries.
   * Use it when write/clear operations are frequent.
   *
   * @param keySpace - Name that separates this loader's entries from other loaders'.
   * @param batchLoadFn - Loads values from the primary data source on cache misses.
   * @param options - DataLoader options plus an optional `expiration` (seconds).
   */
  public createHSDataLoader<K, V>(
    keySpace: string,
    batchLoadFn: DataLoader.BatchLoadFn<K, V>,
    options?: DistributedDataLoaderOptions<K, V>,
  ): DistributedDataLoader<K, V> {
    return new HSDataLoader(
      this.redis,
      this.buildKeySpace(keySpace, 'hs'),
      batchLoadFn,
      options,
    );
  }

  /**
   * Creates a distributed data loader backed by one Redis key per cached entry,
   * each with its own TTL. A Redis set records the written keys so that they
   * can be cleared together.
   * Use it when load operations are frequent.
   *
   * @param keySpace - Name that separates this loader's entries from other loaders'.
   * @param batchLoadFn - Loads values from the primary data source on cache misses.
   * @param options - DataLoader options plus an optional `expiration` (seconds).
   */
  public createKVDataLoader<K, V>(
    keySpace: string,
    batchLoadFn: DataLoader.BatchLoadFn<K, V>,
    options?: DistributedDataLoaderOptions<K, V>,
  ): DistributedDataLoader<K, V> {
    return new KVDataLoader(
      this.redis,
      this.buildKeySpace(keySpace, 'kv'),
      batchLoadFn,
      options,
    );
  }

  /**
   * Joins prefix, key space and storage kind into the full key-space name.
   * The kind suffix keeps HS and KV loaders with the same name from colliding.
   */
  private buildKeySpace(keySpace: string, kind: 'hs' | 'kv'): string {
    return [this.keySpacePrefix, keySpace, kind].join(REDIS_KEY_DELIMITER);
  }
}
import { RedisIface } from 'amplitude/lib/redis';
import DataLoader from 'dataloader';
import logger from '../../../logger';
import {
DistributedDataLoader,
DistributedDataLoaderOptions,
RedisNullValueType,
} from './DistributedDataLoader';
/**
 * Distributed data loader that keeps all entries of a key space in one Redis
 * hash. The hash key is `keySpace`, and each entry is a field named by
 * `cacheKeyFn(key)`. Expiration is set on the hash as a whole.
 */
export class HSDataLoader<K, V> extends DistributedDataLoader<K, V> {
  constructor(
    private redis: RedisIface,
    keySpace: string,
    batchLoadFn: DataLoader.BatchLoadFn<K, V>,
    options?: DistributedDataLoaderOptions<K, V>,
  ) {
    super(keySpace, batchLoadFn, options);
  }

  /** Reads the requested fields with a single HMGET; missing fields come back as null. */
  protected async rGet(
    keys: ReadonlyArray<K>,
  ): Promise<Array<V | null | RedisNullValueType>> {
    // HMGET rejects an empty field list; nothing to look up anyway.
    if (keys.length === 0) {
      return [];
    }
    // Explicit arrow: passing cacheKeyFn straight to map() would also hand it
    // the index and array arguments.
    const fields = keys.map((key) => this.options.cacheKeyFn(key));
    return this.redis.runWithReconnect(async (redis) => {
      const values = await redis.hmget(this.keySpace, ...fields);
      return values.map((value) => this.rParse(value));
    });
  }

  /**
   * Writes entries with a single HSET. The hash TTL is (re)set only when HSET
   * reports at least one newly created field.
   */
  protected async rSet(tuples: Array<[K, V]>): Promise<void> {
    // HSET with no field/value pairs is a Redis error; nothing to write anyway.
    if (tuples.length === 0) {
      return;
    }
    const fields = new Map<string, string>();
    for (const [key, value] of tuples) {
      fields.set(this.options.cacheKeyFn(key), this.rSerialize(value));
    }
    return this.redis.runWithReconnect(async (redis) => {
      const added = await redis.hset(this.keySpace, fields);
      // expire command works only with keys
      // TODO: when required, add manual fields expiration logic
      if (added > 0) {
        void redis
          .expire(this.keySpace, this.options.expiration)
          // eslint-disable-next-line promise/prefer-await-to-then
          .catch((err: Error) => logger.error(err));
      }
    });
  }

  /** Removes the given fields from the hash with HDEL. */
  protected async rDel(keys: ReadonlyArray<K>): Promise<void> {
    // HDEL rejects an empty field list; nothing to delete anyway.
    if (keys.length === 0) {
      return;
    }
    const fields = keys.map((key) => this.options.cacheKeyFn(key));
    return this.redis.runWithReconnect(async (redis) => {
      await redis.hdel(this.keySpace, ...fields);
    });
  }

  /** Drops the whole hash, and with it every entry of this key space. */
  protected async rDelAll(): Promise<void> {
    return this.redis.runWithReconnect(async (redis) => {
      await redis.del(this.keySpace);
    });
  }
}
import { RedisIface } from 'amplitude/lib/redis';
import DataLoader from 'dataloader';
import {
DistributedDataLoader,
REDIS_KEY_DELIMITER,
DistributedDataLoaderOptions,
RedisNullValueType,
} from './DistributedDataLoader';
/**
 * Distributed data loader that stores each entry under its own Redis key,
 * `keySpace` + REDIS_KEY_DELIMITER + `cacheKeyFn(key)`, with an individual TTL.
 * The written keys are also recorded in the set `${keySpace}-set`, which
 * `rDelAll` uses to find and delete them.
 */
export class KVDataLoader<K, V> extends DistributedDataLoader<K, V> {
  /** Name of the Redis set that tracks every entry key written by this loader. */
  protected keySpaceSet: string;

  constructor(
    private redis: RedisIface,
    keySpace: string,
    batchLoadFn: DataLoader.BatchLoadFn<K, V>,
    options?: DistributedDataLoaderOptions<K, V>,
  ) {
    super(keySpace, batchLoadFn, options);
    // separate set name to store all correspondent keys
    this.keySpaceSet = `${keySpace}-set`;
  }

  /** Reads each entry with its own GET (issued concurrently); missing keys yield null. */
  protected async rGet(
    keys: ReadonlyArray<K>,
  ): Promise<Array<V | null | RedisNullValueType>> {
    if (keys.length === 0) {
      return [];
    }
    const redisKeys = keys.map((key) => this.toRedisKey(key));
    return this.redis.runWithReconnect(async (redis) => {
      const values = await Promise.all(
        redisKeys.map(async (redisKey) => redis.get(redisKey)),
      );
      return values.map((value) => this.rParse(value));
    });
  }

  /**
   * Writes each entry with SETEX, then adds the keys to the tracking set and
   * refreshes the set's TTL.
   */
  protected async rSet(tuples: Array<[K, V]>): Promise<void> {
    // SADD with no members is a Redis error; nothing to write anyway.
    if (tuples.length === 0) {
      return;
    }
    const entries = tuples.map(
      ([key, value]): [string, string] => [
        this.toRedisKey(key),
        this.rSerialize(value),
      ],
    );
    const trackedKeys = [...new Set(entries.map(([redisKey]) => redisKey))];
    return this.redis.runWithReconnect(async (redis) => {
      // save key-values first
      await Promise.all(
        entries.map(async ([redisKey, serialized]) =>
          redis.setex(redisKey, this.options.expiration, serialized),
        ),
      );
      // put the keys into correspondent set (and always set new expiration)
      await redis.sadd(this.keySpaceSet, ...trackedKeys);
      await redis.expire(this.keySpaceSet, this.options.expiration);
    });
  }

  /** Deletes the entries' keys and removes them from the tracking set. */
  protected async rDel(keys: ReadonlyArray<K>): Promise<void> {
    // SREM with no members is a Redis error; nothing to delete anyway.
    if (keys.length === 0) {
      return;
    }
    const redisKeys = [...new Set(keys.map((key) => this.toRedisKey(key)))];
    return this.redis.runWithReconnect(async (redis) => {
      await Promise.all(redisKeys.map(async (redisKey) => redis.del(redisKey)));
      await redis.srem(this.keySpaceSet, ...redisKeys);
    });
  }

  /**
   * Deletes every key recorded in the tracking set, then the set itself.
   * NOTE: not atomic. Keys added between SMEMBERS and the final DEL are not
   * deleted here and drop out of the set; they still expire via their own TTL.
   */
  protected async rDelAll(): Promise<void> {
    return this.redis.runWithReconnect(async (redis) => {
      const keys = await redis.smembers(this.keySpaceSet);
      await Promise.all(keys.map(async (k) => redis.del(k)));
      await redis.del(this.keySpaceSet);
    });
  }

  /** Builds the Redis key under which the entry for `key` is stored. */
  private toRedisKey(key: K): string {
    return [this.keySpace, this.options.cacheKeyFn(key)].join(
      REDIS_KEY_DELIMITER,
    );
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment