Last active
July 12, 2023 10:02
-
-
Save janl/d0ba3544631e054e59fbd5f4d3ee96f8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Chain replication demo for CouchDB, h/t @tef & @sushee
background: https://timilearning.com/posts/mit-6.824/lecture-9-craq/
Example:
```
curl 127.0.0.1:3000 -X POST --json '{"a":5}'
{
  "ok": true,
  "id": "0c0661ae9bd09c97957cd04579034d7b",
  "rev": "1-8bf226e57847cf49b0d45065383cf323"
}
curl 127.0.0.1:3000/0c0661ae9bd09c97957cd04579034d7b
{
  "_id": "0c0661ae9bd09c97957cd04579034d7b",
  "_rev": "1-8bf226e57847cf49b0d45065383cf323",
  "a": 5
}
```
*/
// first, we need a HTTP server
const fastify = require('fastify')({
  logger: true
})

// some private lib wrapper around the CouchDB HTTP API
const { CouchServer } = require('couch-utils')

// Base URL (credentials included) of the CouchDB server. Defaults to the
// local demo instance; set the COUCHDB_URL environment variable to point
// the demo at another server without editing the source.
const COUCHDB = process.env.COUCHDB_URL || 'http://admin:admin@127.0.0.1:5984'

// handle to the connected CouchDB server, assigned by setup()
let couchServer = null
// define three nodes, for our experiment, these are numbered
// databases on the same CouchDB instance, but they could live
// on their own instance, we just need to adjust the URLs a bit.
// Order matters: it is the order of the replication chain.
const nodeSpec = [
  't0',
  't1',
  't2'
]
// some global state var(t)s because this is a sloppy demo

// database handles, one per entry in nodeSpec; filled in by setup()
let nodes = []
// reserved for a changes-listener handle; not assigned anywhere in the code shown
let waiter = null
// last value handed to seqStore.save(); starts at 0
let store = 0

// Minimal in-memory load/save pair around `store`. It is passed to the
// changes listener as its `store` option so the listener can remember
// how far it has read instead of rescanning all changes on each request.
const seqStore = {
  // returns the most recently saved value (0 if nothing was saved yet)
  load() {
    return store
  },
  // remembers `seq` as the value the next load() returns
  save(seq) {
    store = seq
  }
}
// set up our CouchDB databases and chain replication
//
// Connects to the CouchDB server, makes sure every database named in
// nodeSpec exists, then starts one continuous replication per adjacent
// pair (db n -> db n+1). On success `couchServer` and `nodes` are
// populated. Any error from the underlying calls rejects the returned
// promise.
const setup = async () => {
  // connect to our one CouchDB server
  couchServer = await CouchServer.connect(COUCHDB)

  // phase 1: for each entry in nodeSpec, get a database handle and
  // create the database.
  // create is idempotent, does not fail if db already exists
  nodes = await Promise.all(nodeSpec.map(async (node) => {
    const db = await couchServer.db(node)
    await db.create()
    return db
  }))

  // phase 2: only now that every database exists, set up continuous
  // forward replication from db n to n+1. Doing this inside the same
  // parallel map as the create calls could start a replication before
  // its target database had been created.
  await Promise.all(nodeSpec.slice(0, -1).map((source, idx) =>
    couchServer.replication({
      source: `${COUCHDB}/${source}`,
      target: `${COUCHDB}/${nodeSpec[idx + 1]}`,
      continuous: true
    }).run()
  ))
}
// handle reads
//
// GET /:id loads the document with that id from the last node of the
// chain, the same node the POST handler waits on before it answers.
fastify.get('/:id', async (request, reply) => {
  // pick the last node to read from
  const readFrom = nodes[nodes.length - 1]
  // read object with :id from CouchDB and hand it straight back;
  // an async handler's return value becomes the response
  return readFrom.load(request.params.id)
})
// handle writes
//
// POST / saves the request body to the first node of the chain, then
// holds the HTTP response until the same id/rev pair shows up in the
// changes feed of the last node.
fastify.post('/', async (request, reply) => {
  // pick first node to write
  const writeTo = nodes[0]
  // pick last node to make sure the write made it
  const readFrom = nodes[nodes.length - 1]
  // write the object
  const response = await writeTo.save(request.body)
  // the id and rev assigned to the write are read from response.json;
  // each object stored in CouchDB gets assigned a uuid and a content
  // addressable hash called rev
  const { id, rev } = response.json

  // now wait for that id/rev combo to arrive on the last node.
  // the CouchDB _changes API is an indexed list of all entries
  // in a database, here we open an HTTP socket and wait until
  // our write appears on the last node
  return new Promise((resolve) => {
    // NOTE(review): the listener is never stopped after a match; the
    // call for closing it is not visible here
    readFrom.changesListener({
      // optimisation to avoid looking through all
      // changes on the last node on each request
      store: seqStore,
      handler: (change) => {
        if (change.id === id && change.changes[0].rev === rev) {
          // we have a match! the last node has our write from the first node
          resolve(response)
        }
        // TODO: handle timeout, fallback to node last-1, -2 and so on
      }
    }).stream()
  })
})
// Run the server
//
// Runs setup() and then starts listening on port 3000. An error from
// either step is logged through fastify's logger and the process exits
// with status 1.
const start = async () => {
  try {
    await setup()
    await fastify.listen({ port: 3000 })
  } catch (err) {
    fastify.log.error(err)
    process.exit(1)
  }
}
start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment