Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Redis Memolock, implemented in TypeScript (with example usage)
import Redis from 'ioredis';
// Connection settings are read from the HOST and PORT environment variables;
// the port falls back to 6379 when PORT is unset or empty.
const host = process.env.HOST;
const port = process.env.PORT;

// Builds one client from the shared settings.
const createClient = () =>
  new Redis({ host, port: parseInt(port || '6379', 10) });

// `redis` carries the get/set/del/publish commands, while `subRedis` is the
// connection used for subscribe/unsubscribe and 'message' events.
const redis = createClient();
const subRedis = createClient();
// A class that checks Redis for a cached value stored under a key.
// If the value is missing and the key is not locked, take the lock and fetch the value.
// If the key is already locked, wait for the lock holder to publish the value.
// If nothing is published before the wait times out, check the cache and try again.
/**
 * Optional tuning knobs accepted by the RedisMemLockCache constructor.
 * Both values are in milliseconds (they are passed to Redis as `PX` expiries).
 */
interface ExtraMemLockOpt {
// How long a fetched value stays cached; the constructor defaults it to 60 * 1000.
keyTimeout?: number;
// Expiry of the per-key lock, also used as the wait time for a locked key;
// the constructor defaults it to 1000.
lockTimeout?: number;
}
/**
 * Read-through cache on top of Redis in which only the caller holding a
 * per-key lock runs the fetch; every other caller waits for the lock holder
 * to publish the fetched value.
 *
 * Keys and channels used for a cache key `k`:
 *  - `k`       holds the cached value (expires after `keyTimeout` ms)
 *  - `k:lock`  is the lock (expires after `lockTimeout` ms)
 *  - channel `k` carries the freshly fetched value to waiting callers
 */
class RedisMemLockCache {
  /**
   * Number of in-flight waiters per pub/sub channel. It is static because all
   * instances share the one subscriber connection: the channel may only be
   * unsubscribed once the last waiter is done with it.
   */
  private static readonly channelWaiters = new Map<string, number>();

  keyTimeout: number;
  lockTimeout: number;

  /**
   * @param opt Optional timeouts in milliseconds. Missing (or zero) values
   *   fall back to 60 000 ms for cached values and 1 000 ms for the lock.
   */
  constructor(opt: ExtraMemLockOpt = {}) {
    this.keyTimeout = opt.keyTimeout || 60 * 1000;
    this.lockTimeout = opt.lockTimeout || 1000;
  }

  /**
   * Returns the cached value for `key`, fetching and caching it when absent.
   *
   * @param key Cache key.
   * @param fetch Produces the value when it is not cached.
   * @param attempts Number of timed-out waits so far; used by the internal
   *   retry and normally left at its default.
   * @returns The cached or freshly fetched value.
   * @throws Error when the wait for a locked key times out with `attempts`
   *   already at 3, or whatever `fetch` / the Redis client rejects with.
   */
  async get(
    key: string,
    fetch: () => Promise<string>,
    attempts = 0,
  ): Promise<string> {
    const cached = await redis.get(key);
    // Compare against null rather than truthiness: an empty string is a
    // legitimate cached value and must not trigger a refetch.
    if (cached != null) {
      return cached;
    }
    return this.getLockOrWaitForLock(key, fetch, attempts);
  }

  /**
   * Tries to take the lock for `key`. The lock holder fetches, caches and
   * publishes the value; everyone else waits for that publication.
   *
   * @param key Cache key.
   * @param fetch Produces the value when this caller obtains the lock.
   * @param attempts Number of timed-out waits so far.
   * @returns The fetched or published value.
   */
  async getLockOrWaitForLock(
    key: string,
    fetch: () => Promise<string>,
    attempts: number,
  ): Promise<string> {
    const keyLock = `${key}:lock`;
    // SET ... NX only succeeds ('OK') when the lock key does not exist yet.
    const acquired =
      (await redis.set(keyLock, 'locked', 'PX', this.lockTimeout, 'NX')) ===
      'OK';
    if (!acquired) {
      return this.waitForValue(key, fetch, attempts);
    }
    const value = await this.fetchAndStore(key, keyLock, fetch);
    // Publish only after the value is stored and the lock is released.
    await redis.publish(key, value);
    return value;
  }

  /**
   * Runs `fetch`, stores the result under `key`, and always releases the lock
   * afterwards so that a failing fetch does not block other callers until the
   * lock expires on its own.
   *
   * NOTE: the lock is deleted unconditionally. If `fetch` takes longer than
   * `lockTimeout`, the lock may have expired and been taken by another caller
   * by the time it is deleted here.
   */
  private async fetchAndStore(
    key: string,
    keyLock: string,
    fetch: () => Promise<string>,
  ): Promise<string> {
    try {
      const value = await fetch();
      await redis.set(key, value, 'PX', this.keyTimeout);
      return value;
    } finally {
      await redis.del(keyLock);
    }
  }

  /**
   * Waits up to `lockTimeout` ms for the lock holder to publish the value.
   * On timeout it goes back through `get` (while `attempts < 3`) or rejects.
   * Timer, listener and subscription are released on every exit path.
   */
  private waitForValue(
    key: string,
    fetch: () => Promise<string>,
    attempts: number,
  ): Promise<string> {
    // The lock holder publishes on a channel named after the key itself, so
    // that is the channel to subscribe to, match on and unsubscribe from.
    const channel = key;

    return new Promise<string>((resolve, reject) => {
      let settled = false;

      // Single exit point: guarantees cleanup runs exactly once, whichever of
      // message / re-check / timeout / subscribe failure comes first.
      const finish = (settle: () => void): void => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(noMessageTimeout);
        subRedis.off('message', onMessage);
        RedisMemLockCache.releaseChannel(channel);
        settle();
      };

      const onMessage = (incoming: string, message: string): void => {
        if (incoming === channel) {
          finish(() => resolve(message));
        }
      };

      // Try again recursively if we've been waiting too long.
      const noMessageTimeout = setTimeout(() => {
        finish(() => {
          if (attempts < 3) {
            resolve(this.get(key, fetch, attempts + 1));
          } else {
            reject(new Error('Never received message that key was unlocked'));
          }
        });
      }, this.lockTimeout);

      subRedis.on('message', onMessage);
      RedisMemLockCache.acquireChannel(channel)
        // The value may have been stored and published between the cache miss
        // and the subscription becoming active; re-check so it is not missed.
        .then(() => redis.get(key))
        .then((cached) => {
          if (cached != null) {
            finish(() => resolve(cached));
          }
        })
        .catch((err: unknown) => finish(() => reject(err)));
    });
  }

  /** Registers one more waiter on `channel` and subscribes to it. */
  private static acquireChannel(channel: string): Promise<unknown> {
    const current = RedisMemLockCache.channelWaiters.get(channel) || 0;
    RedisMemLockCache.channelWaiters.set(channel, current + 1);
    return subRedis.subscribe(channel);
  }

  /** Drops one waiter from `channel`; the last one out unsubscribes. */
  private static releaseChannel(channel: string): void {
    const remaining =
      (RedisMemLockCache.channelWaiters.get(channel) || 1) - 1;
    if (remaining > 0) {
      RedisMemLockCache.channelWaiters.set(channel, remaining);
      return;
    }
    RedisMemLockCache.channelWaiters.delete(channel);
    // Best effort: a failed unsubscribe only leaves an idle subscription
    // behind and must not turn into an unhandled rejection.
    subRedis.unsubscribe(channel).catch(() => undefined);
  }
}
/////////////////////////////////////
// Example usage:
// Counts how many times longFetch has been called.
let fetches = 0;

/**
 * Stand-in for a slow upstream call: resolves after 500 ms with a greeting
 * that carries the sequence number of this particular call.
 *
 * @returns A promise for `'Hello <n>'`, where `n` is the 1-based call number.
 */
function longFetch(): Promise<string> {
  // Capture the number at call time. Reading the shared counter when the
  // timer fires would make overlapping calls all report the latest count.
  const fetchNumber = ++fetches;
  return new Promise<string>((resolve) => {
    setTimeout(() => {
      resolve('Hello ' + fetchNumber);
    }, 500);
  });
}
// Values are cached for 2 s; the per-key lock (and each wait on it) lasts 1 s.
const cache = new RedisMemLockCache({ keyTimeout: 2000, lockTimeout: 1000 });
const key = 'my-key';

// Two requests issued back to back for the same key.
const first = cache.get(key, longFetch);
const second = cache.get(key, longFetch);
Promise.all([first, second])
  .then((values) => {
    console.log(values); // should be ['Hello 1', 'Hello 1']
  })
  .catch((err: unknown) => {
    // Report instead of leaving an unhandled rejection.
    console.error(err);
  });

// 1.5 s in: requested again while the 2 s cache entry is expected to be live.
setTimeout(async () => {
  try {
    const third = await cache.get(key, longFetch);
    console.log(third); // should be 'Hello 1'
  } catch (err) {
    console.error(err);
  }
}, 1500);

// 4 s in: requested after the 2 s cache entry is expected to have expired.
setTimeout(async () => {
  try {
    const fourth = await cache.get(key, longFetch);
    console.log(fourth); // should be 'Hello 2'
    console.log(subRedis.listenerCount('message')); // should be 0
  } catch (err) {
    console.error(err);
  } finally {
    // Always close both connections, otherwise a failure above would leave
    // the sockets open and the process would never exit.
    redis.disconnect();
    subRedis.disconnect();
  }
}, 4000);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment