Skip to content

Instantly share code, notes, and snippets.

Last active December 3, 2020 08:58
Show Gist options
  • Save josephg/7a0bdaa105d6eb43fd7ac1229f8ffdc3 to your computer and use it in GitHub Desktop.
Save josephg/7a0bdaa105d6eb43fd7ac1229f8ffdc3 to your computer and use it in GitHub Desktop.
P2p database sketch
* This is a simple example program showing a working in-memory database storing
* full history and branches. In this demo the "database" is a Map from string
* keys to values.
* Everything is versioned - you can wind time forward and backwards, and mark a
* branch and teleport the database back to that point in time.
* Conflicts are handled similarly to Basho's riak. When concurrent writes
* happen, the database stores every concurrent version of the document. A
* subsequent read will fetch all values, and a subsequent write can supercede
* all existing values to "solve" the conflict.
* For the most part I'm using a hybrid model between lamport timestamp version
* vectors and using implied versions via transitive dependancies. The idea is
* to allow arbitrary peers to write data (not limited to a "server pool" of
* writers). But still avoid sibling explosions by rarely actually naming the
* fully expanded version.
* At the bottom of the file is a fuzz test which creates 3 database peers then
* generates random operations to apply to them. Then random pairs of peers sync
* changes with one another. The whole database must stay consistent. The sync
* algorithm currently isn't written to work over a network - a proper protocol
* should also manage round-trips and whatnot, and ideally not take size linear
* with the number of agent IDs.
* The fuzz test is slower than I'd like because of deep cloning in the test and
* excessive comparisons (which grow linearly over time). A real database node
* should be much quicker.
import assert from 'assert'
import { Console } from "console"
import seedrandom from 'seedrandom'
import Map2 from 'map2'
import PriorityQueue from 'priorityqueuejs'
// A simple version interface. Externally you'd want to use random uuids for
// agent identifiers.
export interface RawVersion {
agent: number,
seq: number
export const ROOT_VERSION: RawVersion = {
agent: 0, seq: 0
const console = new Console({
stdout: process.stdout,
stderr: process.stderr,
inspectOptions: {
depth: null
const CHECK = process.env['NOCHECK'] ? false : true
if (CHECK) console.log('running in checked mode')
// An operation affects multiple documents
export interface RawOperation {
version: RawVersion, // These could be RawVersions.
succeedsSeq: number | null,
parents: RawVersion[], // Only direct, non-transitive dependancies. May or may not include own agent's parent.
docOps: DocOperation[],
interface LocalOperation {
version: RawVersion,
// This is filled in when an operation is added to the store. It is a local
// order defined such that if a > b, a.localOrder > b.localOrder. If two
// operations are concurrent they could end up in any order.
order: number,
succeedsOrder: number, // Or -1.
parents: number[] // localOrder of parents. Semantics same as RawOperation.parents.
docOps: DocOperation[],
// For network sync. TODO: Remove this.
raw: RawOperation,
export interface DocOperation {
key: string,
// collection: string,
// TODO: Change parents to a local order too.
parents: RawVersion[], // Specific to the document. The named operations edit this doc.
newValue: any,
export type DocValue = { // Has multiple entries iff version in conflict.
version: RawVersion,
value: any
type OperationSet = Map2<number, number, number>
interface DBState {
data: Map<string, DocValue>
// Version information is all relative / related to the current branch.
version: Map<number, number> // Expanded for all agents.
// The number of times each (agent, seq) is referenced by an operations
refs: Map2<number, number, number>
// This contains every entry in version where foreignRefs is 0 (or equivalently undefined).
versionFrontier: Set<number> // set of agent ids
// This stuff is not compared when we compare databases. This is not relative
// to the current branch - a database will store all seen operations here.
operations: LocalOperation[], // order => operation.
versionToOrder: OperationSet, // agent,seq => order.
const assertDbEq = (a: DBState, b: DBState) => {
if (!CHECK) return
assert.deepStrictEqual(a.version, b.version)
assert.deepStrictEqual(a.refs, b.refs)
assert.deepStrictEqual(a.versionFrontier, b.versionFrontier)
// There are 4 cases:
// - A dominates B (return +ive)
// - B dominates A (return -ive)
// - A and B are equal (not checked here)
// - A and B are concurrent (return 0)
const versionToOrder = (allOps: OperationSet, agent: number, seq: number): number => (
agent === 0 && seq === 0 ? -1 : allOps.get(agent, seq)!
const orderToVersion = (db: DBState, order: number): RawVersion => (
order === -1 ? ROOT_VERSION : db.operations[order].version
// This is used to break ties between versions.
const vCmp = (a: RawVersion, b: RawVersion) => (
(a.agent - b.agent) || (a.seq - b.seq)
* This method calculates whether two versions are concurrent, if a dominates b
* or if b dominates a.
* Returns +ive if a dominates b, -ive if b dominates a, or 0 if either the
* operations are the same or the operations are concurrent. It is the
* responsibility of the caller to differentiate these cases. (Via vCmp(a,b) === 0).
const compareVersions = (db: DBState, a: RawVersion, b: RawVersion): number => {
if (a.agent === b.agent) return a.seq - b.seq
// This works is via a DFS from the operation with a higher localOrder looking
// for the localOrder of the smaller operation.
const aOrder = versionToOrder(db.versionToOrder, a.agent, a.seq)
const bOrder = versionToOrder(db.versionToOrder, b.agent, b.seq)
assert(aOrder !== bOrder) // Should have returned above in this case.
const [start, target] = aOrder > bOrder ? [aOrder, bOrder] : [bOrder, aOrder]
const visited = new Set<number>() // Set of localOrders.
let found = false
const visit = (order: number) => {
// iter2++
// console.log('visiting', agent, seq)
if (order === target) {
found = true
} else if (order < target) { return }
const op = db.operations[order]
assert(op != null)
if (visited.has(order)) return
assert(op) // If we hit the root operation, we should have already returned.
if (op.succeedsOrder > -1) visit(op.succeedsOrder)
for (const p of op.parents) {
if (found) return
// Its impossible for the operation with a smaller localOrder to dominate the
// op with a larger localOrder.
if (found) return aOrder - bOrder
else return 0
* This method takes in two versions (expressed as a fronteir in order numbers).
* And it returns the set of operations only appearing in the history of one
* version or the other.
* There are some helper methods below for getting the diff of two versions
* expressed using RawVersions (diffv) and methods for converting a
* db.versionFrontier into a compatible list of orders.
const diff = (db: DBState, a: number[], b: number[]) => {
// console.log('calculating difference between', a, b)
// console.log('=', => getLocalVersion(db, order)), => getLocalVersion(db, order)))
// There's a bunch of ways to implement this. I'm not sure this is the best.
// In essence, we track 2 data structures:
// 1. A priority queue of order numbers. When we pop, we get the highest order
// first.
// 2. A tag for each order in the priority queue naming the type. This is held
// separate from the queue so we can tell when the same order is added to
// the priority queue twice (and at insertion time). The tag marks the
// entry as only in A's history, only in B's history or in the history of
// both items.
// We expand the priority queue until the only entries left are in the shared
// history of both A and B.
const enum ItemType { Shared, A, B }
const itemType = new Map<number, ItemType>()
// Every order is in here at most once. Every entry in the queue is also in
// itemType.
const queue = new PriorityQueue<number>()
// Number of items in the queue in both transitive histories (state Shared).
let numShared = 0
const enq = (order: number, type: ItemType) => {
const currentType = itemType.get(order)
if (currentType == null) {
itemType.set(order, type)
// console.log('+++ ', order, type, getLocalVersion(db, order))
if (type === ItemType.Shared) numShared++
} else if (type !== currentType && currentType !== ItemType.Shared) {
// This is sneaky. If the two types are different they have to be {A,B},
// {A,Shared} or {B,Shared}. In any of those cases the final result is
// Shared. If the current type isn't shared, set it as such.
itemType.set(order, ItemType.Shared)
for (const order of a) enq(order, ItemType.A)
for (const order of b) enq(order, ItemType.B)
const aOnly: number[] = [], bOnly: number[] = []
// Loop until everything is shared.
while (queue.size() > numShared) {
const order = queue.deq()
const type = itemType.get(order)!
// It should be safe to remove the item from itemType here.
// console.log('--- ', order, 'of type', type, getLocalVersion(db, order), 'shared', numShared, 'num', queue.size())
assert(type != null)
if (type === ItemType.Shared) numShared--
else (type === ItemType.A ? aOnly : bOnly).push(order)
if (order < 0) continue // Bottom out at the root operation.
const op = db.operations[order]
// if (op.succeedsOrder >= 0) enq(op.succeedsOrder, type)
for (const p of op.parents) enq(p, type)
// console.log('diff', => getLocalVersion(db, order)), => getLocalVersion(db, order)))
return {aOnly, bOnly}
const diffV = (db: DBState, a: RawVersion[], b: RawVersion[]) => {
const aOrder = => versionToOrder(db.versionToOrder, v.agent, v.seq))
const bOrder = => versionToOrder(db.versionToOrder, v.agent, v.seq))
return diff(db, aOrder, bOrder)
const vEq = (a: RawVersion, b: RawVersion) => a.agent === b.agent && a.seq === b.seq
const getVal = (db: DBState, key: string): DocValue => ( ?? [{
version: ROOT_VERSION,
value: null
* Apply an operation to the database, and move the current branch forward based
* on the operation's version.
* Note: It would be cleaner to separate this into two methods, one to ingest
* the operation and one to merge the operation's contents into the local
* database. (Ie, this does the equivalent of `git fetch` then `git merge`.
* These functions should be separated.)
const applyForwards = (db: DBState, op: RawOperation) => {
// The operation supercedes all the versions named in parents with version.
let oldV = db.version.get(op.version.agent)
assert(oldV == null || op.version.seq > oldV, "db already contains inserted operation")
for (const {agent, seq} of op.parents) {
const dbSeq = db.version.get(agent)!
assert(dbSeq != null && dbSeq >= seq, "Cannot apply operation from the future")
const oldRefs = db.refs.get(agent, seq) ?? 0
db.refs.set(agent, seq, oldRefs + 1)
if (dbSeq === seq && oldRefs === 0) db.versionFrontier.delete(agent)
db.version.set(op.version.agent, op.version.seq)
// *** The version metadata is now up to date. Add the raw operation to the
// database
let order = db.versionToOrder.get(op.version.agent, op.version.seq)
// We check becuase the operation might already be known in the database.
if (order == null) {
order = db.operations.length
db.versionToOrder.set(op.version.agent, op.version.seq, order)
const localOp: LocalOperation = {
version: op.version,
parents: => db.versionToOrder.get(v.agent, v.seq) ?? -1),
succeedsOrder: op.succeedsSeq == null ? -1
: db.versionToOrder.get(op.version.agent, op.succeedsSeq)!,
docOps: op.docOps,
raw: op,
db.operations[order] = localOp // Appends if necessary.
// *** And apply the named changes to the data model in
for (const {key, parents, newValue} of op.docOps) {
const prevVals = getVal(db, key)
// The doc op's parents field contains a subset of the versions present in
// oldVal.
// - Any entries in prevVals that aren't named in the new operation are kept
// - And any entries in parents that aren't directly named in prevVals must
// be ancestors of the current document value. This indicates a conflict,
// and we'll keep everything.
// We'll check ancestry. Every parent of this operation (parents) must
// either be represented directly in prevVals or be dominated of one of
// them.
for (const v of parents) {
const exists = !!prevVals.find(({version: v2}) => vEq(v, v2))
if (!exists) {
// console.log('Checking ancestry')
// We expect one of v2 to dominate v.
const ancestry ={version: v2}) => compareVersions(db, v2, v))
assert(ancestry.findIndex(a => a > 0) >= 0)
const newVal = [{version: op.version, value: newValue}]
for (const oldEntry of prevVals) {
if (parents.find(v => vEq(v, oldEntry.version)) == null) {
// Keep this.
// If there's multiple conflicting values, we keep them in version-sorted
// order to make db comparisons easier in the fuzzer., newVal.sort((a, b) => vCmp(a.version, b.version)))
* This is the inverse of applyForwards above. It purges the operation from the
* data model and the version, but it does not prune the operation from the
* known set of operations.
const applyBackwards = (db: DBState, op: RawOperation) => {
// Version stuff
assert(db.version.get(op.version.agent) === op.version.seq)
assert(db.versionFrontier.has(op.version.agent), "Cannot unapply operations in violation of partial order")
if (op.succeedsSeq != null) {
db.version.set(op.version.agent, op.succeedsSeq)
if ((db.refs.get(op.version.agent, op.succeedsSeq) ?? 0) !== 0) {
} else {
for (const {agent, seq} of op.parents) {
const dbSeq = db.version.get(agent)
// console.log('dbseq', dbSeq, 'seq', seq)
assert(dbSeq == null || dbSeq >= seq, "internal consistency error in versions")
const newRefs = db.refs.get(agent, seq)! - 1
assert(newRefs >= 0)
if (newRefs === 0) db.refs.delete(agent, seq) // Makes equals simpler.
else db.refs.set(agent, seq, newRefs)
if (newRefs === 0 && dbSeq === seq) db.versionFrontier.add(agent)
// Ok now update the data.
for (const {key, parents} of op.docOps) {
const prevVals =!
// The values should instead contain:
// - Everything in prevVals not including op.version
// - All the objects named in parents that aren't superceded by another
// document version
const newVals = prevVals.filter(({version}) => !vEq(version, op.version))
// console.log('prevVals', prevVals)
// And append all the parents that aren't dominated by another value.
for (const p of parents) {
// TODO: Assert p not already represented in prevVals, which would be invalid.
// If p is dominated by a value in prevVals, skip.
const dominated ={version}) => compareVersions(db, p, version))
if (dominated.findIndex(x => x < 0) >= 0) continue
if (vEq(p, ROOT_VERSION)) {
assert(newVals.length === 0)
} else {
const parentOp = db.operations[db.versionToOrder.get(p.agent, p.seq)!]
const parentDocOp = parentOp.docOps.find(({key: key2}) => key === key2)!
newVals.push({version: p, value: parentDocOp.newValue})
// console.log('newVals', newVals)
// This is a bit dirty, to handle the root.
if (newVals.length === 0)
else, newVals)
// Not done: Remove the operation itself from the db.operations set.
// Verify the database is internally consistent.
const checkDb = (db: DBState) => {
if (!CHECK) return
// The frontier entries should always be at the current version
// console.log('checkdb', db)
for (const [agent, seq] of db.version) {
const isFrontier = db.versionFrontier.has(agent)
if (isFrontier) {
assert.strictEqual(db.refs.get(agent, seq), undefined)
assert(db.refs.get(agent, seq) !== 0)
} else {
assert.notStrictEqual(db.refs.get(agent, seq) ?? 0, 0, "missing frontier element: " + agent)
// Scan all the operations. For each operation with parents, those parents
// should have a smaller localOrder field.
for (const [order, op] of db.operations.entries()) {
assert.strictEqual(op.order, order)
const {agent, seq} = op.version
assert.strictEqual(db.versionToOrder.get(agent, seq), order)
// Check the direct parent
if (op.succeedsOrder > -1) {
assert(op.succeedsOrder < order)
// Direct parent has a matching agent id.
assert.strictEqual(db.operations[op.succeedsOrder].version.agent, agent)
for (const o of op.parents) {
assert(o < order)
// if (o > -1) assert.notStrictEqual(db.operations[o].version.agent, agent)
const randomReal = seedrandom("hi")
const randomInt = (max: number) => (randomReal.int32() + 0xffffffff) % max
const randItem = <T>(arr: T[]): T => arr[randomInt(arr.length)]
const randomOp = (db: DBState, ownedAgents: number[], iter: number): RawOperation => {
const agent = ownedAgents[randomInt(ownedAgents.length)]
const oldVersion = db.version.get(agent)
// We'll need to use the next sequence number from the current state
const seq = oldVersion == null ? 1 : oldVersion + 1
// The parents of the operation are the current frontier set.
// It'd be good to occasionally make a super old operation and throw that in too though.
const parents: RawVersion[] = Array.from(db.versionFrontier)
.map(agent => ({agent, seq: db.version.get(agent)!}))
// The set of keys should always grow, but we'll make it grow slower as the run goes on.
const key = 'KEY_' + randomInt(Math.sqrt(iter)|0)
// const key = randItem(['a', 'b', 'c', 'd', 'e', 'f'])
const docParents = getVal(db, key).map(({version}) => version)
return {
version: {agent, seq},
succeedsSeq: oldVersion ?? null,
docOps: [{
key: key,
newValue: randomInt(100),
parents: docParents
const getOrderVersion = (db: DBState): number[] => (
Array.from(db.versionFrontier).map(agent => (
agent === 0 ? -1 : db.versionToOrder.get(agent, db.version.get(agent)!)!
const cmp = (a: number, b: number) => a - b
const getBranch = (db: DBState): RawVersion[] => (
Array.from(db.versionFrontier).map(agent => (
{agent, seq: db.version.get(agent)!}
// Branch specified as a frontier map - sparce agent => seq.
// Returns original branch.
const switchBranch = (db: DBState, branch: RawVersion[]): RawVersion[] => {
const start = getBranch(db)
const end = branch
const {aOnly, bOnly} = diffV(db, start, end)
for (const order of aOnly) {
applyBackwards(db, db.operations[order].raw)
for (const order of bOnly.reverse()) {
applyForwards(db, db.operations[order].raw)
return start
* Sync a and b. Eventually this should not depend on the db.version field. It
* should instead generate a bloom filter or something of known versions and
* then use a proper network protocol to sync. But for now this is fine.
const syncPeers = (a: DBState, b: DBState) => {
const onlyA = [], onlyB = []
const agents = new Set([...a.version.keys(), ...b.version.keys()])
const getOperationsFrom = (db: DBState, agent: number, seqStart: number, seqEnd: number | null) => {
const resultOrders: number[] = []
let order = versionToOrder(db.versionToOrder, agent, seqStart)
let orderEnd = seqEnd == null ? -1 : versionToOrder(db.versionToOrder, agent, seqEnd)
while (order > orderEnd) {
const op = db.operations[order]
assert(op != null)
order = op.succeedsOrder
return resultOrders
for (let agent of agents) {
const aSeq = a.version.get(agent)
const bSeq = b.version.get(agent)
if (aSeq == bSeq) continue
assert(aSeq != null || bSeq != null) // Either they're both set or one is null.
const aBigger = bSeq == null || (aSeq! > bSeq)
if (aBigger) {
onlyA.push(...getOperationsFrom(a, agent, aSeq!, bSeq ?? null))
} else {
onlyB.push(...getOperationsFrom(b, agent, bSeq!, aSeq ?? null))
const onlyAOps = onlyA.sort(cmp).map(order => a.operations[order].raw)
const onlyBOps = onlyB.sort(cmp).map(order => b.operations[order].raw)
// console.log('syncing peers', aId, bId, 'distance:', onlyAOps.length, onlyBOps.length)
for (const op of onlyBOps) {
applyForwards(a, op)
for (const op of onlyAOps) {
applyForwards(b, op)
const test = () => {
// const numPeers = 1
const numPeers = 3
const numOpsPerIter = 10
// const numOpsPerIter = 10
const numIterations = 2000
// const numIterations = 300
const peers = new Array(numPeers).fill(null).map(() => ({
db: <DBState>{
data: new Map<string, DocValue>(),
version: new Map<number, number>([[ROOT_VERSION.agent, ROOT_VERSION.seq]]), // Expanded for all agents
refs: new Map2(),
versionFrontier: new Set([ROOT_VERSION.agent]), // Kept in sorted order based on comparator.
versionToOrder: new Map2(),
operations: []
iterStartV: new Map<number, number>(),
iterStartData: new Map<string, DocValue>(),
iterOps: <RawOperation[]>[] // Operations this iteration
let nextAgent = 100
for (let iter = 0; iter < numIterations; iter++) {
if ((iter % 100) == 0) console.log('***** ITER', iter)
for (const peer of peers) {
peer.iterStartV = new Map(peer.db.version)
peer.iterStartData = new Map(
peer.iterOps.length = 0
// const startVersion = new Map(peers[0].db.version)
// const startState = new Map(peers[0]
// They should all start at the same version.
// for (const {db} of peers.slice(1)) assert.deepStrictEqual(db.version, startVersion)
// Generate numOpsPerIter assigned to random peers in our pool
for (let i = 0; i < numOpsPerIter; i++) {
const peerId = randomInt(peers.length)
const peer = peers[peerId]
const op = randomOp(peer.db, [peerId + 1, nextAgent++], iter)
// console.log(op)
applyForwards(peer.db, op)
for (const {db} of peers) checkDb(db)
// For each peer, rewind the newly created operations and reapply them.
for (const peer of peers) {
const finalVersion = new Map(peer.db.version)
const finalFrontierOrder = getOrderVersion(peer.db)
peer.iterOps.slice().reverse().forEach(op => {
applyBackwards(peer.db, op)
assert.deepStrictEqual(peer.db.version, peer.iterStartV)
assert.deepStrictEqual(, peer.iterStartData)
const initialFrontierOrder = getOrderVersion(peer.db)
peer.iterOps.forEach(op => {
applyForwards(peer.db, op)
assert.deepStrictEqual(peer.db.version, finalVersion)
// The difference between startVersion and finalVersion should just be the
// operations.
const {aOnly, bOnly} = diff(peer.db, initialFrontierOrder, finalFrontierOrder)
assert.strictEqual(aOnly.length, 0)
assert.strictEqual(bOnly.length, peer.iterOps.length)
const bVersions = => peer.db.operations[order].version).reverse()
const peerVersions = => op.version)
assert.deepStrictEqual(bVersions, peerVersions)
// We'll pick a random pair of peers and synchronize them.
if (peers.length >= 2) {
const aId = randomInt(peers.length)
let bId = randomInt(peers.length - 1)
if (bId >= aId) ++bId
const [a, b] = [peers[aId].db, peers[bId].db]
const [aHead, bHead] = [getBranch(a), getBranch(b)]
// Actually sync, bringing both a and b into alignment.
syncPeers(a, b)
assert.deepStrictEqual(a.version, b.version)
assert.deepStrictEqual(a.versionFrontier, b.versionFrontier)
const mergeHead = getBranch(a)
// console.log('merge', mergeHead, aHead, bHead)
// So we should be able to bounce between branches now.
switchBranch(a, aHead)
switchBranch(b, aHead)
assertDbEq(a, b)
switchBranch(a, bHead)
switchBranch(b, bHead)
assertDbEq(a, b)
switchBranch(a, mergeHead)
switchBranch(b, mergeHead)
assertDbEq(a, b)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment