Skip to content

Instantly share code, notes, and snippets.

@samwillis
Last active December 20, 2023 23:11
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save samwillis/1465da23194d1ad480a5548458864077 to your computer and use it in GitHub Desktop.
Save samwillis/1465da23194d1ad480a5548458864077 to your computer and use it in GitHub Desktop.
(Alpha) PouchDB integration for Yjs
import * as Y from 'yjs'
import * as mutex from 'lib0/mutex.js'
import { Observable } from 'lib0/observable.js'
import PouchDB from 'pouchdb';
/**
 * Default name of the top-level Y.Map whose JSON form is spread into the main PouchDB
 * document.
 */
const topDataYMapName = 'data';
/**
 * Default top-level key set to `true` on every PouchDB document managed by Yjs. By
 * indexing it we can find and watch for all changes to these documents and auto-merge
 * any conflicts.
 */
const managedByYIndicatorKey = 'managed_by_Y';
/**
 * How many recently written/read revision ids a YPouchDoc remembers, so that revisions
 * it already knows about (including its own writes) are not reloaded.
 */
const recentRevsToStore = 20;
// TODO: Global change/update manager
// We need a global change/update manager to handle PouchDB conflicts when a document
// is not open.
/** A PouchDB document as read from or written to the database. */
export type PouchDoc = PouchDB.Core.Document<any>; // TODO: Proper type
/** Options accepted by the `YPouchDoc` constructor. Every field is optional. */
export interface YPouchDocOptions {
  // --- What to bind ---
  /**
   * Existing Y.Doc to bind. When omitted, a new Y.Doc is created and destroyed
   * together with the YPouchDoc.
   */
  ydoc?: Y.Doc;
  /**
   * Already-fetched PouchDB document to load instead of reading it from the database.
   * Its `ydoc` attachment data is read directly, so presumably it must have been
   * fetched with attachments inlined — verify with callers.
   */
  pdoc?: PouchDoc;

  // --- Timing ---
  /** Debounce in ms before local changes are persisted (default 1000). */
  storeTimeout?: number;
  /** Debounce in ms before state is reloaded after a change notification. */
  loadTimeout?: number;

  // --- Behaviour switches ---
  /** Automatically store updates after `storeTimeout` (default true). */
  autoStore?: boolean;
  /** Start the live changes feed once the initial load completes (default true). */
  autoUpdate?: boolean;

  // --- Stored document layout ---
  /** Name of the top-level Y.Map serialised into the PouchDB document (default 'data'). */
  topDataYMapName?: string;
  /** Top-level key set to `true` on stored documents (default 'managed_by_Y'). */
  managedByYIndicatorKey?: string;
  /**
   * Called with the Y.Doc on each store; the returned object's properties are merged
   * into the stored document.
   */
  extractorFunction?: Function;
  /** Written to the stored document as `extractor_version`. */
  extractorVersion?: number;
}
/**
 * Keeps a Y.Doc in sync with a single PouchDB document.
 *
 * The full Yjs state is stored as the `ydoc` attachment of the PouchDB document. Next to
 * it sit the JSON form of the top-level Y.Map named by `topDataYMapName` and any fields
 * returned by `extractorFunction`, so the stored document also carries plain JSON data.
 * Local Y.Doc updates are debounced and stored. Revisions written elsewhere are picked up
 * through a live changes feed and applied to the Y.Doc. PouchDB conflicts are resolved by
 * applying every conflicting revision's Yjs state, then deleting those revisions after
 * the merged state has been stored.
 */
export class YPouchDoc extends Observable<{}> {
  /** PouchDB database holding the document. */
  public db: PouchDB.Database;
  /** `_id` of the PouchDB document. */
  public docId: string;
  /** Y.Doc whose state is persisted. */
  public ydoc: Y.Doc;
  /** True when this instance created `ydoc` itself; it is then destroyed in `destroy()`. */
  public ydocCreated: boolean;
  /** Document passed via options; loaded instead of fetching on construction. */
  public pdoc: PouchDoc;
  /** Debounce delay (ms) before local updates are stored. */
  public storeTimeout: number;
  /** Debounce delay (ms) before reloading after a change notification. */
  public loadTimeout: number;
  /** Whether local updates are stored automatically. */
  public autoStore: boolean;
  /** Whether the live changes feed is started after the initial load. */
  public autoUpdate: boolean;
  public topDataYMapName: string;
  public managedByYIndicatorKey: string;
  /** Top-level Y.Map whose JSON form is written into the PouchDB document. */
  public data: Y.Map<any>;
  /** Set at the end of a `loadState` that found the document; false until then. */
  public loaded: boolean;
  /** `_rev` sent with the next `put`; empty until a revision is known. */
  public currentRev: string;
  /** Resolves with this instance once the initial `loadState` has finished. */
  public waitLoaded: Promise<YPouchDoc>;
  public extractorFunction: Function | null;
  public extractorVersion: number | null;
  /** Conflicting revisions already applied to `ydoc`; deleted after the next store. */
  private _mergedConflicts: Set<string>;
  /** Revisions recently written or read by this instance, newest first (bounded). */
  private _recentRevs: Array<string>;
  private _mux: ReturnType<typeof mutex.createMutex>;
  private _storeTimeoutId: ReturnType<typeof setTimeout> | null;
  private _loadTimeoutId: ReturnType<typeof setTimeout> | null;
  private _cancelLiveUpdate: (() => void) | null;
  private _storeUpdate: (update: Uint8Array, origin: any, doc: Y.Doc, transaction: Y.Transaction) => void;

  /**
   * @param db PouchDB database to store the document in.
   * @param docId `_id` of the PouchDB document.
   * @param options See `YPouchDocOptions`.
   */
  constructor (db: PouchDB.Database, docId: string, options?: YPouchDocOptions) {
    super();
    this.db = db;
    this.docId = docId;
    this.ydoc = options?.ydoc || new Y.Doc();
    this.ydocCreated = !options?.ydoc;
    this.pdoc = options?.pdoc;
    // Automatically store updates after the storeTimeout.
    this.autoStore = options?.autoStore !== false;
    // `??` rather than `||` so an explicit 0 is honoured.
    this.storeTimeout = options?.storeTimeout ?? 1000;
    this.loadTimeout = options?.loadTimeout ?? 500;
    // Auto start live state update once the initial load has finished.
    this.autoUpdate = options?.autoUpdate !== false;
    this.extractorFunction = options?.extractorFunction || null;
    this.extractorVersion = options?.extractorVersion ?? null;
    this.topDataYMapName = options?.topDataYMapName || topDataYMapName;
    this.managedByYIndicatorKey = options?.managedByYIndicatorKey || managedByYIndicatorKey;
    this.data = this.ydoc.getMap(this.topDataYMapName);
    this.loaded = false;
    this.currentRev = '';
    this._mergedConflicts = new Set();
    this._recentRevs = [];
    this._mux = mutex.createMutex();
    this._storeTimeoutId = null;
    this._loadTimeoutId = null;
    this._cancelLiveUpdate = null;
    this._storeUpdate = (_update, origin, _doc, transaction) => {
      // Skip updates we applied from remote revisions: they are already stored.
      if (origin === YPouchDoc && !transaction.local) return;
      if (!this.autoStore) return;
      this._mux(() => {
        let timeout = this.storeTimeout;
        if (this._mergedConflicts.size > 0 && origin === YPouchDoc && transaction.local) {
          // We merged a conflict that we lost: save immediately (timeout = 0).
          if (this._storeTimeoutId !== null) {
            clearTimeout(this._storeTimeoutId);
            this._storeTimeoutId = null;
          }
          timeout = 0;
        }
        // A store is already scheduled; it will include this update.
        if (this._storeTimeoutId !== null) return;
        this._storeTimeoutId = setTimeout(() => {
          // Clear before storing so that updates made while the `put` is in flight
          // schedule another store instead of being skipped.
          this._storeTimeoutId = null;
          this.storeState().catch((err) => {
            console.error('YPouchDoc: failed to store state', err);
          });
        }, timeout);
      });
    };
    this.ydoc.on('update', this._storeUpdate);
    this.waitLoaded = this.loadState(this.pdoc).then(() => {
      if (this.autoUpdate) {
        void this.startLiveUpdate();
      }
      return this;
    });
    this.destroy = this.destroy.bind(this);
    this.ydoc.on('destroy', this.destroy);
  }

  /**
   * Writes the current Y.Doc state to PouchDB.
   *
   * The stored document contains:
   * - the JSON form of the top-level data map;
   * - any fields returned by `extractorFunction`;
   * - `extractor_version`;
   * - the managed-by-Yjs marker;
   * - the encoded Yjs state as the `ydoc` attachment.
   *
   * On a PouchDB `conflict` the latest revision is merged in via `loadState` and the
   * store is retried. Merged conflict revisions are deleted only after a successful
   * store, so nothing is lost.
   *
   * @throws Any PouchDB error other than `conflict`.
   */
  async storeState (): Promise<void> {
    // TODO: Check/Add Node support (relies on a global `btoa`).
    // TODO: Store a Blob once https://github.com/pouchdb/pouchdb/issues/8257 is fixed,
    // it is quicker: new Blob([Y.encodeStateAsUpdate(this.ydoc)])
    const blob = YPouchDoc._toBase64(Y.encodeStateAsUpdate(this.ydoc));
    const extracted = this.extractorFunction ? this.extractorFunction(this.ydoc) : {};
    const pdoc: Record<string, any> = {
      ...this.ydoc.getMap(this.topDataYMapName).toJSON(),
      ...extracted,
      'extractor_version': this.extractorVersion,
      '_id': this.docId,
      '_attachments': {
        'ydoc': {
          'content_type': 'application/octet-stream',
          'data': blob,
        },
      },
      [this.managedByYIndicatorKey]: true,
    };
    if (this.currentRev) {
      // Not set on new docs.
      pdoc._rev = this.currentRev;
    }
    try {
      const response = await this.db.put(pdoc);
      this.currentRev = response.rev;
      this._rememberRev(response.rev);
    } catch (err) {
      if (YPouchDoc._errorName(err) !== 'conflict') throw err;
      // Another write won: merge the latest revision into the Y.Doc and retry.
      await this.loadState();
      return await this.storeState();
    }
    if (this._mergedConflicts.size > 0) {
      await this.deleteMergedConflicts();
    }
  }

  /**
   * Deletes every revision in `_mergedConflicts` and forgets it.
   *
   * @throws Any PouchDB error other than `conflict`.
   */
  async deleteMergedConflicts (): Promise<void> {
    for (const conflictRev of this._mergedConflicts) {
      try {
        const response = await this.db.remove(this.docId, conflictRev);
        this._rememberRev(response.rev);
      } catch (err) {
        // A `conflict` presumably means the revision has already been deleted,
        // e.g. by another instance that merged it first.
        if (YPouchDoc._errorName(err) !== 'conflict') throw err;
      }
      // Deleting the current element while iterating a Set is safe in JS.
      this._mergedConflicts.delete(conflictRev);
    }
  }

  /**
   * Loads the document from PouchDB and applies any unseen Yjs state to the Y.Doc.
   *
   * Revisions already in `_recentRevs` are skipped. Every conflicting revision is
   * recorded in `_mergedConflicts`, to be deleted after the next store. If a
   * conflicting revision is one this instance knows (we lost the conflict), the merge
   * transaction is marked local. That makes the update handler store it without
   * waiting for `storeTimeout`.
   *
   * @param pdoc Optional already-fetched document used instead of `db.get`; its
   *   `ydoc` attachment data is read directly.
   * @throws Any PouchDB error other than `not_found` for the main document.
   */
  async loadState (pdoc?: PouchDoc): Promise<void> {
    const docUpdates: Uint8Array[] = [];
    let isLocal = false;
    if (!pdoc) {
      try {
        pdoc = await this.db.get(this.docId, {
          conflicts: true,
          attachments: true,
          binary: true,
        });
      } catch (err) {
        // Nothing stored yet: leave the Y.Doc untouched.
        if (YPouchDoc._errorName(err) === 'not_found') return;
        throw err;
      }
    }
    if (!this._recentRevs.includes(pdoc._rev)) {
      const blob = pdoc._attachments.ydoc.data;
      // TODO: Check/Add Node support
      docUpdates.push(new Uint8Array(await new Response(blob).arrayBuffer()));
      this._recentRevs.unshift(pdoc._rev);
      this.currentRev = pdoc._rev;
    }
    // TODO: parallelize these requests.
    // TODO: if we won a conflict, schedule a check (say 10 seconds later) that it has
    // been resolved, e.g. that the merged conflicts have been deleted.
    for (const conflictRev of pdoc._conflicts ?? []) {
      const lostConflict = this._recentRevs.includes(conflictRev);
      if (lostConflict) {
        // One of our own revisions lost: force a prompt save of the merge.
        isLocal = true;
      } else {
        let conflictPdoc;
        try {
          conflictPdoc = await this.db.get(this.docId, {
            rev: conflictRev,
            attachments: true,
            binary: true,
          });
        } catch (err) {
          // The revision may already be gone because another instance/user merged
          // and deleted it; PouchDB reports that as `not_found`.
          const name = YPouchDoc._errorName(err);
          if (name !== 'not_found' && name !== 'conflict') throw err;
        }
        const attachment = conflictPdoc?._attachments?.ydoc;
        if (conflictPdoc && attachment && 'data' in attachment) {
          docUpdates.push(new Uint8Array(await new Response(attachment.data).arrayBuffer()));
          this._recentRevs.unshift(conflictPdoc._rev);
        }
      }
      // Store merged conflict revs so they can be removed after the next state save.
      this._mergedConflicts.add(conflictRev);
    }
    if (docUpdates.length > 0) {
      Y.transact(this.ydoc, () => {
        for (const docUpdate of docUpdates) {
          Y.applyUpdate(this.ydoc, docUpdate);
        }
      }, YPouchDoc, isLocal);
    }
    this._recentRevs.length = Math.min(this._recentRevs.length, recentRevsToStore);
    this.loaded = true;
  }

  /**
   * Starts a live PouchDB changes feed for this document. Unseen revisions trigger a
   * debounced `loadState`. Any feed already running for this instance is stopped first.
   */
  async startLiveUpdate (): Promise<void> {
    this.stopLiveUpdate();
    const changes = this.db.changes({
      live: true,
      doc_ids: [this.docId], // eslint-disable-line @typescript-eslint/camelcase
      return_docs: false, // eslint-disable-line @typescript-eslint/camelcase
    }).on('change', (change) => {
      const hasUnseenRev = change.changes.some(({ rev }) => !this._recentRevs.includes(rev));
      // A reload is already scheduled; it will fetch the latest revision.
      if (!hasUnseenRev || this._loadTimeoutId !== null) return;
      this._loadTimeoutId = setTimeout(() => {
        // Clear before loading so that changes arriving mid-load schedule another load.
        this._loadTimeoutId = null;
        this.loadState().catch((err) => {
          console.error('YPouchDoc: failed to load state', err);
        });
      }, this.loadTimeout);
    }).on('error', (err) => {
      console.error('YPouchDoc: live changes feed failed', err);
    });
    this._cancelLiveUpdate = () => {
      changes.cancel();
    };
  }

  /** Stops the live changes feed, if one is running. */
  stopLiveUpdate (): void {
    if (this._cancelLiveUpdate) {
      this._cancelLiveUpdate();
      this._cancelLiveUpdate = null;
    }
  }

  /**
   * Detaches from the Y.Doc and stops syncing. A pending debounced store is flushed
   * immediately and a pending reload is cancelled. `ydoc` is destroyed only if this
   * instance created it.
   */
  destroy (): void {
    this.stopLiveUpdate();
    this.ydoc.off('update', this._storeUpdate);
    if (this._loadTimeoutId !== null) {
      clearTimeout(this._loadTimeoutId);
      this._loadTimeoutId = null;
    }
    if (this._storeTimeoutId !== null) {
      // Clear the timeout and store immediately. storeState encodes the Y.Doc
      // synchronously before its first await, i.e. before the ydoc.destroy() below.
      clearTimeout(this._storeTimeoutId);
      this._storeTimeoutId = null;
      this.storeState().catch((err) => {
        console.error('YPouchDoc: failed to store state on destroy', err);
      });
    }
    this.ydoc.off('destroy', this.destroy);
    if (this.ydocCreated) {
      this.ydoc.destroy();
    }
    super.destroy();
  }

  /** Records a revision as seen (newest first), keeping at most `recentRevsToStore`. */
  private _rememberRev (rev: string): void {
    this._recentRevs.unshift(rev);
    this._recentRevs.length = Math.min(this._recentRevs.length, recentRevsToStore);
  }

  /**
   * Base64-encodes bytes in chunks. Passing a large update to a single
   * `String.fromCharCode` call exceeds the engine's argument-count limit.
   */
  private static _toBase64 (bytes: Uint8Array): string {
    const chunkSize = 0x8000;
    let binary = '';
    for (let i = 0; i < bytes.length; i += chunkSize) {
      binary += String.fromCharCode(...bytes.subarray(i, i + chunkSize));
    }
    return btoa(binary);
  }

  /** Returns the `name` property of a caught PouchDB error, if it has one. */
  private static _errorName (err: unknown): unknown {
    return typeof err === 'object' && err !== null
      ? (err as { name?: unknown }).name
      : undefined;
  }
}
// Needed for use with TipTap; it should not be required once this is resolved:
// https://github.com/ueberdosis/tiptap-next/issues/70
/**
 * Minimal provider-shaped wrapper around a YPouchDoc. It exposes the wrapped Y.Doc
 * as `doc` for code that expects a provider object.
 */
export class PouchDbProvider extends Observable<{}> {
  /** The wrapped YPouchDoc. */
  public ypDoc: YPouchDoc;
  /** The Y.Doc owned by `ypDoc`. */
  public doc: Y.Doc;

  constructor (ypDoc: YPouchDoc) {
    super();
    const { ydoc } = ypDoc;
    this.ypDoc = ypDoc;
    this.doc = ydoc;
  }

  /** Deliberately does nothing: the YPouchDoc's lifecycle is managed elsewhere. */
  destroy (): void {
    // Noop
  }
}
@Sjoerd82
Copy link

Hi Sam,

Is this the latest version? You mentioned (on yjs.dev) that it's missing centralised conflict handling for documents that are not currently open. Has this been handled by now? Also, I'm not entirely sure I understand what it means — what can go wrong (or what functionality are we missing out on) without this being handled?

rgds,
Sjoerd

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment