|
import type * as Party from "partykit/server"; |
|
|
|
import { |
|
createCustomPersister, |
|
createMergeableStore, |
|
type Content, |
|
type MergeableStore, |
|
type PersisterListener, |
|
type Persists, |
|
} from "tinybase"; |
|
import type { IdOrNull } from "tinybase/common"; |
|
import { |
|
createCustomSynchronizer, |
|
type Synchronizer, |
|
type Message, |
|
type Receive, |
|
} from "tinybase/synchronizers"; |
|
export const MESSAGE_SEPARATOR = "\n"; |
|
export const UTF8 = "utf8"; |
|
export const UNDEFINED = "\uFFFC"; |
|
|
|
export const jsonStringWithUndefined = (obj: unknown): string => |
|
JSON.stringify(obj, (_key, value) => |
|
value === undefined ? UNDEFINED : value |
|
); |
|
|
|
// biome-ignore lint/suspicious/noExplicitAny: <explanation> |
|
export const jsonParseWithUndefined = (str: string): any => |
|
JSON.parse(str, (_key, value) => (value === UNDEFINED ? undefined : value)); |
|
|
|
const serverClientId = "server"; |
|
|
|
export class TinyBaseSynchronizedPartyKitServer implements Party.Server { |
|
synchronizerSend = ( |
|
toClientId: IdOrNull, |
|
// biome-ignore lint/suspicious/noExplicitAny: <explanation> |
|
...args: [requestId: IdOrNull, message: Message, body: any] |
|
) => { |
|
if (toClientId) { |
|
this.room |
|
.getConnection(toClientId) |
|
?.send( |
|
serverClientId + MESSAGE_SEPARATOR + jsonStringWithUndefined(args) |
|
); |
|
} else { |
|
this.room.broadcast( |
|
serverClientId + MESSAGE_SEPARATOR + jsonStringWithUndefined(args) |
|
); |
|
} |
|
}; |
|
|
|
receiveListeners: Receive[] = []; |
|
synchronizerRegisterReceive = (receive: Receive): void => { |
|
this.receiveListeners.push(receive); |
|
}; |
|
synchronizerDestroy = () => { |
|
this.receiveListeners = []; |
|
}; |
|
synchronizerRequestTimeoutSeconds = 1; |
|
|
|
persistListeners = new Set<() => void>(); |
|
persisterGet = async () => (await this.room.storage.get("store")) as Content; |
|
|
|
persisterSet = async () => { |
|
this.room.storage.put("store", this.store.getMergeableContent()); |
|
}; |
|
persisterAddListener = (listener: PersisterListener<Persists.StoreOnly>) => { |
|
this.persistListeners.add(listener); |
|
return () => this.persistListeners.delete(listener); |
|
}; |
|
persisterDelListener = (listener: () => void) => { |
|
this.persistListeners.delete(listener); |
|
}; |
|
|
|
store: MergeableStore; |
|
constructor(readonly room: Party.Room) { |
|
this.store = createMergeableStore(); |
|
|
|
this.init(this.store); |
|
} |
|
|
|
synchronizer?: Synchronizer; |
|
async init(store: MergeableStore) { |
|
const persister = createCustomPersister( |
|
store, |
|
this.persisterGet, |
|
this.persisterSet, |
|
this.persisterAddListener, |
|
this.persisterDelListener |
|
); |
|
await persister.load(); |
|
await persister.startAutoSave(); |
|
|
|
const synchronizer = createCustomSynchronizer( |
|
store, |
|
this.synchronizerSend, |
|
this.synchronizerRegisterReceive, |
|
this.synchronizerDestroy, |
|
this.synchronizerRequestTimeoutSeconds |
|
); |
|
await synchronizer.startSync(); |
|
this.synchronizer = synchronizer; |
|
} |
|
|
|
onMessage(payload: string, sender: Party.Connection) { |
|
const splitAt = payload.indexOf(MESSAGE_SEPARATOR); |
|
const clientId = sender.id; |
|
|
|
if (splitAt !== -1) { |
|
const toClientId = payload.slice(0, splitAt); |
|
const message = payload.slice(splitAt + 1); |
|
const messagePayload = jsonParseWithUndefined(message) as [ |
|
requestId: IdOrNull, |
|
message: Message, |
|
// biome-ignore lint/suspicious/noExplicitAny: <explanation> |
|
body: any, |
|
]; |
|
if (toClientId === "") { |
|
this.room.broadcast(clientId + MESSAGE_SEPARATOR + message, [clientId]); |
|
} |
|
if (toClientId === "" || toClientId === serverClientId) { |
|
for (const listener of this.receiveListeners) { |
|
listener(clientId, ...messagePayload); |
|
} |
|
} else { |
|
this.room |
|
.getConnection(toClientId) |
|
?.send(clientId + MESSAGE_SEPARATOR + message); |
|
} |
|
} |
|
} |
|
} |