Created
October 25, 2021 07:20
-
-
Save demipixel/706846ac107c14836d4aba231091b66e to your computer and use it in GitHub Desktop.
Redis Memolock, implemented in TypeScript (with example usage)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Redis from 'ioredis'; | |
const { HOST: host, PORT: port } = process.env; | |
const redis = new Redis({ host, port: parseInt(port || '6379', 10) }); | |
const subRedis = new Redis({ host, port: parseInt(port || '6379', 10) }); | |
// Write a class that checks a redis key for a cache. | |
// If the source is not locked, lock it and fetch it. | |
// If the source is already locked, wait for the lock to expire. | |
// If the lock expires, lock it and fetch it. | |
interface ExtraMemLockOpt { | |
keyTimeout?: number; | |
lockTimeout?: number; | |
} | |
class RedisMemLockCache { | |
keyTimeout: number; | |
lockTimeout: number; | |
constructor(opt: ExtraMemLockOpt) { | |
this.keyTimeout = opt.keyTimeout || 60 * 1000; | |
this.lockTimeout = opt.lockTimeout || 1000; | |
} | |
async get( | |
key: string, | |
fetch: () => Promise<string>, | |
attempts = 0, | |
): Promise<string> { | |
const value = await redis.get(key); | |
if (value) { | |
return value; | |
} else { | |
return this.getLockOrWaitForLock(key, fetch, attempts); | |
} | |
} | |
async getLockOrWaitForLock( | |
key: string, | |
fetch: () => Promise<string>, | |
attempts: number, | |
): Promise<string> { | |
const keyLock = `${key}:lock`; | |
const isLocked = | |
(await redis.set(keyLock, 'locked', 'PX', this.lockTimeout, 'NX')) !== | |
'OK'; | |
const keyChannel = `${key}_done`; | |
if (isLocked) { | |
// Subscribe to event to wait for the value | |
return new Promise((resolve, reject) => { | |
const listener = (channel: string, message: string) => { | |
if (channel === key) { | |
clearTimeout(noMessageTimeout); | |
subRedis.unsubscribe(keyChannel); | |
subRedis.off('message', listener); | |
resolve(message); | |
} | |
}; | |
subRedis.on('message', listener); | |
subRedis.subscribe(key).catch(reject); | |
// Try again recursively if we've been waiting too long | |
const noMessageTimeout = setTimeout(() => { | |
subRedis.unsubscribe(keyChannel); | |
if (attempts < 3) { | |
resolve(this.get(key, fetch, attempts + 1)); | |
} else { | |
reject(new Error('Never received message that key was unlocked')); | |
} | |
}, this.lockTimeout); | |
}); | |
} else { | |
// Fetch value | |
const value = await fetch(); | |
// Set value in cache | |
await redis.set(key, value, 'PX', this.keyTimeout); | |
// Release lock | |
await redis.del(keyLock); | |
// Publish value | |
await redis.publish(key, value); | |
return value; | |
} | |
} | |
} | |
///////////////////////////////////// | |
// Example usage: | |
let fetches = 0; | |
function longFetch() { | |
fetches++; | |
return new Promise<string>((res, rej) => { | |
setTimeout(() => { | |
res('Hello ' + fetches); | |
}, 500); | |
}); | |
} | |
const cache = new RedisMemLockCache({ keyTimeout: 2000, lockTimeout: 1000 }); | |
const key = 'my-key'; | |
const first = cache.get(key, longFetch); | |
const second = cache.get(key, longFetch); | |
Promise.all([first, second]).then((values) => { | |
console.log(values); // should be ['Hello 1', 'Hello 1'] | |
}); | |
setTimeout(async () => { | |
const third = await cache.get(key, longFetch); | |
console.log(third); // should be 'Hello 1' | |
}, 1500); | |
setTimeout(async () => { | |
const fourth = await cache.get(key, longFetch); | |
console.log(fourth); // should be 'Hello 2' | |
console.log(subRedis.listenerCount('message')); // should be 0 | |
redis.disconnect(); | |
subRedis.disconnect(); | |
}, 4000); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment