Skip to content

Instantly share code, notes, and snippets.

@samwillis

samwillis/y-pouchdb.ts

Last active Jan 14, 2021
Embed
What would you like to do?
(Alpha) PouchDB integration for Yjs
import * as Y from 'yjs'
import * as mutex from 'lib0/mutex.js'
import { Observable } from 'lib0/observable.js'
import PouchDB from 'pouchdb';
// This is the default name of the top-level Y.Map that is used to construct the main
// PouchDB JSON document.
const topDataYMapName = 'data';
// This is the default top-level key set on a PouchDB document to indicate that it is
// managed by Yjs. By indexing this key we can find and watch for all changes to these
// documents and auto-merge any conflicts.
const managedByYIndicatorKey = 'managed_by_Y';
// Maximum number of revision ids kept in YPouchDoc's list of recently seen revisions;
// the list is truncated to this length after revisions are added.
const recentRevsToStore = 20;
// TODO: Global Change/Update Manager
// We need a global change/update manager to handle PouchDB conflicts when a document
// is not open.
// Shape of a PouchDB document as read from / written to the database by YPouchDoc.
export type PouchDoc = PouchDB.Core.Document<any> // TODO: Proper type
// Optional settings accepted by the YPouchDoc constructor. Every field may be omitted.
export interface YPouchDocOptions {
// Existing Y.Doc to use. When omitted, the YPouchDoc creates its own Y.Doc (and
// destroys it again in destroy()).
ydoc?: Y.Doc;
// Already-fetched PouchDB document handed to the initial loadState() call, so the
// document does not have to be fetched from the database first.
pdoc?: PouchDoc;
// Debounce delay in ms between a Y.Doc update and persisting the state to PouchDB.
storeTimeout?: number;
// Intended debounce delay in ms between a change notification and reloading the state.
loadTimeout?: number;
// Intended to control whether updates are stored automatically after storeTimeout.
// Treated as true unless explicitly set to false.
autoStore?: boolean;
// Whether to start watching for live updates once the initial load has finished.
// Treated as true when omitted.
autoUpdate?: boolean;
// Name of the top-level Y.Map whose JSON forms the body of the PouchDB document
// (falls back to 'data').
topDataYMapName?: string;
// Key that is set to true on every stored PouchDB document (falls back to
// 'managed_by_Y').
managedByYIndicatorKey?: string;
// Called with the Y.Doc on every store; the object it returns is spread into the
// stored PouchDB document.
extractorFunction?: Function;
// Stored on the PouchDB document under the `extractor_version` key.
extractorVersion?: number;
}
/**
 * Binds one PouchDB document (identified by `docId`) to a Y.Doc.
 *
 * The full Yjs state is stored base64-encoded in the `ydoc` attachment of the PouchDB
 * document, next to a JSON snapshot of the top-level data Y.Map. Local Y.Doc updates
 * are written back after a debounce (`storeTimeout`); changes reported by PouchDB are
 * reloaded after a debounce (`loadTimeout`). Conflicting revisions are merged into the
 * Y.Doc on load and deleted after the merged state has been stored.
 */
export class YPouchDoc extends Observable<{}> {
  /** PouchDB database the document lives in. */
  public db: PouchDB.Database;
  /** The `_id` of the PouchDB document. */
  public docId: string;
  /** The Y.Doc holding the data. */
  public ydoc: Y.Doc;
  /** True when `ydoc` was created by this instance (and is destroyed by `destroy()`). */
  public ydocCreated: boolean;
  /** Initial PouchDB doc data passed in through the options, if any. */
  public pdoc: PouchDoc;
  /** Debounce delay in ms before the state is persisted to PouchDB. */
  public storeTimeout: number;
  /** Debounce delay in ms before the state is reloaded after a change notification. */
  public loadTimeout: number;
  /** When false, Y.Doc updates do not schedule an automatic store. */
  public autoStore: boolean;
  /** When true, live updates are started once the initial load has finished. */
  public autoUpdate: boolean;
  /** Name of the top-level Y.Map that forms the JSON body of the PouchDB document. */
  public topDataYMapName: string;
  /** Key set to `true` on every stored PouchDB document. */
  public managedByYIndicatorKey: string;
  /** The top-level data Y.Map (`ydoc.getMap(topDataYMapName)`). */
  public data: Y.Map<any>;
  /** Set to true once `loadState()` has run to completion on an existing document. */
  public loaded: boolean;
  /** Revision of the PouchDB document this instance last stored or loaded ('' if none). */
  public currentRev: string;
  /** Resolves with this instance once the initial `loadState()` call has finished. */
  public waitLoaded: Promise<YPouchDoc>;
  /** Optional function called with the Y.Doc on store; its result is merged into the doc. */
  public extractorFunction: Function | null;
  /** Value stored under `extractor_version` on the PouchDB document. */
  public extractorVersion: number | null;
  // Conflict revisions that have been merged and must be removed after the next store.
  private _mergedConflicts: Set<string>;
  // Most recently seen revisions (newest first), capped at recentRevsToStore entries.
  private _recentRevs: Array<string>;
  private _mux: Function;
  private _storeTimeoutId: ReturnType<typeof setTimeout> | null;
  private _loadTimeoutId: ReturnType<typeof setTimeout> | null;
  private _cancelLiveUpdate: (() => void) | null;
  private _storeUpdate: Function;
  // Set by destroy() so that late asynchronous continuations do not restart anything.
  private _destroyed: boolean;

  /**
   * @param db PouchDB database to read from and write to.
   * @param docId `_id` of the PouchDB document to bind.
   * @param options Optional settings, see YPouchDocOptions.
   */
  constructor (db: PouchDB.Database, docId: string, options?: YPouchDocOptions) {
    super();
    this.db = db;
    this.docId = docId;
    this.ydoc = options?.ydoc || new Y.Doc();
    this.ydocCreated = !options?.ydoc;
    this.pdoc = options?.pdoc;
    // Automatically store updates after the storeTimeout (default: true).
    this.autoStore = !(options?.autoStore === false);
    // `??` rather than `||` so that an explicit 0 is honoured.
    this.storeTimeout = options?.storeTimeout ?? 1000;
    this.loadTimeout = options?.loadTimeout ?? 500;
    // Auto start live state update after the initial load (default: true).
    this.autoUpdate = (!options || options.autoUpdate === undefined)
      || !!options.autoUpdate;
    this.extractorFunction = options?.extractorFunction || null;
    this.extractorVersion = options?.extractorVersion ?? null;
    this.topDataYMapName = options?.topDataYMapName || topDataYMapName;
    this.managedByYIndicatorKey = options?.managedByYIndicatorKey || managedByYIndicatorKey;
    this.data = this.ydoc.getMap(this.topDataYMapName);
    this.loaded = false;
    this.currentRev = '';
    this._mergedConflicts = new Set();
    this._recentRevs = [];
    this._mux = mutex.createMutex();
    this._storeTimeoutId = null;
    this._loadTimeoutId = null;
    this._cancelLiveUpdate = null;
    this._destroyed = false;

    // Y.Doc 'update' handler: debounces a call to storeState().
    this._storeUpdate = (update: Uint8Array, origin: any, doc: Y.Doc, transaction: Y.Transaction) => {
      if (!this.autoStore) return;
      // Updates applied by loadState() carry YPouchDoc as origin. They are skipped
      // unless loadState() marked the transaction local to force a save.
      if (origin === YPouchDoc && !transaction.local) return;
      this._mux(() => {
        let timeout = this.storeTimeout;
        if (this._mergedConflicts.size > 0 && origin === YPouchDoc && transaction.local) {
          // We merged a conflict that we lost: save immediately (timeout = 0).
          if (this._storeTimeoutId) {
            clearTimeout(this._storeTimeoutId);
            this._storeTimeoutId = null;
          }
          timeout = 0;
        }
        // A store is already pending or in flight; do not schedule another one.
        if (this._storeTimeoutId !== null) {
          return;
        }
        this._storeTimeoutId = setTimeout(async () => {
          try {
            await this.storeState();
          } finally {
            // Reset even when the store failed, otherwise no later update could
            // ever schedule a store again.
            this._storeTimeoutId = null;
          }
        }, timeout);
      });
    };
    this.ydoc.on('update', this._storeUpdate);

    this.waitLoaded = this.loadState(this.pdoc).then(() => {
      // Do not open a changes feed if destroy() ran while the initial load was pending.
      if (this.autoUpdate && !this._destroyed) {
        this.startLiveUpdate();
      }
      return this;
    });

    this.destroy = this.destroy.bind(this);
    this.ydoc.on('destroy', this.destroy);
  }

  /**
   * Base64-encodes a byte array.
   * The bytes are converted in chunks because passing a very large array to
   * `String.fromCharCode.apply` can exceed the engine's argument-count limit.
   */
  private static _bytesToBase64 (bytes: Uint8Array): string {
    const chunkSize = 0x8000;
    let binary = '';
    for (let offset = 0; offset < bytes.length; offset += chunkSize) {
      const chunk = bytes.subarray(offset, offset + chunkSize);
      binary += String.fromCharCode.apply(null, (chunk as unknown) as number[]);
    }
    return btoa(binary);
  }

  /**
   * Stores the current Y.Doc state to PouchDB.
   *
   * On a `conflict` error the state is reloaded (merging the remote revision) and the
   * store is retried. After a successful store, merged conflict revisions are deleted.
   *
   * @throws Any PouchDB error other than `conflict`.
   */
  async storeState(): Promise<void> {
    // TODO: Check/Add Node support
    // const blob = new Blob([Y.encodeStateAsUpdate(doc)]);
    // TODO: Found a bug in PouchDB:
    // https://github.com/pouchdb/pouchdb/issues/8257
    // When fixed use the Blob, its quicker!
    const blob = YPouchDoc._bytesToBase64(Y.encodeStateAsUpdate(this.ydoc));
    let extracted = {};
    if (this.extractorFunction) {
      extracted = this.extractorFunction(this.ydoc);
    }
    const pdoc = {
      ...this.ydoc.getMap(this.topDataYMapName).toJSON(),
      ...extracted,
      'extractor_version': this.extractorVersion,
      '_id': this.docId,
      '_attachments': {
        'ydoc': {
          "content_type": "application/octet-stream",
          "data": blob,
        }
      }
    };
    pdoc[this.managedByYIndicatorKey] = true;
    if (this.currentRev) {
      // This is not set on new docs
      pdoc._rev = this.currentRev;
    }
    try {
      const response = await this.db.put(pdoc);
      this.currentRev = response.rev;
      this._recentRevs.unshift(response.rev);
      this._recentRevs.length = Math.min(this._recentRevs.length, recentRevsToStore);
    } catch (err) {
      if (err.name === 'conflict') {
        // Someone else wrote first: merge their state, then try again. The retry
        // takes care of deleting merged conflicts itself.
        await this.loadState();
        return await this.storeState();
      }
      throw err;
    }
    // Remove merged conflicts. This is done after the state has been stored to ensure
    // we don't lose anything.
    if (this._mergedConflicts.size > 0) {
      await this.deleteMergedConflicts();
    }
  }

  /**
   * Deletes every revision recorded in the merged-conflicts set from PouchDB.
   *
   * @throws Any PouchDB error other than `conflict`.
   */
  async deleteMergedConflicts(): Promise<void> {
    for (const conflictRev of this._mergedConflicts) {
      try {
        const response = await this.db.remove(this.docId, conflictRev);
        this._recentRevs.unshift(response.rev);
        this._recentRevs.length = Math.min(this._recentRevs.length, recentRevsToStore);
      } catch (err) {
        // If we get a conflict error it is because the rev has already been deleted.
        if (err.name !== 'conflict') {
          throw err;
        }
      }
      this._mergedConflicts.delete(conflictRev);
    }
  }

  /**
   * Loads the state from PouchDB and applies it to the Y.Doc.
   *
   * The `ydoc` attachment of the main revision and of every conflicting revision not
   * seen recently is applied in a single transaction with origin `YPouchDoc`. Returns
   * without doing anything when the document does not exist yet.
   *
   * @param pdoc Already-fetched PouchDB document; when omitted it is fetched by id.
   *   Assumes its `ydoc` attachment data is included - TODO confirm for callers.
   * @throws Any PouchDB error other than `not_found` on the main fetch.
   */
  async loadState(pdoc?: PouchDoc): Promise<void> {
    const docUpdates: Array<Uint8Array> = [];
    let isLocal = false;
    if (!pdoc) {
      try {
        pdoc = await this.db.get(this.docId, {
          conflicts: true,
          attachments: true,
          binary: true,
        });
      } catch (err) {
        if (err.name === 'not_found') {
          // NOTE(review): `loaded` stays false for a document that does not exist yet.
          return;
        }
        throw err;
      }
    }
    if (!this._recentRevs.includes(pdoc._rev)) {
      const blob = pdoc._attachments.ydoc.data;
      // TODO: Check/Add Node support
      docUpdates.push(new Uint8Array(await new Response(blob).arrayBuffer()));
      this._recentRevs.unshift(pdoc._rev);
      this.currentRev = pdoc._rev;
    }
    if (pdoc._conflicts) {
      // TODO: parallelize these requests
      for (const conflictRev of pdoc._conflicts) {
        // If this conflict rev is in our _recentRevs it is because we lost the conflict,
        // so we should resolve it and save the update. Marking the transaction as local
        // forces a save in the update event handler.
        // TODO: schedule a check for say 10 seconds time if we won the conflict to see
        // if it has been resolved. Maybe check _mergedConflicts have been deleted?
        const lostConflict = this._recentRevs.includes(conflictRev);
        if (lostConflict) {
          isLocal = true;
        } else {
          let conflictPdoc;
          try {
            conflictPdoc = await this.db.get(this.docId, {
              rev: conflictRev,
              attachments: true,
              binary: true,
            });
          } catch (err) {
            // A conflict error means the rev has already been deleted, which happens
            // when some other instance/user has already merged this conflict.
            // NOTE(review): confirm PouchDB reports this as 'conflict' rather than
            // 'not_found'.
            if (err.name !== 'conflict') {
              throw err;
            }
          }
          if (conflictPdoc?._attachments?.ydoc) {
            const conflictBlob = (conflictPdoc._attachments.ydoc as any).data;
            docUpdates.push(new Uint8Array(await new Response(conflictBlob).arrayBuffer()));
            this._recentRevs.unshift(conflictPdoc._rev);
          }
        }
        // Remember the merged conflict rev so it can be removed after the next store.
        this._mergedConflicts.add(conflictRev);
      }
    }
    if (docUpdates.length > 0) {
      Y.transact(this.ydoc, () => {
        for (const docUpdate of docUpdates) {
          Y.applyUpdate(this.ydoc, docUpdate);
        }
      }, YPouchDoc, isLocal);
    }
    // Trim length of recent revs.
    this._recentRevs.length = Math.min(this._recentRevs.length, recentRevsToStore);
    this.loaded = true;
  }

  /**
   * Starts watching the PouchDB changes feed for this document. A change containing a
   * revision that is not in the recent-revs list schedules a debounced `loadState()`.
   * Any feed started earlier by this instance is cancelled first.
   */
  async startLiveUpdate() {
    // Avoid leaking a previously opened feed when called more than once.
    this.stopLiveUpdate();
    const changes = this.db.changes({
      live: true,
      doc_ids: [this.docId], // eslint-disable-line @typescript-eslint/camelcase
      return_docs: false, // eslint-disable-line @typescript-eslint/camelcase
    }).on('change', (change) => {
      const hasUnknownRev = change.changes.some(({ rev }) => !this._recentRevs.includes(rev));
      // Nothing new, or a reload is already scheduled / running.
      if (!hasUnknownRev || this._loadTimeoutId !== null) {
        return;
      }
      this._loadTimeoutId = setTimeout(async () => {
        try {
          await this.loadState();
        } finally {
          // Reset even when the load failed so later changes can schedule a reload.
          this._loadTimeoutId = null;
        }
      }, this.loadTimeout);
    });
    this._cancelLiveUpdate = () => {
      changes.cancel();
    };
  }

  /** Stops watching for live updates from other nodes (no-op when not watching). */
  stopLiveUpdate() {
    if (this._cancelLiveUpdate) {
      this._cancelLiveUpdate();
      this._cancelLiveUpdate = null;
    }
  }

  /**
   * Detaches from the Y.Doc and from PouchDB. A pending debounced store is executed
   * immediately (without being awaited), a pending reload is cancelled, and the Y.Doc
   * is destroyed when it was created by this instance.
   */
  destroy (): void {
    this._destroyed = true;
    this.stopLiveUpdate();
    this.ydoc.off('update', this._storeUpdate);
    if (this._loadTimeoutId !== null) {
      // Do not reload into a document that is being torn down.
      clearTimeout(this._loadTimeoutId);
      this._loadTimeoutId = null;
    }
    if (this._storeTimeoutId) {
      // Clear timeout and store immediately.
      clearTimeout(this._storeTimeoutId);
      this._storeTimeoutId = null;
      void this.storeState();
    }
    // Unregister before destroying the Y.Doc so its 'destroy' event does not re-enter.
    this.ydoc.off('destroy', this.destroy);
    if (this.ydocCreated) {
      this.ydoc.destroy();
    }
  }
}
// This is for use with TipTap - it should not be required once this is resolved:
// https://github.com/ueberdosis/tiptap-next/issues/70
/**
 * Thin provider-shaped wrapper around a YPouchDoc: it exposes the wrapped instance as
 * `ypDoc` and that instance's Y.Doc as `doc`.
 */
export class PouchDbProvider extends Observable<{}> {
  public doc: Y.Doc;

  constructor (public ypDoc: YPouchDoc) {
    super();
    const { ydoc } = ypDoc;
    this.doc = ydoc;
  }

  /** Intentionally does nothing. */
  destroy () {
    // Noop
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment