Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
yjs v13(refactored) level db persistence
import level = require('level')
import * as encoding from 'lib0/dist/encoding'
import * as decoding from 'lib0/dist/decoding'
import * as syncProtocol from 'y-protocols/dist/sync.js'
import * as authProtocol from 'y-protocols/dist/auth.js'
import { createMutex } from 'lib0/dist/mutex.js'
// Module-wide mutex shared by readEntry and the 'update' listener in bindState,
// so that updates applied while replaying persisted entries are not written back.
// NOTE(review): assumes lib0's mutex skips a callback that is invoked while another one is running - confirm.
const mux = createMutex()
/**
 * Build the encoded state vector that persistState passes to writeSyncStep2.
 * The encoding consists of a single varUint with value 0
 * (presumably the number of entries in the vector - verify against y-protocols).
 *
 * @return {Uint8Array} A freshly encoded buffer
 */
function getEmptyEncodedStateVector () {
  const emptyVector = encoding.createEncoder()
  encoding.writeVarUint(emptyVector, 0)
  const encoded = encoding.toUint8Array(emptyVector)
  return encoded
}
/*
 * Counter that improves the uniqueness of timestamps.
 * We gamble with the fact that users won't create more than 10000 changes on a single document
 * within one millisecond (also assuming the clock works correctly).
 */
let timestampIterator = 0
/**
 * Advance the counter (it wraps around at 10000) and combine it with the current time.
 *
 * @return {string} The current `Date.now()` value followed by the counter, zero-padded to four digits
 */
const getNextTimestamp = () => {
  timestampIterator = (timestampIterator + 1) % 10000
  const millis = Date.now()
  const counterSuffix = String(timestampIterator).padStart(4, '0')
  return `${millis}${counterSuffix}`
}
/**
 * Create the database key for a new entry of a document.
 *
 * @param {string} docName
 * @return {string} The document name and a fresh timestamp, joined by "#"
 */
const generateEntryKey = docName => {
  const suffix = getNextTimestamp()
  return `${docName}#${suffix}`
}
/**
 * Store a buffer under a newly generated entry key of the given document.
 *
 * @param {any} db
 * @param {string} docName
 * @param {Uint8Array | ArrayBuffer} buf
 * @return {any} Whatever `db.put` returns
 */
const writeEntry = (db, docName, buf) => {
  const entryKey = generateEntryKey(docName)
  return db.put(entryKey, buf)
}
/**
 * Apply one persisted entry to the document by reading it as a sync message.
 * The work runs inside the shared mutex; the encoder handed to readSyncMessage
 * is created on the spot and not used afterwards.
 *
 * @param {Uint8Array} arr
 * @param {Y.Y} ydocument
 */
const readEntry = (arr, ydocument) => {
  const applyEntry = () => {
    const decoder = decoding.createDecoder(arr)
    const replyEncoder = encoding.createEncoder()
    return syncProtocol.readSyncMessage(decoder, replyEncoder, ydocument)
  }
  return mux(applyEntry)
}
/**
 * Stream the values of all entries stored for a document and apply each one to the document.
 *
 * @param {any} db
 * @param {string} docName
 * @param {Y.Y} ydocument
 * @return {Promise<any>} Resolves when the stream emits 'end' or 'close', rejects when it emits 'error'
 */
const loadFromPersistence = (db, docName, ydocument) => {
  // Entry keys have the form `${docName}#<digits>`, and digits sort before "Z".
  const entryRange = {
    gte: `${docName}#`,
    lte: `${docName}#Z`,
    keys: false,
    values: true
  }
  return new Promise((resolve, reject) => {
    const entries = db.createReadStream(entryRange)
    entries.on('data', value => readEntry(value, ydocument))
    entries.on('error', reject)
    entries.on('end', resolve)
    entries.on('close', resolve)
  })
}
/**
 * Write the complete current state of the document as one new entry and afterwards
 * delete all entries of that document whose key sorts before the new entry's key.
 *
 * The old entries are only deleted once the new entry was written successfully.
 *
 * @param {any} db
 * @param {string} docName
 * @param {Y.Y} ydocument
 * @return {Promise<any>} Resolves with the result of the delete batch; rejects if writing the
 *                        new entry or collecting the old keys fails (nothing is deleted then)
 */
const persistState = (db, docName, ydocument) => {
  const encoder = encoding.createEncoder()
  syncProtocol.writeSyncStep2(encoder, ydocument, getEmptyEncodedStateVector())
  const entryKey = generateEntryKey(docName)
  const entryPromise = db.put(entryKey, encoding.toUint8Array(encoder))
  const delOps = []
  const keysCollected = new Promise((resolve, reject) => db.createKeyStream({
    gte: `${docName}#`,
    lt: entryKey
  })
    .on('data', key => delOps.push({ type: 'del', key }))
    .on('error', reject)
    .on('end', resolve)
    .on('close', resolve)
  )
  // Observe both operations right away: a put that fails while the key stream is still
  // running (or a stream error while the put is pending) must reject the returned promise
  // instead of surfacing as an unhandled rejection.
  return Promise.all([entryPromise, keysCollected]).then(() => db.batch(delOps))
}
/**
 * Persistence layer for Leveldb.
 *
 * Each document update is stored as its own entry; writeState replaces the
 * entries of a document with a single entry holding the complete state.
 */
export class LevelDbPersistence {
  /**
   * @param {string} fpath Path to leveldb database
   * @param {Object} [conf] Optional settings, merged over the defaults
   * @param {boolean} [conf.writeStateOnLoad=true] Whether bindState writes the complete state
   *                                               back to the database after loading
   */
  constructor (fpath, conf = {}) {
    this.db = level(fpath, { valueEncoding: 'binary' })
    this.conf = Object.assign({
      writeStateOnLoad: true
    }, conf)
  }

  /**
   * Retrieve all data from LevelDB and automatically persist all document updates to leveldb.
   *
   * @param {string} docName
   * @param {Y.Y} ydocument
   * @return {Promise<any>} Settles once the stored entries were applied and - if
   *                        `writeStateOnLoad` is enabled - the state was written back.
   *                        Rejects if loading or writing the state fails.
   */
  bindState (docName, ydocument) {
    // Write all updates emitted by the document
    // - unless they are created by this persistence layer (e.g. by loadFromPersistence, which is why we mux).
    ydocument.on('update', update => {
      mux(() => {
        const encoder = encoding.createEncoder()
        syncProtocol.writeUpdate(encoder, update)
        // NOTE(review): the result of this write is not awaited, so a failed put is not reported to any caller.
        writeEntry(this.db, docName, encoding.toUint8Array(encoder))
      })
    })
    // read all data from persistence
    return loadFromPersistence(this.db, docName, ydocument).then(() => {
      // write current state (just in case anything was added before state was bound)
      if (this.conf.writeStateOnLoad) {
        // Returned so that callers wait for the write and see its failure.
        return this.writeState(docName, ydocument)
      }
    })
  }

  /**
   * Write current state to persistence layer. Deletes all entries that were made before.
   * Call this method at any time - the recommended time to call this method is before the ydocument is destroyed.
   *
   * @param {string} docName
   * @param {Y.Y} ydocument
   * @return {Promise<any>} The promise returned by persistState
   */
  writeState (docName, ydocument) {
    return persistState(this.db, docName, ydocument)
  }
}
@canadaduane
Copy link

canadaduane commented Mar 27, 2020

For future learners: you can also use performance.now() instead of the custom getNextTimestamp function above. See yjs/yjs#170 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment