Skip to content

Instantly share code, notes, and snippets.

@Dionid
Last active October 12, 2023 11:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Dionid/a72245ff95a82d4993c8a6cdb1cfe251 to your computer and use it in GitHub Desktop.
Save Dionid/a72245ff95a82d4993c8a6cdb1cfe251 to your computer and use it in GitHub Desktop.
Consensus on Redis + TypeScript
import { Sleep } from "sleep";
import IORedis from "ioredis";
import pino from "pino";
import {
LEADERSHIP_EVENTS,
LeadershipMetrics,
} from "libs/metrics/leadership";
const leadershipKey = "leader";
export const leadership = async (
abortController: AbortController,
redis: IORedis,
instanceId: string,
onLeader: (instanceId: string) => Promise<void>
) => {
await redis.expire(leadershipKey, 30);
const leaseInterval = setInterval(async () => {
if (abortController.signal.aborted) {
clearInterval(leaseInterval);
await redis.del(leadershipKey);
LeadershipMetrics.leadership.inc({
event: LEADERSHIP_EVENTS.LEADER_END,
});
return;
}
await redis.expire(leadershipKey, 30);
LeadershipMetrics.leadership.inc({
event: LEADERSHIP_EVENTS.LEADER_EXTEND,
});
}, 5000);
// # Subscribe on new_message_sub from controller
await onLeader(instanceId);
};
export const following = async (
abortController: AbortController,
logger: pino.Logger,
redis: IORedis,
instanceId: string,
onLeader: (instanceId: string) => Promise<void>
) => {
while (!abortController.signal.aborted) {
await Sleep(5000);
LeadershipMetrics.leadership.inc({
event: LEADERSHIP_EVENTS.FOLLOWER_TICK,
});
const leader = await redis.get(leadershipKey);
if (!leader) {
await election(abortController, logger, redis, instanceId, onLeader);
break;
}
}
};
export const election = async (
abortController: AbortController,
logger: pino.Logger,
redis: IORedis,
instanceId: string,
onLeader: (instanceId: string) => Promise<void>
) => {
LeadershipMetrics.leadership.inc({
event: LEADERSHIP_EVENTS.ELECTION,
});
const isLeader = await redis.setnx(leadershipKey, instanceId);
if (isLeader === 1) {
await leadership(abortController, redis, instanceId, onLeader);
LeadershipMetrics.leadership.inc({
event: LEADERSHIP_EVENTS.LEADER_CHOSEN,
});
} else {
following(abortController, logger, redis, instanceId, onLeader).catch(
(err) => {
logger.error({ err: err as Error }, "Error while following");
}
);
}
};
export const initLeadership = async (
abortController: AbortController,
logger: pino.Logger,
redis: IORedis,
instanceId: string,
onLeader: (instanceId: string) => Promise<void>
) => {
const leader = await redis.get(leadershipKey);
logger.debug({ leader, instanceId }, "Leader and instance id");
if (!leader) {
await election(abortController, logger, redis, instanceId, onLeader);
} else if (leader === instanceId) {
LeadershipMetrics.leadership.inc({
event: LEADERSHIP_EVENTS.SAME_LEADER,
});
await leadership(abortController, redis, instanceId, onLeader);
} else {
following(abortController, logger, redis, instanceId, onLeader).catch(
(err) => {
logger.error({ err: err as Error }, "Error while following");
}
);
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment