Skip to content

Instantly share code, notes, and snippets.

@alexanderson1993
Created July 17, 2024 23:58
Show Gist options
  • Save alexanderson1993/b9c091b5465face999a374789e90c9d5 to your computer and use it in GitHub Desktop.
Save alexanderson1993/b9c091b5465face999a374789e90c9d5 to your computer and use it in GitHub Desktop.
TinyBase PartyKit Client

TinyBase PartyKit Synchronizer Client

TinyBase 5.0 introduced MergeableStore to reconcile conflicts between two stores. It also introduced Synchronizers, specifically WsSynchronizer to synchronize stores between devices, using a server as a relay.

This is awesome, but comes with a big drawback: If device A updates some state and goes offline, device B won't see those state updates until device A comes back online. The synchronizer-ws-server implementation only relays data, it doesn't store it.

TinyBaseSynchronizedPartyKitServer.ts aims to fix that by treating a PartyKit room as a client unto itself. It still relays messages between devices, but it syncs and persists its own store to the room's storage. This maintains the current state, allowing devices to access that state whenever they come online and connect without needing another online device to relay that state to them.

Debug.ts and partykit.json show how this might be implemented in a PartyKit project. Debug.ts includes a request handler that dumps the current storage value for debugging purposes when you navigate to http://localhost:1999/parties/debug/home.

Enjoy!

import { TinyBaseSynchronizedPartyKitServer } from "@/party/TinyBaseSynchronizedPartyKitServer.ts";
import type * as Party from "partykit/server";
/**
 * Debug party built on top of TinyBaseSynchronizedPartyKitServer.
 *
 * In addition to the inherited behaviour it logs every new websocket
 * connection and answers HTTP requests with a JSON dump of whatever is
 * currently saved under the "store" key of the room's storage.
 */
export default class Server
  extends TinyBaseSynchronizedPartyKitServer
  implements Party.Server
{
  constructor(readonly room: Party.Room) {
    super(room);
  }

  /**
   * Logs the connection id, the room id and the request path of a websocket
   * that has just connected.
   */
  onConnect(conn: Party.Connection, ctx: Party.ConnectionContext) {
    // One entry per output line; joined with "\n" to form the log message.
    const summary = [
      "Connected:",
      `id: ${conn.id}`,
      `room: ${this.room.id}`,
      `url: ${new URL(ctx.request.url).pathname}`,
    ].join("\n");
    console.log(summary);
  }

  /**
   * Responds to any HTTP request with `{ "data": <stored value> }`, where the
   * stored value is read from the room storage key "store".
   */
  async onRequest(req: Party.Request): Promise<Response> {
    const stored = await this.room.storage.get("store");
    const body = JSON.stringify({ data: stored });
    const headers = { "Content-Type": "application/json" };
    return new Response(body, { headers });
  }
}

// Compile-time check only: the class must be usable as a PartyKit worker.
Server satisfies Party.Worker;
{
"$schema": "https://www.partykit.io/schema.json",
"name": "tinybase-partykit",
"main": "party/Main.ts",
"parties": {
"debug": "party/Debug.ts"
},
"compatibilityDate": "2024-07-13"
}
import type * as Party from "partykit/server";
import {
createCustomPersister,
createMergeableStore,
type Content,
type MergeableStore,
type PersisterListener,
type Persists,
} from "tinybase";
import type { IdOrNull } from "tinybase/common";
import {
createCustomSynchronizer,
type Synchronizer,
type Message,
type Receive,
} from "tinybase/synchronizers";
/** Separates the client-id header from the JSON body in every frame. */
export const MESSAGE_SEPARATOR = "\n";
/** Encoding name; not referenced by the code visible in this file. */
export const UTF8 = "utf8";
/** Sentinel string that stands in for `undefined` inside serialized JSON. */
export const UNDEFINED = "\uFFFC";

/**
 * Serializes `obj` to JSON, writing the UNDEFINED sentinel wherever a value
 * is `undefined` (plain JSON.stringify would drop or null such values).
 */
export const jsonStringWithUndefined = (obj: unknown): string => {
  const replacer = (_key: string, value: unknown): unknown => {
    if (value === undefined) {
      return UNDEFINED;
    }
    return value;
  };
  return JSON.stringify(obj, replacer);
};

/**
 * Parses JSON produced by jsonStringWithUndefined, turning every UNDEFINED
 * sentinel back into `undefined`.
 */
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
export const jsonParseWithUndefined = (str: string): any => {
  const reviver = (_key: string, value: unknown): unknown => {
    if (value === UNDEFINED) {
      return undefined;
    }
    return value;
  };
  return JSON.parse(str, reviver);
};

/** Sender id placed in the header of frames that originate from the room. */
const serverClientId = "server";
/**
 * PartyKit server that relays TinyBase synchronizer traffic between the
 * connections of a room and also takes part in the sync itself: it owns a
 * MergeableStore, persists it under the room-storage key "store", and
 * exchanges sync messages with clients under the id "server".
 *
 * Wire format of every frame: `<client id>` + MESSAGE_SEPARATOR + JSON of
 * `[requestId, message, body]`. Inbound frames carry the *recipient* id in
 * the header ("" means everyone); outbound frames carry the *sender* id.
 */
export class TinyBaseSynchronizedPartyKitServer implements Party.Server {
  /**
   * Synchronizer "send" hook: delivers a message from the room's own store to
   * one connection (`toClientId` set) or to every connection (`toClientId`
   * null/empty).
   */
  synchronizerSend = (
    toClientId: IdOrNull,
    // biome-ignore lint/suspicious/noExplicitAny: <explanation>
    ...args: [requestId: IdOrNull, message: Message, body: any]
  ) => {
    const frame =
      serverClientId + MESSAGE_SEPARATOR + jsonStringWithUndefined(args);
    if (toClientId) {
      // An unknown or already-closed connection is silently skipped.
      this.room.getConnection(toClientId)?.send(frame);
    } else {
      this.room.broadcast(frame);
    }
  };

  /** Callbacks that receive client messages addressed to the room's store. */
  receiveListeners: Receive[] = [];

  /** Synchronizer "register receive" hook: remembers the callback. */
  synchronizerRegisterReceive = (receive: Receive): void => {
    this.receiveListeners.push(receive);
  };

  /** Synchronizer "destroy" hook: forgets every registered callback. */
  synchronizerDestroy = () => {
    this.receiveListeners = [];
  };

  /** Request timeout, in seconds, handed to createCustomSynchronizer. */
  synchronizerRequestTimeoutSeconds = 1;

  /**
   * Listeners registered by the persister. Nothing in this class invokes
   * them; they are only tracked so they can be removed again.
   */
  persistListeners = new Set<() => void>();

  /** Maps each handle returned by persisterAddListener to its listener. */
  private readonly persistListenerHandles = new Map<() => void, () => void>();

  /**
   * Persister "get" hook: reads the value saved under "store".
   * NOTE(review): persisterSet saves `getMergeableContent()` while the
   * persister is created without an explicit `persist` argument — confirm
   * that `load()` accepts that shape for a MergeableStore.
   */
  persisterGet = async () => (await this.room.storage.get("store")) as Content;

  /**
   * Persister "set" hook: saves the store's mergeable content under "store".
   * The write is awaited so the returned promise settles only once storage
   * has accepted it, and a failed write rejects here instead of surfacing as
   * an unhandled rejection.
   */
  persisterSet = async () => {
    await this.room.storage.put("store", this.store.getMergeableContent());
  };

  /**
   * Persister "add listener" hook.
   * @returns a handle that removes the listener, either when called directly
   *   or when passed to persisterDelListener.
   */
  persisterAddListener = (listener: PersisterListener<Persists.StoreOnly>) => {
    this.persistListeners.add(listener);
    const unsubscribe = (): boolean => {
      this.persistListenerHandles.delete(unsubscribe);
      return this.persistListeners.delete(listener);
    };
    this.persistListenerHandles.set(unsubscribe, listener);
    return unsubscribe;
  };

  /**
   * Persister "delete listener" hook. The persister passes back whatever
   * persisterAddListener returned (the handle, not the listener), so the
   * handle is resolved to its listener first; a raw listener is accepted too.
   */
  persisterDelListener = (listener: () => void) => {
    const registered = this.persistListenerHandles.get(listener) ?? listener;
    this.persistListenerHandles.delete(listener);
    this.persistListeners.delete(registered);
  };

  /** The room's own copy of the synchronized data. */
  store: MergeableStore;

  constructor(readonly room: Party.Room) {
    this.store = createMergeableStore();
    // init() is async and a constructor cannot await it; attach a handler so
    // a failed load or sync start is reported rather than left as an
    // unhandled promise rejection.
    this.init(this.store).catch((error: unknown) => {
      console.error(
        "TinyBaseSynchronizedPartyKitServer: initialisation failed",
        error
      );
    });
  }

  /** Set once init() has started syncing. */
  synchronizer?: Synchronizer;

  /**
   * Loads the persisted content into `store`, turns on auto-save, then
   * creates and starts the synchronizer that talks to connected clients.
   */
  async init(store: MergeableStore) {
    const persister = createCustomPersister(
      store,
      this.persisterGet,
      this.persisterSet,
      this.persisterAddListener,
      this.persisterDelListener
    );
    // Load before syncing so clients are offered the persisted state.
    await persister.load();
    await persister.startAutoSave();
    const synchronizer = createCustomSynchronizer(
      store,
      this.synchronizerSend,
      this.synchronizerRegisterReceive,
      this.synchronizerDestroy,
      this.synchronizerRequestTimeoutSeconds
    );
    await synchronizer.startSync();
    this.synchronizer = synchronizer;
  }

  /**
   * Routes a frame sent by a client:
   * - recipient "": relay to every other connection and hand it to the
   *   room's own synchronizer;
   * - recipient "server": hand it to the room's own synchronizer only;
   * - any other recipient: relay to that connection if it exists.
   *
   * Frames that are not text, have no separator, or do not contain a JSON
   * array are dropped with a warning instead of throwing inside the handler.
   */
  onMessage(payload: string, sender: Party.Connection) {
    // Presumably binary frames can reach this handler at runtime even though
    // the parameter is typed as string — they are not part of this protocol.
    if (typeof payload !== "string") {
      return;
    }
    const splitAt = payload.indexOf(MESSAGE_SEPARATOR);
    if (splitAt === -1) {
      return;
    }
    const clientId = sender.id;
    const toClientId = payload.slice(0, splitAt);
    const message = payload.slice(splitAt + MESSAGE_SEPARATOR.length);

    let parsed: unknown;
    try {
      parsed = jsonParseWithUndefined(message);
    } catch (error: unknown) {
      console.warn(
        `TinyBaseSynchronizedPartyKitServer: dropping malformed message from ${clientId}`,
        error
      );
      return;
    }
    if (!Array.isArray(parsed)) {
      console.warn(
        `TinyBaseSynchronizedPartyKitServer: dropping non-array message from ${clientId}`
      );
      return;
    }
    const messagePayload = parsed as [
      requestId: IdOrNull,
      message: Message,
      // biome-ignore lint/suspicious/noExplicitAny: <explanation>
      body: any,
    ];

    if (toClientId === "") {
      // Re-stamp the frame with the sender's id and skip the sender itself.
      this.room.broadcast(clientId + MESSAGE_SEPARATOR + message, [clientId]);
    }
    if (toClientId === "" || toClientId === serverClientId) {
      for (const listener of this.receiveListeners) {
        listener(clientId, ...messagePayload);
      }
    } else {
      this.room
        .getConnection(toClientId)
        ?.send(clientId + MESSAGE_SEPARATOR + message);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment