Skip to content

Instantly share code, notes, and snippets.

@fabianeichinger
Last active July 30, 2023 14:42
Show Gist options
  • Save fabianeichinger/994be334ac91e63f5dd67c8d56734e07 to your computer and use it in GitHub Desktop.
Save fabianeichinger/994be334ac91e63f5dd67c8d56734e07 to your computer and use it in GitHub Desktop.
Redis Tag Cache
import { WatchError, createClient } from 'redis';
import { readFileSync } from 'fs';
const SET_SCRIPT = readFileSync(`SET.lua`, 'utf-8')
const VACUUM_SCRIPT = readFileSync(`VACUUM.lua`, 'utf-8');
/**
* @param {string} key
*/
function valueKey(key) {
return "value:" + key
}
/**
* @param {string} tag
*/
function tagSetKey(tag) {
return "tag:" + tag
}
/**
* @param {string} tag
*/
function tagItsKey(tag) {
return "tagits:" + tag
}
function tagVacuumKey(tag) {
return "vacuum:" + tag;
}
function timeSecs(time) {
return Math.floor(time.getTime() / 1000);
}
class C {
static sh = new WeakMap()
/**
*
* @param {import('redis').RedisClientType<?, ?, ?>} client
* @returns {Promise<C>}
*/
static async create(client) {
return new C(client).init()
}
/**
*
* @param {import('redis').RedisClientType<?, ?, ?>} client
*/
constructor(client) {
if (!client.isOpen) {
throw new Error('Redis not open');
}
this.client = client;
this.t = 0;
}
async init() {
const t = await this.client.time();
this.t = timeSecs(t);
if (C.sh.has(this.client)) {
this.sh = C.sh.get(this.client);
} else {
this.sh = Promise.all([
this.client.scriptLoad(SET_SCRIPT),
this.client.scriptLoad(VACUUM_SCRIPT)
]).then(([set, vacuum]) => ({
set, vacuum
}));
C.sh.set(this.client, this.sh)
}
return this;
}
/**
* @param {string} key
*/
async get(key) {
this.client.get(valueKey(key));
}
/**
* @param {string} key
* @param {string} value
* @param {Set<string>} tags
* @param {number} ex
* @param {boolean} vac
*/
async set(key, value, tags, ex, vac) {
const tagsArr = [...tags]
const rk = [valueKey(key), ...tagsArr.map(tagSetKey), ...tagsArr.map(tagItsKey)]
const exat = this.t + ex;
const ret = await this.client.evalSha((await this.sh).set, {
keys: rk,
arguments: [value, String(this.t), String(exat)]
});
if (vac && ret) {
for (const tag of tags) {
vacuum(this.client, tag, this.t);
}
}
}
}
let vacs = 0
/**
* @param {import('redis').RedisClientType<?, ?, ?>} client
* @param {string} tag
* @param {number} t
*/
export async function vacuum(client, tag, t) {
vacs++
const tsk = tagSetKey(tag);
const tik = tagItsKey(tag);
const tvk = tagVacuumKey(tag);
const vl = Math.random()
let fr = true
while(await client.evalSha((await C.sh.get(client)).vacuum, {
keys: [tsk, tik, tvk],
arguments: [String(t), String(vl), fr ? 'yes' : 'no']
})) {
/*console.count('vacuum ' + tag)*/
fr = false
}
vacs--
}
setInterval(() => console.log('vacs: ' + vacs), 1000);
/**
* @param {import("redis").RedisClientType<any, any, any>} client
* @param {Set<string>} tags
*/
async function invalidate(client, tags) {
const t = String(timeSecs(await client.time()));
await client.multi()
.del([...tags].map(tag => tagVacuumKey(tag)))
.mSet(Object.fromEntries([...tags].map(tag => ([tagItsKey(tag), t]))))
.exec();
let c = 0
await Promise.all([...tags].map(async (tag) => {
await client.executeIsolated(async (client) => {
const tsk = tagSetKey(tag);
const tik = tagItsKey(tag);
client.watch([tsk, tik]);
let tvks, cursor = 0, tt
do {
[{cursor, members: tvks}, tt] = await Promise.all([
client.sScan(tsk, cursor, { COUNT: 100 }),
client.get(tik)
]);
// console.count('tvks ' + tsk)
// console.log(tvks.length)
// console.log(new Set(tvks).size)
// console.log(tvks[0])
if (tvks.length > 0) {
client.del(tvks).then(cc => c += cc);
}
if (t !== tt) {
console.warn('different inval started', tag, t, tt)
return
}
} while (cursor !== 0)
try {
await client.multi().del(tsk).exec();
} catch (e) {
if (e instanceof WatchError) {
console.warn('this is fine')
}
}
})
}))
return c
}
export class Compat extends C {
/**
* @param {import("redis").RedisClientType<any, any, any>} client
*/
static async create(client) {
return new Compat(client).init()
}
/**
* @param {string[]} keys
*/
async get(...keys) {
if (keys.length === 1) {
const res = await super.get(keys[0]);
return JSON.parse(res);
}
return Promise.all(keys.map(k => {
const res = super.get(k);
return JSON.parse(res);
}))
}
/**
* @param {string} key
* @param {string} data
* @param {string[]} tags
*/
async set(key, data, tags, options = {vacuum: true}) {
return super.set(key, JSON.stringify(data), new Set(tags), Math.floor(options.timeout / 1000), options.vacuum);
}
/**
* @param {string[]} tags
*/
async invalidate(...tags) {
return invalidate(this.client, new Set(tags));
}
};
-- KEYS[nk = 1+2m]: vk, tsk_1, ..., tsk_m, tik_1, ..., tik_m
-- ARGV[]: v, t, exat
-- redis.log(redis.LOG_WARNING, x)
local nk = #KEYS
local m = math.floor(nk / 2)
local v = ARGV[1]
local t = tonumber(ARGV[2])
local exat = tonumber(ARGV[3])
local tis = redis.call('mget', unpack(KEYS,2+m,nk))
for i=1,m do
local ti = tonumber(tis[i])
-- Check if any tag was invalidated while processing
if ti and ti >= t then
return false
end
end
for i=2,1+m do
redis.call('sadd', KEYS[i], KEYS[1])
if redis.call('expireat', KEYS[i], exat, 'gt') ~= 1 then
redis.call('expireat', KEYS[i], exat, 'nx')
end
end
redis.call('set', KEYS[1], v, 'exat', exat)
return true
-- KEYS: tsk, tik, tvk
-- ARGS: t, vl, fr
-- redis.log(redis.LOG_WARNING, x)
local nk = table.getn(KEYS)
local m = math.floor(nk / 2)
local t = tonumber(ARGV[1])
local vl = ARGV[2]
local fr = ARGV[3] == 'yes'
-- local before = redis.call('get', KEYS[3])
local lock
if fr then
lock = redis.call('set', KEYS[3], vl, 'get', 'nx', 'ex', 1)
else
lock = redis.call('get', KEYS[3])
end
-- local after = redis.call('get', KEYS[3])
-- redis.log(redis.LOG_WARNING, vl.." "..tostring(before).." "..tostring(lock).." "..tostring(after))
if lock then
if lock == vl then
redis.call('expire', KEYS[3], 1)
-- redis.log(redis.LOG_WARNING, "cont"..KEYS[1])
else
-- redis.log(redis.LOG_WARNING, "stop"..KEYS[1])
return false
end
elseif not fr then
return false
end
if not fr then
redis.log(redis.LOG_WARNING, "not fr")
end
local ti = tonumber(redis.call('get', KEYS[2]))
if ti and ti >= t then
return false
end
local i = 0
local delcnt
repeat
i = i+1
local membs = redis.call('srandmember', KEYS[1], -20)
for j=#membs,1,-1 do
if redis.call('exists', membs[j]) == 1 then
table.remove(membs, j)
end
end
if #membs > 0 then
-- rems = rems + #membs
delcnt = redis.call('srem', KEYS[1], unpack(membs))
else
delcnt = 0
end
until delcnt < 5 or i == 5
redis.log(redis.LOG_WARNING, delcnt)
return delcnt >= 5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment