Skip to content

Instantly share code, notes, and snippets.

@gramcha
Last active April 12, 2022 14:32
Show Gist options
  • Save gramcha/1a1dbcb7733c77f87ad5bee7dd2356f9 to your computer and use it in GitHub Desktop.
Save gramcha/1a1dbcb7733c77f87ad5bee7dd2356f9 to your computer and use it in GitHub Desktop.
redis - node-redlock - poc
/* global logger */
const ioredis = require('ioredis');
const { default: Redlock, ResourceLockedError } = require("redlock");
if (!global.logger) {
global.logger = console;
}
class DistributedLockService {
static getIORedisInstance(redisConfig) {
return new ioredis({
port: redisConfig.port,
host: redisConfig.host,
maxRetriesPerRequest: null,
enableReadyCheck: false
});
}
static getRedLockInstance(redisInstance) {
return new Redlock(
[redisInstance],
{
// The expected clock drift; for more details see:
// http://redis.io/topics/distlock
driftFactor: 0.01, // multiplied by lock ttl to determine drift time
// The max number of times Redlock will attempt to lock a resource
// before erroring.
retryCount: 20,
// the time in ms between attempts
retryDelay: 2000, // time in ms
// the max time in ms randomly added to retries
// to improve performance under high contention
// see https://www.awsarchitectureblog.com/2015/03/backoff.html
retryJitter: 200, // time in ms
// The minimum remaining time on a lock before an extension is automatically
// attempted with the `using` API.
automaticExtensionThreshold: 500, // time in ms
}
);
}
static logError(error) {
// Ignore cases where a resource is explicitly marked as locked on a client.
if (error instanceof ResourceLockedError) {
return;
}
// Log all other errors.
logger.error(error);
}
static init(redisConfig) {
if(!DistributedLockService.prototype.redLockInstance) {
const redisInstance = DistributedLockService.getIORedisInstance(redisConfig);
const redLockInstance = DistributedLockService.getRedLockInstance(redisInstance);
DistributedLockService.prototype.redLockInstance = redLockInstance;
DistributedLockService.prototype.redLockInstance.on("error", DistributedLockService.logError);
Object.freeze(DistributedLockService.prototype.redLockInstance);
}
return DistributedLockService.prototype.redLockInstance;
}
static async lockFor(resources, durationInMs, criticalSectionCallback) {
const redLockInstance = DistributedLockService.prototype.redLockInstance;
await redLockInstance.using(resources, durationInMs,criticalSectionCallback);
}
}
// module.exports = DistributedLockService;
function criticalSection(instance) {
return async (signal) => {
// Do something...
console.log(`acquired sob for ${instance} - start - ${(new Date()).toJSON()}`);
await new Promise((resolve) => setTimeout(() => resolve(), 1000));
// Make sure any attempted lock extension has not failed.
if (signal.aborted) {
throw signal.error;
}
// Do something else...
console.log(`acquired sob for ${instance} - end - ${(new Date()).toJSON()}`);
};
}
async function run() {
DistributedLockService.init({ host: '127.0.0.1', port: 6379 });
await Promise.all([
DistributedLockService.lockFor(['A', 'B'], 20000, criticalSection("A1B1")),
DistributedLockService.lockFor(['B', 'A'], 20000, criticalSection("A2B2")),
DistributedLockService.lockFor(['B', 'A', 'C'], 20000, criticalSection("A3B3C1")),
DistributedLockService.lockFor(['A'], 20000, criticalSection("A")),
DistributedLockService.lockFor(['B'], 20000, criticalSection("B")),
DistributedLockService.lockFor(['D'], 20000, criticalSection("D")),
]);
}
run().then(()=>{
console.log('over');
process.exit(0);
}).catch((error)=>{
console.error(error);
process.exit(1);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment