Ts.ED - Redis Cache + Cluster + ioredis 5 + OIDC Redis
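
// NOTE: this gist concatenates several files; the "---" boundary comments below are
// inferred from the import paths and the conventional *.spec.ts naming.

// --- getRedisConfiguration.spec.ts ---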
import { getRedisConfiguration } from "./getRedisConfiguration";
describe("getRedisConfiguration", () => {
afterEach(() => {
delete process.env.REDIS_NODES;
delete process.env.REDIS_OPTS;
delete process.env.REDIS_URL;
delete process.env.REDIS_HOST;
delete process.env.REDIS_PORT;
delete process.env.REDIS_PASSWORD;
delete process.env.REDIS_DB_INDEX;
});
describe("nodes", () => {
it("should load configuration with options", () => {
process.env.REDIS_NODES = JSON.stringify([]);
process.env.REDIS_OPTS = JSON.stringify({
scaleReads: "any",
maxRedirections: 100,
retryDelayOnTryAgain: 100,
retryDelayOnFailover: 100,
retryDelayOnClusterDown: 100,
slotsRefreshTimeout: 100,
slotsRefreshInterval: 100,
enableOfflineQueue: false,
enableReadyCheck: false,
noDelay: false,
connectTimeout: 5000,
autoResendUnfulfilledCommands: false,
maxRetriesPerRequest: 10,
enableAutoPipelining: false
});
const config = getRedisConfiguration();
expect(config).toEqual({
clusterConfig: {
nodes: [],
options: {
enableOfflineQueue: false,
enableReadyCheck: false,
maxRedirections: 100,
redisOptions: {
autoPipeliningIgnoredCommands: ["scan"],
autoResendUnfulfilledCommands: false,
connectTimeout: 5000,
enableAutoPipelining: false,
maxRetriesPerRequest: 10,
noDelay: false
},
retryDelayOnClusterDown: 100,
retryDelayOnFailover: 100,
retryDelayOnTryAgain: 100,
scaleReads: "any",
slotsRefreshInterval: 100,
slotsRefreshTimeout: 100
}
}
});
});
it("should load configuration with default values", () => {
process.env.REDIS_NODES = JSON.stringify([]);
delete process.env.REDIS_OPTS;
const config = getRedisConfiguration();
expect(config).toEqual({
clusterConfig: {
nodes: [],
options: {
enableOfflineQueue: true,
enableReadyCheck: true,
maxRedirections: 16,
redisOptions: {
autoPipeliningIgnoredCommands: ["scan"],
autoResendUnfulfilledCommands: true,
connectTimeout: 15000,
enableAutoPipelining: true,
maxRetriesPerRequest: 5,
noDelay: true
},
retryDelayOnClusterDown: 1000,
retryDelayOnFailover: 200,
retryDelayOnTryAgain: 100,
scaleReads: "all",
slotsRefreshInterval: 20000,
slotsRefreshTimeout: 15000
}
}
});
});
});
describe("url", () => {
it("should load configuration from given url", () => {
process.env.REDIS_URL = "redis://user:pwd@localhost:8888";
process.env.REDIS_DB_INDEX = "1";
expect(getRedisConfiguration()).toEqual({
autoPipeliningIgnoredCommands: ["scan"],
autoResendUnfulfilledCommands: true,
connectTimeout: 15000,
db: 1,
enableAutoPipelining: true,
host: "localhost",
maxRetriesPerRequest: 5,
noDelay: true,
password: "pwd",
port: 8888,
tls: undefined,
username: "user"
});
});
it("should load configuration from given url (tls)", () => {
process.env.REDIS_URL = "rediss://user:pwd@localhost:8888";
process.env.REDIS_DB_INDEX = "1";
expect(getRedisConfiguration()).toEqual({
autoPipeliningIgnoredCommands: ["scan"],
autoResendUnfulfilledCommands: true,
connectTimeout: 15000,
db: 1,
enableAutoPipelining: true,
host: "localhost",
maxRetriesPerRequest: 5,
noDelay: true,
password: "pwd",
port: 8888,
tls: {
rejectUnauthorized: false
},
username: "user"
});
});
});
describe("default", () => {
it("should load configuration from given url", () => {
expect(getRedisConfiguration()).toEqual({
autoPipeliningIgnoredCommands: ["scan"],
autoResendUnfulfilledCommands: true,
connectTimeout: 15000,
db: 0,
enableAutoPipelining: true,
host: "localhost",
maxRetriesPerRequest: 5,
noDelay: true,
port: 6379
});
});
});
});
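
// --- getRedisConfiguration.ts ---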
import { getValue } from "@tsed/core";
import type { ClusterOptions, RedisOptions } from "ioredis";
import type { ClusterConfiguration, RedisConfiguration } from "./RedisConnection";
function redisOptions(opts: any = {}): RedisOptions {
return {
noDelay: getValue(opts, "noDelay", true),
connectTimeout: getValue(opts, "connectTimeout", 15000),
autoResendUnfulfilledCommands: getValue(opts, "autoResendUnfulfilledCommands", true),
maxRetriesPerRequest: getValue(opts, "maxRetriesPerRequest", 5),
enableAutoPipelining: getValue(opts, "enableAutoPipelining", true),
autoPipeliningIgnoredCommands: ["scan"]
};
}
function getClusterConfig(_nodes: string, _opts: string | undefined): ClusterConfiguration {
const nodes = JSON.parse(_nodes);
const opts = !!_opts ? JSON.parse(_opts) : {};
const options: ClusterOptions = {
scaleReads: getValue(opts, "scaleReads", "all"),
maxRedirections: getValue(opts, "maxRedirections", 16),
retryDelayOnTryAgain: getValue(opts, "retryDelayOnTryAgain", 100),
retryDelayOnFailover: getValue(opts, "retryDelayOnFailover", 200),
retryDelayOnClusterDown: getValue(opts, "retryDelayOnClusterDown", 1000),
slotsRefreshTimeout: getValue(opts, "slotsRefreshTimeout", 15000),
slotsRefreshInterval: getValue(opts, "slotsRefreshInterval", 20000),
enableOfflineQueue: getValue(opts, "enableOfflineQueue", true),
enableReadyCheck: getValue(opts, "enableReadyCheck", true),
redisOptions: redisOptions(opts)
};
return {
nodes,
options
};
}
function getDbIndex() {
return process.env.REDIS_DB_INDEX ? +process.env.REDIS_DB_INDEX : 0;
}
export function getRedisConfiguration(): RedisConfiguration {
if (process.env.REDIS_NODES) {
return {
clusterConfig: getClusterConfig(process.env.REDIS_NODES, process.env.REDIS_OPTS)
};
}
if (process.env.REDIS_URL) {
const url = new URL(process.env.REDIS_URL);
return {
host: url.hostname,
password: url.password,
port: url.port ? Number(url.port) : 6379,
username: url.username,
tls: url.protocol === "rediss:" ? { rejectUnauthorized: false } : undefined,
db: getDbIndex(),
// REDIS_OPTS is a JSON string in the environment; parse it before extracting redis options
...redisOptions(process.env.REDIS_OPTS ? JSON.parse(process.env.REDIS_OPTS) : {})
};
}
return {
host: getValue(process.env, "REDIS_HOST", "localhost"),
port: Number(getValue(process.env, "REDIS_PORT", 6379)),
password: process.env.REDIS_PASSWORD,
db: getDbIndex(),
...redisOptions(process.env.REDIS_OPTS ? JSON.parse(process.env.REDIS_OPTS) : {})
};
}
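
// Illustrative environment configuration (values are examples, not from the gist).
// getRedisConfiguration() checks these variables in order of precedence:
//   1. Cluster:        REDIS_NODES='["10.0.0.1:6379","10.0.0.2:6379"]'  REDIS_OPTS='{"scaleReads":"all"}'
//   2. Single via URL: REDIS_URL='rediss://user:secret@redis.example.com:6380'  REDIS_DB_INDEX='1'
//   3. Single:         REDIS_HOST='localhost'  REDIS_PORT='6379'  REDIS_PASSWORD='secret'  REDIS_DB_INDEX='0'

// --- OIDCRedisAdapter.spec.ts ---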
import { Adapters } from "@tsed/adapters";
import { LocalsContainer, PlatformTest } from "@tsed/common";
import Redis from "ioredis";
// @ts-ignore
import IORedisMock from "ioredis-mock";
import moment from "moment";
import { REDIS_CONNECTION } from "../redis/RedisConnection";
import { OIDCRedisAdapter } from "./OIDCRedisAdapter";
async function createAdapterFixture(collectionName: string) {
const locals = new LocalsContainer();
// @ts-ignore
const redis: Redis = new IORedisMock();
locals.set(REDIS_CONNECTION, redis);
const adapter = PlatformTest.get<Adapters>(Adapters).invokeAdapter({
collectionName,
model: Object,
adapter: OIDCRedisAdapter,
locals
}) as OIDCRedisAdapter<any>;
await redis.flushall();
return { adapter, redis };
}
async function createInitialDBFixture() {
const { adapter } = await createAdapterFixture("AccessToken");
const payload = {
grantId: "grantId",
userCode: "userCode",
uid: "uid",
_id: "id"
};
await adapter.upsert("id", payload);
return adapter;
}
describe("OIDCAdapter", () => {
beforeEach(() => PlatformTest.create({}));
afterEach(() => PlatformTest.reset());
describe("onInsert()", () => {
it("should test if the table is grantable and add index for grantId and the current payload", async () => {
const { adapter, redis } = await createAdapterFixture("AccessToken");
const payload = {
grantId: "grantId",
userCode: "userCode",
uid: "uid",
_id: "id"
};
await adapter.upsert("id", payload);
const keys = await redis.keys("$oidc:*");
expect(await redis.get("$oidc:grant:grantId")).toEqual(["AccessToken:id"]);
expect(keys).toEqual(["$oidc:grant:grantId", "$oidc:userCode:userCode", "$oidc:uid:uid"]);
});
it("should set the expiration ttl", async () => {
const { adapter, redis } = await createAdapterFixture("AccessToken");
const payload = {
grantId: "grantId",
userCode: "userCode",
uid: "uid",
_id: "id"
};
await adapter.upsert("id", payload, moment().add(2, "days").toDate());
const ttl = await redis.ttl("$oidc:grant:grantId");
expect(ttl).toBeGreaterThan(100000);
});
it("should create grant indexes if the payload type isn't grantable", async () => {
const { adapter, redis } = await createAdapterFixture("Other");
const payload = {
grantId: "grantId",
userCode: "userCode",
uid: "uid",
_id: "id"
};
await adapter.upsert("id", payload);
const keys = await redis.keys("$oidc:*");
expect(keys).toEqual(["$oidc:userCode:userCode", "$oidc:uid:uid"]);
});
});
describe("findByUid()", () => {
it("should retrieve the payload by his uid", async () => {
const adapter = await createInitialDBFixture();
const result = await adapter.findByUid("uid");
expect(result).toEqual({
_id: "id",
grantId: "grantId",
uid: "uid",
userCode: "userCode"
});
});
it("should not retrieve the payload by his uid", async () => {
const adapter = await createInitialDBFixture();
const result = await adapter.findByUid("wrong");
expect(result).toEqual(null);
});
});
describe("findByUserCode()", () => {
it("should retrieve the payload by his userCode", async () => {
const adapter = await createInitialDBFixture();
const result = await adapter.findByUserCode("userCode");
expect(result).toEqual({
_id: "id",
grantId: "grantId",
uid: "uid",
userCode: "userCode"
});
});
it("should not retrieve the payload by his userCode", async () => {
const adapter = await createInitialDBFixture();
const result = await adapter.findByUserCode("wrong");
expect(result).toEqual(null);
});
});
describe("destroy()", () => {
it("should retrieve the payload by his userCode", async () => {
const adapter = await createInitialDBFixture();
await adapter.destroy("id");
const result = await adapter.findById("id");
expect(result).toEqual(undefined);
});
});
describe("revokeGrantId()", () => {
it("should retrieve the payload by his userCode", async () => {
const adapter = await createInitialDBFixture();
const keys = await adapter.db.lrange("$oidc:grant:grantId", 0, -1);
expect(keys).toEqual(["AccessToken:id"]);
expect(await adapter.db.get("AccessToken:id")).toEqual('{"grantId":"grantId","userCode":"userCode","uid":"uid","_id":"id"}');
expect(await adapter.db.get("$oidc:grant:grantId")).toEqual(["AccessToken:id"]);
await adapter.revokeByGrantId("grantId");
expect(await adapter.db.get("AccessToken:id")).toEqual(null);
expect(await adapter.db.get("$oidc:grant:grantId")).toEqual(null);
});
});
describe("consumes()", () => {
it("should set a consume flag in redis", async () => {
const { adapter } = await createAdapterFixture("AuthorizationCode");
await adapter.consume("codeId");
const result = await adapter.db.hget("AuthorizationCode:codeId", "consumed");
expect(!isNaN(Number(result))).toBe(true);
});
});
});
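
// --- OIDCRedisAdapter.ts ---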
import { AdapterConstructorOptions, AdapterModel } from "@tsed/adapters";
import { Configuration } from "@tsed/di";
import { ChainableCommander } from "ioredis";
import { RedisAdapter } from "../redis/RedisAdapter";
const GRANTABLE = new Set(["AccessToken", "AuthorizationCode", "RefreshToken", "DeviceCode", "BackchannelAuthenticationRequest"]);
const CONSUMABLE = new Set(["AuthorizationCode", "RefreshToken", "DeviceCode", "BackchannelAuthenticationRequest"]);
function grantKeyFor(id: string) {
return `$oidc:grant:${id}`;
}
function userCodeKeyFor(userCode: string) {
return `$oidc:userCode:${userCode}`;
}
function uidKeyFor(uid: string) {
return `$oidc:uid:${uid}`;
}
export class OIDCRedisAdapter<T extends AdapterModel> extends RedisAdapter<T> {
protected isGrantable: boolean;
constructor(options: AdapterConstructorOptions, configuration: Configuration) {
super(options, configuration);
this.useHash = CONSUMABLE.has(this.collectionName);
this.isGrantable = GRANTABLE.has(this.collectionName);
this.hooks.on("insert", this.onInsert.bind(this));
}
async onInsert(multi: ChainableCommander, payload: T, expiresIn: number) {
const id = payload._id;
const key = this.key(id);
if (this.isGrantable && payload.grantId) {
const grantKey = grantKeyFor(payload.grantId);
multi.rpush(grantKey, key);
// if you're seeing grant key lists growing out of acceptable proportions consider using LTRIM
// here to trim the list to an appropriate length
const ttl = await this.db.ttl(grantKey);
if (expiresIn && expiresIn > ttl) {
multi.expire(grantKey, expiresIn);
}
}
if (payload.userCode) {
const userCodeKey = userCodeKeyFor(payload.userCode);
multi.set(userCodeKey, id);
expiresIn && multi.expire(userCodeKey, expiresIn);
}
if (payload.uid) {
const uidKey = uidKeyFor(payload.uid);
multi.set(uidKey, id);
expiresIn && multi.expire(uidKey, expiresIn);
}
return multi;
}
async findByUid(uid: string) {
const id = await this.db.get(uidKeyFor(uid));
return id && this.findById(id);
}
async findByUserCode(userCode: string) {
const id = await this.db.get(userCodeKeyFor(userCode));
return id && this.findById(id);
}
async destroy(id: string) {
const key = this.key(id);
await this.db.del(key);
}
async revokeByGrantId(grantId: string) {
const multi = this.db.multi();
const key = grantKeyFor(grantId);
const tokens = await this.db.lrange(key, 0, -1);
tokens.forEach((token) => multi.del(token));
multi.del(grantKeyFor(grantId));
await multi.exec();
}
async consume(id: string) {
await this.db.hset(this.key(id), "consumed", Math.floor(Date.now() / 1000));
}
}
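
// --- RedisAdapter.spec.ts ---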
import { Adapters, Indexed } from "@tsed/adapters";
import { LocalsContainer, PlatformTest } from "@tsed/common";
import { deserialize } from "@tsed/json-mapper";
import { Property } from "@tsed/schema";
import type Redis from "ioredis";
// @ts-ignore
import IORedisMock from "ioredis-mock";
import { RedisAdapter } from "./RedisAdapter";
import { REDIS_CONNECTION } from "./RedisConnection";
class Client {
@Property()
_id: string;
@Indexed()
name: string;
}
async function createAdapterFixture() {
const locals = new LocalsContainer();
// @ts-ignore
const redis: Redis = new IORedisMock();
locals.set(REDIS_CONNECTION, redis);
const adapter = PlatformTest.get<Adapters>(Adapters).invokeAdapter<Client>({
collectionName: "clients",
model: Client,
adapter: RedisAdapter,
locals
}) as RedisAdapter<Client>;
await redis.flushall();
return { adapter };
}
describe("RedisAdapter", () => {
beforeEach(() => PlatformTest.create({}));
afterEach(() => PlatformTest.reset());
describe("create()", () => {
it("should create a new instance", async () => {
const { adapter } = await createAdapterFixture();
const base = {
name: "name"
};
const client = await adapter.create(base, new Date(Date.now() + 3000));
expect(client).toBeInstanceOf(Client);
expect(typeof client._id).toBe("string");
expect(client.name).toBe(base.name);
const keys = await adapter.db.keys("*");
expect(keys).toContain("clients:" + client._id);
expect(keys).toContain(`$idx:clients:${client._id}:name(${base.name})`);
});
});
describe("upsert()", () => {
it("should upsert a new instance", async () => {
const { adapter } = await createAdapterFixture();
const base = deserialize<Client>(
{
name: "name"
},
{ type: Client }
);
const id = "uuid";
const client = await adapter.upsert(id, base);
const client2 = await adapter.upsert(id, base);
expect(client).toBeInstanceOf(Client);
expect(typeof client._id).toBe("string");
expect(client._id).toBe(client2._id);
expect(client.name).toBe(base.name);
const keys = await adapter.db.keys("*");
expect(keys).toContain("clients:" + client._id);
expect(keys).toContain(`$idx:clients:${client._id}:name(${base.name})`);
});
});
describe("findById()", () => {
it("should create a new instance", async () => {
const { adapter } = await createAdapterFixture();
const base = {
name: "name"
};
const client = await adapter.create(base);
const result = await adapter.findById(client._id);
expect(result).toBeInstanceOf(Client);
expect(result?._id).toBe(client._id);
expect(result?.name).toBe(base.name);
});
});
describe("findOne()", () => {
it("should find instance", async () => {
const { adapter } = await createAdapterFixture();
const base = {
name: "name"
};
const client = await adapter.create(base);
const result = await adapter.findOne({
name: base.name
});
expect(result).toBeInstanceOf(Client);
expect(result?._id).toBe(client._id);
expect(result?.name).toBe(base.name);
});
it("should find instance by id", async () => {
const { adapter } = await createAdapterFixture();
const base = {
name: "name"
};
const client = await adapter.create(base);
const result = await adapter.findOne({
_id: client._id,
name: base.name
});
expect(result).toBeInstanceOf(Client);
expect(result?._id).toBe(client._id);
expect(result?.name).toBe(base.name);
});
it("should not find data", async () => {
const { adapter } = await createAdapterFixture();
const base = {
name: "name",
otherProp: "name"
};
await adapter.create(base);
const result = await adapter.findOne({
name: base.name,
otherProp: base.otherProp
});
expect(result).toBeUndefined();
});
});
describe("findAll()", () => {
it("should find all data (one prop)", async () => {
const { adapter } = await createAdapterFixture();
const base = {
name: "name"
};
const client = await adapter.create(base);
const result = await adapter.findAll({
name: base.name
});
expect(result[0]).toBeInstanceOf(Client);
expect(result[0]?._id).toBe(client._id);
expect(result[0]?.name).toBe(base.name);
});
it("should find all by id (one prop)", async () => {
const { adapter } = await createAdapterFixture();
const base = {
name: "name"
};
const client = await adapter.create(base);
const result = await adapter.findAll({
_id: client._id
});
expect(result[0]).toBeInstanceOf(Client);
expect(result[0]?._id).toBe(client._id);
expect(result[0]?.name).toBe(base.name);
});
it("should find all items", async () => {
const { adapter } = await createAdapterFixture();
const base = {
name: "name"
};
const client = await adapter.create(base);
const result = await adapter.findAll({});
expect(result[0]).toBeInstanceOf(Client);
expect(result[0]?._id).toBe(client._id);
expect(result[0]?.name).toBe(base.name);
});
it("should not find data when predicate has an unknown prop", async () => {
const { adapter } = await createAdapterFixture();
const base = {
name: "name",
otherProp: "name"
};
await adapter.create(base);
const result = await adapter.findAll({
name: base.name,
otherProp: base.otherProp
});
expect(result).toEqual([]);
});
});
describe("updateOne()", () => {
it("should update one item", async () => {
const { adapter } = await createAdapterFixture();
const base = deserialize<Client>(
{
name: "name"
},
{ type: Client }
);
const client = await adapter.create(base);
const result = await adapter.updateOne(
{
name: base.name
},
base
);
expect(client).toBeInstanceOf(Client);
expect(typeof client._id).toBe("string");
expect(client._id).toBe(result?._id);
expect(client.name).toBe(base.name);
});
it("should not update item", async () => {
const { adapter } = await createAdapterFixture();
const base = deserialize<Client>(
{
name: "name"
},
{ type: Client }
);
const client = await adapter.create(base);
const result = await adapter.updateOne(
{
name: `${base.name}2`
},
base
);
expect(result).toBeUndefined();
expect(client.name).toBe(base.name);
});
});
describe("deleteOne()", () => {
it("should remove one item", async () => {
const { adapter } = await createAdapterFixture();
const base = deserialize<Client>(
{
name: "name"
},
{ type: Client }
);
const client = await adapter.create(base);
const result = await adapter.deleteOne({
name: base.name
});
expect(client).toBeInstanceOf(Client);
expect(typeof client._id).toBe("string");
expect(client._id).toBe(result?._id);
expect(client.name).toBe(base.name);
});
it("should not remove item", async () => {
const { adapter } = await createAdapterFixture();
const base = deserialize<Client>(
{
name: "name"
},
{ type: Client }
);
const client = await adapter.create(base);
const result = await adapter.deleteOne({
name: `${base.name}2`
});
expect(result).toBeUndefined();
expect(client.name).toBe(base.name);
});
});
describe("deleteById()", () => {
it("should remove one item", async () => {
const { adapter } = await createAdapterFixture();
const base = deserialize<Client>(
{
name: "name"
},
{ type: Client }
);
const client = await adapter.create(base);
const result = await adapter.deleteById(client._id);
expect(client).toBeInstanceOf(Client);
expect(typeof client._id).toBe("string");
expect(client._id).toBe(result?._id);
expect(client.name).toBe(base.name);
});
});
describe("deleteMany()", () => {
it("should remove items", async () => {
const { adapter } = await createAdapterFixture();
const base = deserialize<Client>(
{
name: "name"
},
{ type: Client }
);
const client = await adapter.create(base);
const result = await adapter.deleteMany({
name: base.name
});
expect(result[0]).toBeInstanceOf(Client);
expect(typeof result[0]._id).toBe("string");
expect(result[0]._id).toBe(client?._id);
expect(result[0].name).toBe(base.name);
});
});
});
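
// --- RedisAdapter.ts ---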
import { Adapter, AdapterModel } from "@tsed/adapters";
import { Hooks } from "@tsed/core";
import { Inject } from "@tsed/di";
import { ChainableCommander } from "ioredis";
import { v4 as uuid } from "uuid";
import { REDIS_CONNECTION } from "./RedisConnection";
const getId = (key: string) => key.split(":")[2];
const flatKeys = (keys: [Error | null, string[]][] | null): string[] => {
return (keys || []).flatMap(([, result]) => result).filter(Boolean);
};
export class RedisAdapter<T extends AdapterModel> extends Adapter<T> {
@Inject(REDIS_CONNECTION)
readonly db: REDIS_CONNECTION;
protected useHash: boolean = false;
protected hooks = new Hooks();
key(id: string) {
return `${this.collectionName}:${id}`;
}
public async create(payload: Partial<T>, expiresAt?: Date): Promise<T> {
payload._id = uuid();
return this.insert(payload, expiresAt);
}
public async upsert(id: string, payload: T, expiresAt?: Date): Promise<T> {
const item = await this.findById(id);
if (!item) {
payload = { ...payload, _id: id };
return this.insert(payload, expiresAt);
}
return (await this.update(id, payload, expiresAt)) as T;
}
public async update(id: string, payload: T, expiresAt?: Date): Promise<T | undefined> {
return this.updateOne({ _id: id }, payload, expiresAt);
}
public async updateOne(predicate: Partial<T & any>, payload: T, expiresAt?: Date): Promise<T | undefined> {
const item = await this.findOne(predicate);
if (!item) {
return undefined;
}
return this.insert(
{
...this.updateInstance(item, payload),
_id: item._id
},
expiresAt
);
}
async findOne(predicate: Partial<T & any>): Promise<T | undefined> {
const { _id, ...props } = predicate;
if (_id) {
return this.findById(_id);
}
const keys = Object.keys(props);
let foundKeys = await this.findKeys(props);
if (foundKeys.length < keys.length) {
return undefined;
}
const id = getId(foundKeys[0]);
foundKeys = foundKeys.filter((key) => id === getId(key));
return keys.length === foundKeys.length ? this.findById(id) : undefined;
}
async findById(_id: string): Promise<T | undefined> {
const item = await this.db.get(this.key(_id));
if (!item) {
return undefined;
}
return this.deserialize(JSON.parse(item));
}
public async findAll(predicate: Partial<T & any> = {}): Promise<T[]> {
const { _id, ...props } = predicate;
if (_id) {
const item = await this.findById(_id);
return item ? [item] : [];
}
const keys = Object.keys(props);
if (keys.length === 0) {
return this.getAll();
}
return this.findAllBy(props);
}
public async deleteOne(predicate: Partial<T & any>): Promise<T | undefined> {
const item = await this.findOne(predicate);
if (item) {
const id = this.key(item._id);
const indexIds = await this.getAllIndex(item._id);
await this.db.del([id, ...indexIds]);
return this.deserialize(item);
}
}
public async deleteById(_id: string): Promise<T | undefined> {
return this.deleteOne({ _id } as any);
}
public async deleteMany(predicate: Partial<T>): Promise<T[]> {
const items = await this.findAll(predicate);
const pipeline = this.db.pipeline();
const ids: string[] = [];
const results = items.map((item) => {
ids.push(this.key(item._id));
this.getAllIndex(item._id, pipeline);
return this.deserialize(item);
});
const keys = await pipeline.exec();
await this.db.del([...ids, ...flatKeys(keys as any)]);
return results;
}
protected async findKeys(props: any): Promise<string[]> {
const keys: any[] = Object.keys(props);
const pipeline = this.db.pipeline();
this.indexes
.filter(({ propertyKey }) => keys.includes(propertyKey))
.forEach(({ propertyKey }) => {
const value = props[propertyKey];
const patterns = this.getIndexedKey("*", propertyKey, value);
pipeline.keys(patterns);
});
const results = await pipeline.exec();
return flatKeys(results as any[]);
}
protected async insert(payload: Partial<T>, expiresAt?: Date) {
const id = (payload._id = payload._id || uuid());
const expiresIn = expiresAt ? Math.round((expiresAt.getTime() - Date.now()) / 1000) : null; // EXPIRE takes seconds, not milliseconds
await this.validate(payload as T);
const multi = this.db.multi();
const key = this.key(id);
const strPayload = JSON.stringify(this.serialize(payload));
// HMSET expects a field/value map, not a JSON string; SET stores the serialized payload as a string
this.useHash ? multi.hmset(key, this.serialize(payload)) : multi.set(key, strPayload);
if (expiresIn) {
multi.expire(key, expiresIn);
}
this.indexes.forEach(({ propertyKey }) => {
const value = payload[propertyKey];
const indexedKey = this.getIndexedKey(id, propertyKey, value);
multi.set(indexedKey, id);
expiresIn && multi.expire(indexedKey, expiresIn);
});
await this.hooks.asyncAlter("insert", multi, [payload, expiresIn]);
await multi.exec();
return this.deserialize(payload);
}
protected getAllIndex(id: string): Promise<string[]>;
protected getAllIndex(id: string, pipeline: ChainableCommander): ChainableCommander;
protected getAllIndex(id: string, pipeline?: ChainableCommander) {
const key = ["$idx", this.key(id), "*"].join(":");
return (pipeline || this.db).keys(key);
}
protected getIndexedKey(id: string, propertyKey: string, value: any): string {
const key = this.key(id);
return ["$idx", key, `${propertyKey}(${value})`].map(String).join(":");
}
protected async getAll(): Promise<T[]> {
const keys = await this.db.keys(`${this.collectionName}:*`);
const pipeline = this.db.pipeline();
keys.forEach((key) => {
pipeline.get(key);
});
const result = await pipeline.exec();
return (result || []).map(([, data]: [any, string]) => this.deserialize(JSON.parse(data))).filter(Boolean);
}
protected async findAllBy(props: Partial<T & any>): Promise<T[]> {
const keys = Object.keys(props);
const foundKeys = await this.findKeys(props);
if (foundKeys.length < keys.length) {
return [];
}
const map = foundKeys.reduce((map: Map<string, number>, key) => {
const id = getId(key);
const value = map.get(id) || 0;
return map.set(id, value + 1);
}, new Map());
const promises = [...map.entries()].filter(([, num]) => num === keys.length).map(([id]) => this.findById(id));
const result = await Promise.all(promises);
return result.filter(Boolean) as T[];
}
}
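
// --- RedisConnection.spec.ts ---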
import { PlatformTest } from "@tsed/common";
import Redis from "ioredis";
import { REDIS_CONNECTION } from "./RedisConnection";
jest.mock("ioredis", () => {
class Redis {
static Cluster = class {
connector: any = {};
constructor(public nodes: any, public options: any) {
this.connector.options = options;
}
async connect() {
return undefined;
}
async disconnect() {
return undefined;
}
};
connector: any = {};
constructor(public options: any) {
this.connector.options = options;
}
async connect() {
return undefined;
}
async disconnect() {
return undefined;
}
}
return Redis;
});
describe("RedisConnection", () => {
describe("Redis", () => {
beforeEach(() =>
PlatformTest.create({
redis: {
host: "localhost"
},
cache: {} as any
})
);
afterEach(() => PlatformTest.reset());
it("should create redis connection", () => {
const connection = PlatformTest.get<REDIS_CONNECTION>(REDIS_CONNECTION);
const cacheSettings = PlatformTest.injector.settings.get("cache");
expect((connection as any).options).toMatchObject({
host: "localhost",
lazyConnect: true,
reconnectOnError: expect.any(Function)
});
expect(cacheSettings.redisInstance).toEqual(connection);
});
});
describe("Cluster", () => {
beforeEach(() =>
PlatformTest.create({
redis: {
clusterConfig: {
nodes: [],
options: {
value: "value"
}
}
},
cache: {} as any
})
);
afterEach(() => PlatformTest.reset());
it("should create redis connection", () => {
const connection = PlatformTest.get<REDIS_CONNECTION>(REDIS_CONNECTION);
const cacheSettings = PlatformTest.injector.settings.get("cache");
expect((connection as any).options).toMatchObject({
clusterRetryStrategy: expect.any(Function),
value: "value",
lazyConnect: true,
redisOptions: {
reconnectOnError: expect.any(Function)
}
});
expect((connection as any).nodes).toEqual([]);
expect(cacheSettings.redisInstance).toEqual(connection);
(connection as any).options.clusterRetryStrategy();
(connection as any).options.redisOptions.reconnectOnError();
});
});
});
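
// --- RedisConnection.ts ---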
import { setValue } from "@tsed/core";
import { Configuration, registerProvider } from "@tsed/di";
import { Logger } from "@tsed/logger";
import Redis, { Cluster, ClusterOptions, RedisOptions } from "ioredis";
import { redisStore } from "./RedisStore";
export const REDIS_CONNECTION = Symbol("redis:connection");
export type REDIS_CONNECTION = Redis;
export interface ClusterConfiguration {
nodes: string[];
options: ClusterOptions;
}
export type RedisConfiguration =
| RedisOptions
| {
clusterConfig: ClusterConfiguration;
};
registerProvider({
provide: REDIS_CONNECTION,
deps: [Configuration, Logger],
async useAsyncFactory(configuration: Configuration, logger: Logger) {
const cacheSettings = configuration.get("cache");
const redisSettings = configuration.get<RedisConfiguration>("redis");
if (redisSettings) {
let connection: Cluster | Redis;
const reconnectOnError = (err: any) => {
logger.fatal({ event: "REDIS_ERROR", message: `Redis - Reconnect on error: ${(err && err.message) || err}` });
};
if ("clusterConfig" in redisSettings) {
setValue(redisSettings.clusterConfig, "options.clusterRetryStrategy", (times: number) => {
logger.fatal({ event: "REDIS_ERROR", message: `Redis is not responding - Retry count: ${times}` });
return 2000;
});
setValue(redisSettings.clusterConfig, "options.redisOptions.reconnectOnError", reconnectOnError);
connection = new Redis.Cluster(redisSettings.clusterConfig.nodes, {
...redisSettings.clusterConfig.options,
lazyConnect: true
});
} else {
connection = new Redis({
...redisSettings,
lazyConnect: true,
reconnectOnError
} as RedisOptions);
}
cacheSettings.redisInstance = connection;
cacheSettings.store = redisStore;
await connection.connect();
logger.info("Connected to redis database...");
return connection;
}
return {};
},
hooks: {
$onDestroy(connection: Redis) {
return connection.disconnect && connection.disconnect();
}
}
});
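
// --- RedisStore.spec.ts ---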
import { catchAsyncError, Hooks } from "@tsed/core";
import { caching } from "cache-manager";
import Redis from "ioredis";
import { RedisStore, redisStore } from "./RedisStore";
jest.mock("ioredis", () => {
return class Redis {
static Cluster = class {
hooks = new Hooks();
connector: any = {};
constructor(protected options: any = {}) {
this.connector.options = options;
setTimeout(() => {
this.hooks.emit("ready");
}, 10);
}
once(...args: any[]) {
(this.hooks.on as any)(...args);
return this;
}
disconnect() {
return undefined;
}
};
hooks = new Hooks();
cache = new Map();
connector: any = {};
constructor(protected options: any = {}) {
this.connector.options = options;
setTimeout(() => {
this.hooks.emit("ready");
}, 10);
}
once(...args: any[]) {
(this.hooks.on as any)(...args);
return this;
}
async del(key: string) {
const result = this.cache.delete(key);
return result ? 1 : 0;
}
async flushdb() {
this.cache.clear();
return "OK";
}
async setex(key: string, ttl: any, value: any) {
this.cache.set(key, { value, ttl });
return "OK";
}
async set(key: string, value: any) {
this.cache.set(key, { value, ttl: -1 });
return "OK";
}
async get(key: string) {
const cached = this.cache.get(key);
if (!cached) {
throw new Error("missing key");
}
return cached.value;
}
async ttl(key: string) {
const cached = this.cache.get(key);
return cached?.ttl || -1;
}
async end() {
return this;
}
keys() {
return [...this.cache.keys()];
}
disconnect() {
return undefined;
}
};
});
let redisCache: any;
let customRedisCache: any;
const config = {
host: "127.0.0.1",
port: 6379,
password: null,
db: 0,
ttl: 5
};
describe("RedisStore", () => {
beforeEach((done) => {
redisCache = caching({
store: redisStore,
host: config.host,
port: config.port,
password: config.password,
db: config.db,
ttl: config.ttl
});
customRedisCache = caching({
store: redisStore,
host: config.host,
port: config.port,
password: config.password,
db: config.db,
ttl: config.ttl,
isCacheableValue: (val) => {
if (val === undefined) {
// allow undefined
return true;
} else if (val === "FooBarString") {
// disallow FooBarString
return false;
}
return redisCache.store.isCacheableValue(val);
}
});
redisCache.store.getClient().once("ready", () => redisCache.reset(done));
});
afterEach(() => {
redisCache.store.getClient().disconnect({ reconnect: true });
});
describe("initialization", () => {
it("should create a store with password instead of auth_pass (auth_pass is deprecated for redis > 2.5)", async () => {
const redisPwdCache = caching({
store: redisStore,
host: config.host,
port: config.port,
password: config.password,
db: config.db,
ttl: config.ttl
});
expect((redisPwdCache.store as any).getClient().options.password).toEqual(config.password);
});
it("should create redis store", async () => {
const store = new RedisStore({
host: config.host,
port: config.port,
db: config.db,
ttl: config.ttl
});
expect(store.getClient()).toBeInstanceOf(Redis);
});
it("should create a store with an external redisInstance", () => {
const externalRedisInstanceCache = caching({
store: redisStore,
redisInstance: new Redis({
host: config.host,
port: config.port,
password: config.password,
db: config.db
} as any),
ttl: config.ttl
});
expect((externalRedisInstanceCache.store as any).getClient().options.password).toEqual(config.password);
});
it("should create a store clusterConfig", () => {
caching({
store: redisStore,
clusterConfig: {
nodes: [],
config: {}
},
ttl: config.ttl
});
});
});
describe("set", () => {
it("should return a promise", () => {
expect(redisCache.set("foo", "bar")).toBeInstanceOf(Promise);
});
it("should resolve promise on success", async () => {
const result = await redisCache.set("foo", "bar");
expect(result).toEqual("OK");
});
it("should reject promise on error", async () => {
const error = await catchAsyncError(() => redisCache.set("foo", null));
expect(!!error).toEqual(true);
});
it("should store a value without ttl", (done) => {
redisCache.set("foo", "bar", (err: any) => {
expect(err).toEqual(null);
done();
});
});
it("should store a value with a specific ttl", (done) => {
redisCache.set("foo", "bar", config.ttl, (err: any) => {
expect(err).toEqual(null);
done();
});
});
it("should store a value with a infinite ttl", (done) => {
redisCache.set("foo", "bar", { ttl: 0 }, (err: any) => {
expect(err).toEqual(null);
redisCache.ttl("foo", (err: any, ttl: number) => {
expect(err).toEqual(null);
expect(ttl).toEqual(-1);
done();
});
});
});
it("should not be able to store a null value (not cacheable)", (done) => {
redisCache.set("foo2", null, (err: any) => {
if (err) {
return done();
}
done(new Error("Null is not a valid value!"));
});
});
it("should store a value without callback", (done) => {
redisCache.set("foo", "baz");
redisCache.get("foo", (err: any, value: any) => {
expect(err).toEqual(null);
expect(value).toEqual("baz");
done();
});
});
it("should not store an invalid value", (done) => {
redisCache.set("foo1", undefined, (err: any) => {
try {
expect(err).not.toEqual(null);
expect(err.message).toEqual('"undefined" is not a cacheable value');
done();
} catch (e) {
done(e);
}
});
});
it("should store an undefined value if permitted by isCacheableValue", (done) => {
expect(customRedisCache.store.isCacheableValue(undefined)).toBe(true);
customRedisCache.set("foo3", undefined, (err: any) => {
try {
expect(err).toEqual(null);
customRedisCache.get("foo3", (err: any, data: any) => {
try {
expect(err).toEqual(null);
// redis stored undefined as 'undefined'
expect(data).toEqual("undefined");
done();
} catch (e) {
done(e);
}
});
} catch (e) {
done(e);
}
});
});
it("should not store a value disallowed by isCacheableValue", (done) => {
expect(customRedisCache.store.isCacheableValue("FooBarString")).toBe(false);
customRedisCache.set("foobar", "FooBarString", (err: any) => {
try {
expect(err).not.toEqual(null);
expect(err.message).toEqual('"FooBarString" is not a cacheable value');
done();
} catch (e) {
done(e);
}
});
});
it("should return an error if there is an error acquiring a connection", (done) => {
redisCache.store.getClient().end(true);
redisCache.set("foo", "bar", (err: any) => {
expect(err).not.toEqual(null);
done();
});
});
});
describe("get", () => {
it("should return value from callback", async () => {
await redisCache.set("foo3", "bar");
const result = await new Promise((resolve, reject) => {
redisCache.get("foo3", (err: any, value: any) => {
err ? reject(err) : resolve(value);
});
});
expect(result).toEqual("bar");
});
it("should reject error from callback", async () => {
const error = await catchAsyncError(() => {
return new Promise((resolve, reject) => {
redisCache.get("foo2", (err: any, value: any) => {
err ? reject(err) : resolve(value);
});
});
});
expect(!!error).toEqual(true);
});
it("should resolve promise on success", async () => {
await redisCache.set("foo", "bar");
const result = await redisCache.get("foo");
expect(result).toEqual("bar");
});
it("should reject promise on error", async () => {
const error = await catchAsyncError(() => redisCache.get("foo2"));
expect(!!error).toEqual(true);
});
it("should retrieve a value for a given key", async () => {
const value = "bar";
await redisCache.set("foo", value);
const result = await redisCache.get("foo");
expect(result).toEqual(value);
});
it("should retrieve a value for a given key if options provided", async () => {
const value = "bar";
await redisCache.set("foo", value);
const result = await redisCache.get("foo", {});
expect(result).toEqual(value);
});
});
describe("del", () => {
it("should delete a value for a given key", (done) => {
redisCache.set("foo", "bar", () => {
redisCache.del("foo", (err: any) => {
expect(err).toEqual(null);
done();
});
});
});
it("should delete a value for a given key without callback", (done) => {
redisCache.set("foo", "bar", () => {
redisCache.del("foo");
done();
});
});
it("should return an error if there is an error acquiring a connection", (done) => {
redisCache.store.getClient().end(true);
redisCache.del("foo", (err: any) => {
expect(err).not.toEqual(null);
done();
});
});
});
describe("reset", () => {
it("should flush underlying db", (done) => {
redisCache.reset((err: any) => {
expect(err).toEqual(null);
done();
});
});
it("should flush underlying db without callback", (done) => {
redisCache.reset();
done();
});
it("should return an error if there is an error acquiring a connection", (done) => {
redisCache.store.getClient().end(true);
redisCache.reset((err: any) => {
expect(err).not.toEqual(null);
done();
});
});
});
describe("ttl", () => {
it("should retrieve ttl for a given key", (done) => {
redisCache.set("foo", "bar", () => {
redisCache.ttl("foo", (err: any, ttl: number) => {
expect(err).toEqual(null);
expect(ttl).toEqual(config.ttl);
done();
});
});
});
it("should retrieve ttl for an invalid key", (done) => {
redisCache.ttl("invalidKey", (err: any, ttl: number) => {
expect(err).toEqual(null);
expect(ttl).not.toEqual(null);
done();
});
});
it("should return an error if there is an error acquiring a connection", (done) => {
redisCache.store.getClient().end(true);
redisCache.ttl("foo", (err: any) => {
expect(err).not.toEqual(null);
done();
});
});
});
describe("keys", () => {
it("should resolve promise on success", async () => {
await redisCache.set("foo", "bar");
const keys = await redisCache.keys("f*");
expect(keys).toEqual(["foo"]);
});
it("should return an array of keys for the given pattern", (done) => {
redisCache.set("foo", "bar", () => {
redisCache.keys("f*", (err: any, arrayOfKeys: any[]) => {
expect(err).toEqual(null);
expect(arrayOfKeys).not.toEqual(null);
expect(arrayOfKeys.indexOf("foo")).not.toEqual(-1);
done();
});
});
});
it("should return an array of keys without pattern", (done) => {
redisCache.set("foo", "bar", () => {
redisCache.keys((err: any, arrayOfKeys: any[]) => {
expect(err).toEqual(null);
expect(arrayOfKeys).not.toEqual(null);
expect(arrayOfKeys.indexOf("foo")).not.toEqual(-1);
done();
});
});
});
it("should return an error if there is an error acquiring a connection", (done) => {
redisCache.store.getClient().end(true);
redisCache.keys("foo", (err: any) => {
expect(err).not.toEqual(null);
done();
});
});
});
describe("isCacheableValue", () => {
it("should return true when the value is not undefined", () => {
expect(redisCache.store.isCacheableValue(0)).toBe(true);
expect(redisCache.store.isCacheableValue(100)).toBe(true);
expect(redisCache.store.isCacheableValue("")).toBe(true);
expect(redisCache.store.isCacheableValue("test")).toBe(true);
});
it("should return false when the value is undefined", () => {
expect(redisCache.store.isCacheableValue(undefined)).toBe(false);
});
it("should return false when the value is null", () => {
expect(redisCache.store.isCacheableValue(null)).toBe(false);
});
});
describe("overridable isCacheableValue function", () => {
let redisCache2: any;
beforeEach(() => {
redisCache2 = caching({
store: redisStore,
ttl: 60,
isCacheableValue: () => {
return "I was overridden" as any;
}
});
});
it("should return its return value instead of the built-in function", (done) => {
expect(redisCache2.store.isCacheableValue(0)).toEqual("I was overridden");
done();
});
});
});
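
// --- RedisStore.ts ---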
import { Store } from "cache-manager";
import Redis, { Cluster } from "ioredis";
import type { Callback } from "ioredis/built/types";
async function handle<Response = any>(next: () => Promise<Response>, parse: boolean, cb?: Callback<Response | null>) {
try {
let result = await next();
if (parse) {
result = result && JSON.parse(result as any);
}
if (!cb) {
return result;
}
cb(null, result);
} catch (er) {
if (!cb) {
throw er;
}
cb(er as Error, null);
}
}
export class RedisStore implements Store {
public name = "redis";
public isCacheableValue;
private redisCache: Redis | Cluster;
private storeArgs: any;
constructor(...args: any[]) {
if (args.length > 0 && args[0].redisInstance) {
this.redisCache = args[0].redisInstance;
} else if (args.length > 0 && args[0].clusterConfig) {
const { nodes, options } = args[0].clusterConfig;
this.redisCache = new Redis.Cluster(nodes, options || {});
} else {
this.redisCache = new (Redis as any)(...args);
}
this.storeArgs = this.redisCache.options;
this.isCacheableValue = this.storeArgs.isCacheableValue || ((value: any) => value !== undefined && value !== null);
}
getClient() {
return this.redisCache;
}
set(key: string, value: any, options: any, cb?: Callback<"OK">) {
if (typeof options === "function") {
cb = options;
options = {};
}
options = options || {};
return handle<"OK">(
async () => {
if (!this.isCacheableValue(value)) {
throw new Error(`"${value}" is not a cacheable value`);
}
const ttl = (options.ttl || options.ttl === 0) ? options.ttl : this.storeArgs.ttl;
const val = JSON.stringify(value) || '"undefined"';
if (ttl) {
return this.redisCache.setex(key, ttl, val);
}
return this.redisCache.set(key, val);
},
false,
cb
);
}
get(key: string, options?: any, cb?: Callback<string | null>) {
if (typeof options === "function") {
cb = options;
}
return handle(() => this.redisCache.get(key), true, cb);
}
del(key: string, options: any, cb?: Callback<number>) {
if (typeof options === "function") {
cb = options;
}
return handle(() => this.redisCache.del(key), false, cb);
}
reset(cb?: Callback<"OK">) {
return handle(() => this.redisCache.flushdb(), false, cb);
}
keys(pattern: string, cb: Callback<string[]>) {
if (typeof pattern === "function") {
cb = pattern;
pattern = "*";
}
return handle<string[]>(() => this.redisCache.keys(pattern), false, cb);
}
ttl(key: string, cb?: Callback<number>) {
return handle<number>(() => this.redisCache.ttl(key), false, cb);
}
}
export const redisStore = { create: (...args: any[]) => new RedisStore(...args) };
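
// --- Server.ts ---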
import "@tsed/platform-cache";
import { Configuration } from "@tsed/di";
import { getRedisConfiguration } from "./services/redis/getRedisConfiguration";
import { OIDCRedisAdapter } from "./services/oidc/OIDCRedisAdapter";
import "./services/redis/RedisConnection";
@Configuration({
redis: getRedisConfiguration(), // get configuration from env
cache: { // enable caching; the store and the redis client are wired up by RedisConnection
ttl: 300
},
adapters: {
Adapter: OIDCRedisAdapter
}
})
export class Server {
}
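
// Usage sketch (not part of the gist): any Ts.ED provider can inject the shared
// ioredis connection registered by RedisConnection. "PingService" is a hypothetical
// name used for illustration only.
import { Inject, Injectable } from "@tsed/di";
import { REDIS_CONNECTION } from "./services/redis/RedisConnection";

@Injectable()
export class PingService {
  @Inject(REDIS_CONNECTION)
  protected redis: REDIS_CONNECTION; // the ioredis client created in RedisConnection.ts

  ping() {
    // plain ioredis command issued on the shared connection
    return this.redis.ping();
  }
}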