Skip to content

Instantly share code, notes, and snippets.

@zgotsch
Created June 7, 2023 22:18
Show Gist options
  • Save zgotsch/e10506a0a6b4c9fcfe043f729b4ed500 to your computer and use it in GitHub Desktop.
Save zgotsch/e10506a0a6b4c9fcfe043f729b4ed500 to your computer and use it in GitHub Desktop.
A small versioned graph structure on top of Postgres
import {sql} from "@vercel/postgres";
import assertNever from "./assertNever";
import invariant from "small-invariant";
/** Minimal shape required of anything stored in the graph: it must expose an `id`. */
type Identified = {id: unknown};

/** The type of the `id` property of an identified payload type. */
type IdentifierOf<T extends Identified> = T["id"];

/**
 * Shape that rows selected from the `nodes` table are cast to.
 *
 * `data` is the caller-supplied payload; `version` and `is_latest` are the
 * bookkeeping columns the queries in this module read and write.
 */
type Node<T extends Identified> = {
  data: T;
  id: IdentifierOf<T>;
  is_latest: boolean;
  version: number;
};

/**
 * Shape that rows selected from the `edges` table are cast to.
 *
 * Carries the same bookkeeping columns as `Node`, plus the identifiers of the
 * two endpoints the edge connects.
 */
type Edge<T extends Identified> = {
  data: T;
  id: IdentifierOf<T>;
  is_latest: boolean;
  source_id: unknown;
  target_id: unknown;
  version: number;
};
/**
 * Inserts `data` as a new row of the `nodes` table.
 *
 * Only the serialized payload is written; every other column is left to the
 * database. The statement is issued through the shared `sql` helper.
 *
 * @param data Payload to store; it is serialized with `JSON.stringify`.
 * @returns The `id` property of `data`, unchanged.
 */
export async function createNode<T extends Identified>(data: T): Promise<IdentifierOf<T>> {
  // The query result carries nothing the caller needs, so it is not kept.
  await sql`INSERT INTO nodes (data) VALUES (${JSON.stringify(data)})`;
  return data.id;
}
/**
 * Looks up the row of `nodes` that is flagged as latest for the given id.
 *
 * @param id Node identifier; it is compared in its `JSON.stringify` form.
 * @returns The first matching row cast to `Node<T>`, or `null` when the query
 *   returns no rows.
 */
export async function getNode<T extends Identified>(id: IdentifierOf<T>): Promise<null | Node<T>> {
  const serializedId = JSON.stringify(id);
  const {rows} = await sql`SELECT * FROM nodes WHERE id = ${serializedId} AND is_latest = TRUE`;
  const latest = rows[0];
  return latest ? (latest as Node<T>) : null;
}
/**
 * Creates a node, or stores a new version of an existing one, inside a single
 * transaction.
 *
 * The row currently flagged as latest for `id` is read first. When there is
 * none, `updater(null)` is inserted as a brand-new node. Otherwise the current
 * row loses its latest flag and `updater(oldData)` is inserted with the version
 * number incremented by one.
 *
 * @param id Identifier of the node; compared in its `JSON.stringify` form.
 * @param updater Receives the current payload (or `null` when the node does not
 *   exist) and returns the payload to store.
 * @throws Rethrows any error raised by a statement, by `updater`, or by the id
 *   invariant, after issuing `ROLLBACK`.
 */
export async function updateNode<T extends Identified, U extends Identified>(
  id: IdentifierOf<T>,
  updater: (oldValue: null | T) => U
): Promise<void> {
  const client = await sql.connect();
  try {
    await client.sql`BEGIN`;
    const serializedId = JSON.stringify(id);
    // Every statement goes through `client`: statements sent through the pooled
    // `sql` helper are not issued on this connection, so they would run outside
    // the transaction opened above.
    const current =
      await client.sql`SELECT * FROM nodes WHERE id = ${serializedId} AND is_latest = TRUE`;
    const [latestRow] = current.rows;
    if (!latestRow) {
      await client.sql`INSERT INTO nodes (data) VALUES (${JSON.stringify(updater(null))})`;
    } else {
      const latest = latestRow as Node<T>;
      const newData = updater(latest.data);
      // The stored id is derived from the payload, so a payload carrying another
      // id would be filed under a different node.
      invariant(
        latest.data.id === newData.id,
        "The id of the new data must match the id of the node being updated"
      );
      await client.sql`UPDATE nodes SET is_latest = FALSE WHERE id = ${serializedId}`;
      // `id` is a GENERATED ALWAYS column in the schema and cannot be given a
      // value in an INSERT; it is computed from `data`.
      await client.sql`INSERT INTO nodes (version, is_latest, data) VALUES (${
        latest.version + 1
      }, TRUE, ${JSON.stringify(newData)})`;
    }
    await client.sql`COMMIT`;
  } catch (e) {
    await client.sql`ROLLBACK`;
    throw e;
  } finally {
    await client.release();
  }
}
/**
 * Soft-deletes a node by clearing the `is_latest` flag on its rows; nothing is
 * physically removed from the table.
 *
 * @param id Node identifier; it is compared in its `JSON.stringify` form.
 */
export async function deleteNode<T extends Identified>(id: IdentifierOf<T>): Promise<void> {
  const serializedId = JSON.stringify(id);
  await sql`UPDATE nodes SET is_latest = FALSE WHERE id = ${serializedId}`;
}
/**
 * Inserts a new row into `edges` connecting two nodes.
 *
 * All three arguments are written in their `JSON.stringify` form; the
 * remaining columns are left to the database.
 *
 * @param sourceId Identifier of the node the edge starts at.
 * @param targetId Identifier of the node the edge points to.
 * @param data Payload stored on the edge.
 */
export async function createEdge<T>(sourceId: unknown, targetId: unknown, data: T): Promise<void> {
  const source = JSON.stringify(sourceId);
  const target = JSON.stringify(targetId);
  const payload = JSON.stringify(data);
  await sql`INSERT INTO edges (source_id, target_id, data) VALUES (${source}, ${target}, ${payload})`;
}
/**
 * Returns the current edges that start at the given node.
 *
 * Only rows flagged `is_latest = TRUE` are selected, matching `getEdges` and
 * `getNode`. Without the filter, superseded versions and edges retired through
 * `updateEdges` would be returned as well.
 *
 * @param sourceId Identifier of the source node; compared in its
 *   `JSON.stringify` form.
 * @returns The matching rows cast to `Edge<T>`.
 */
export async function getOutgoingEdges<T extends Identified>(
  sourceId: unknown
): Promise<Array<Edge<T>>> {
  const result =
    await sql`SELECT * FROM edges WHERE source_id = ${JSON.stringify(sourceId)} AND is_latest = TRUE`;
  return result.rows as Array<Edge<T>>;
}
/**
 * Returns the current edges that point at the given node.
 *
 * Only rows flagged `is_latest = TRUE` are selected, matching `getEdges` and
 * `getNode`. Without the filter, superseded versions and edges retired through
 * `updateEdges` would be returned as well.
 *
 * @param targetId Identifier of the target node; compared in its
 *   `JSON.stringify` form.
 * @returns The matching rows cast to `Edge<T>`.
 */
export async function getIncomingEdges<T extends Identified>(
  targetId: unknown
): Promise<Array<Edge<T>>> {
  const result =
    await sql`SELECT * FROM edges WHERE target_id = ${JSON.stringify(targetId)} AND is_latest = TRUE`;
  return result.rows as Array<Edge<T>>;
}
/**
 * Fetches the latest edges touching a node in both directions.
 *
 * Both SELECTs are issued on one dedicated connection between `BEGIN` and
 * `COMMIT`; on any failure `ROLLBACK` is sent and the error is rethrown. The
 * connection is released in every case.
 *
 * @param nodeId Identifier of the node; compared in its `JSON.stringify` form.
 * @returns The rows where the node is the target (`incoming`) and the rows
 *   where it is the source (`outgoing`), each cast to `Edge<T>`.
 */
export async function getEdges<T extends Identified>(
  nodeId: unknown
): Promise<{incoming: Array<Edge<T>>; outgoing: Array<Edge<T>>}> {
  const connection = await sql.connect();
  try {
    await connection.sql`BEGIN`;
    const {rows: outgoingRows} =
      await connection.sql`SELECT * FROM edges WHERE source_id = ${JSON.stringify(nodeId)} AND is_latest = TRUE`;
    const {rows: incomingRows} =
      await connection.sql`SELECT * FROM edges WHERE target_id = ${JSON.stringify(nodeId)} AND is_latest = TRUE`;
    await connection.sql`COMMIT`;

    const incoming = incomingRows as Array<Edge<T>>;
    const outgoing = outgoingRows as Array<Edge<T>>;
    return {incoming, outgoing};
  } catch (error) {
    await connection.sql`ROLLBACK`;
    throw error;
  } finally {
    await connection.release();
  }
}
/**
 * Decision returned by an edge updater, discriminated by `operation`:
 * `"update"` carries the replacement payload in `value`, while `"delete"` and
 * `"ignore"` carry nothing else.
 */
type UpdateResult<T> =
  | {operation: "update"; value: T}
  | {operation: "delete"}
  | {operation: "ignore"};
/**
 * Applies `updater` to the payload of every current edge between two nodes and
 * persists the decisions inside a single transaction.
 *
 * For each edge flagged as latest between `sourceId` and `targetId`, `updater`
 * decides what happens:
 * - `"ignore"`: the edge is left untouched;
 * - `"delete"`: the edge loses its latest flag (soft delete);
 * - `"update"`: the edge loses its latest flag and a new row holding the
 *   returned payload is inserted with the version incremented by one.
 *
 * All decisions are collected before any row is written.
 *
 * @param sourceId Identifier of the source node; compared in its
 *   `JSON.stringify` form.
 * @param targetId Identifier of the target node; compared in its
 *   `JSON.stringify` form.
 * @param updater Called once per current edge with that edge's payload.
 * @throws Rethrows any error raised by a statement, by `updater`, or by the id
 *   invariant, after issuing `ROLLBACK`.
 */
export async function updateEdges<T extends Identified, U extends Identified>(
  sourceId: unknown,
  targetId: unknown,
  updater: (oldEdge: T) => UpdateResult<U>
): Promise<void> {
  const client = await sql.connect();
  try {
    await client.sql`BEGIN`;
    // Serialized once and reused for every statement below. The `source_id` and
    // `target_id` values read back from a row are already serialized text, so
    // passing them through JSON.stringify again would encode them twice and the
    // WHERE clauses would no longer match the rows selected here.
    const source = JSON.stringify(sourceId);
    const target = JSON.stringify(targetId);
    const oldEdges =
      await client.sql`SELECT * FROM edges WHERE source_id = ${source} AND target_id = ${target} AND is_latest = TRUE`;

    const edgesToDelete: Array<Edge<T>> = [];
    const edgesToUpdate: Array<{oldEdge: Edge<T>; newData: U}> = [];
    for (const row of oldEdges.rows) {
      const oldEdge = row as Edge<T>;
      const result = updater(oldEdge.data);
      switch (result.operation) {
        case "ignore":
          break;
        case "delete":
          edgesToDelete.push(oldEdge);
          break;
        case "update":
          edgesToUpdate.push({oldEdge, newData: result.value});
          break;
        default:
          assertNever(result);
      }
    }

    // Clears the latest flag on exactly one stored edge version. The id is taken
    // from the payload and serialized the same way as in the other queries of
    // this module, rather than re-encoding the row's `id` column.
    const retire = (edge: Edge<T>) =>
      client.sql`UPDATE edges SET is_latest = FALSE WHERE source_id = ${source} AND target_id = ${target} AND version = ${
        edge.version
      } AND id = ${JSON.stringify(edge.data.id)}`;

    for (const edge of edgesToDelete) {
      await retire(edge);
    }
    for (const {oldEdge, newData} of edgesToUpdate) {
      // Compare payload id with payload id: the row's `id` column holds the
      // serialized form and would never be strictly equal to the raw value.
      invariant(
        oldEdge.data.id === newData.id,
        "The id of the new data must match the id of the old edge"
      );
      await retire(oldEdge);
      // `id` is a GENERATED ALWAYS column in the schema and cannot be given a
      // value in an INSERT; it is computed from `data`.
      await client.sql`INSERT INTO edges (source_id, target_id, version, is_latest, data) VALUES (${source}, ${target}, ${
        oldEdge.version + 1
      }, TRUE, ${JSON.stringify(newData)})`;
    }
    await client.sql`COMMIT`;
  } catch (e) {
    await client.sql`ROLLBACK`;
    throw e;
  } finally {
    await client.release();
  }
}
-- Every version of a node is stored as its own row. `id` is derived from the
-- JSON payload, `version` numbers the revisions, and `is_latest` flags the row
-- that the queries treat as current.
CREATE TABLE IF NOT EXISTS
nodes (
  data JSONB,
  id TEXT GENERATED ALWAYS AS (data -> 'id') STORED NOT NULL,
  version INTEGER NOT NULL DEFAULT 0,
  is_latest BOOLEAN NOT NULL DEFAULT TRUE,
  UNIQUE (id, version)
);
-- At most one current row per node. A partial unique index is used instead of
-- UNIQUE (id, is_latest): that constraint would also allow only ONE superseded
-- row per id, so retiring a second version of any node would violate it.
CREATE UNIQUE INDEX IF NOT EXISTS nodes_single_latest_idx ON nodes (id) WHERE is_latest;
CREATE INDEX IF NOT EXISTS latest_version_idx ON nodes (id, is_latest DESC);
-- Every version of an edge is stored as its own row, keyed by its endpoints,
-- the id derived from the JSON payload, and the version number.
CREATE TABLE IF NOT EXISTS
edges (
  source_id TEXT,
  target_id TEXT,
  data JSONB,
  id TEXT GENERATED ALWAYS AS (data -> 'id') STORED NOT NULL,
  version INTEGER NOT NULL DEFAULT 0,
  is_latest BOOLEAN NOT NULL DEFAULT TRUE,
  UNIQUE (source_id, target_id, id, version)
  -- No FOREIGN KEY to nodes (id): a foreign key must reference a primary key or
  -- a non-partial unique constraint, and nodes.id is not unique on its own
  -- because nodes keeps one row per version. Declaring the reference makes this
  -- CREATE TABLE fail, so endpoint existence has to be checked by the caller.
);
CREATE INDEX IF NOT EXISTS source_id_idx ON edges (source_id);
CREATE INDEX IF NOT EXISTS target_id_idx ON edges (target_id);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment