Skip to content

Instantly share code, notes, and snippets.

@janl
Last active July 12, 2023 10:02
Show Gist options
  • Save janl/d0ba3544631e054e59fbd5f4d3ee96f8 to your computer and use it in GitHub Desktop.
Save janl/d0ba3544631e054e59fbd5f4d3ee96f8 to your computer and use it in GitHub Desktop.
/* Chain replication demo for CouchDB, h/t @tef & @sushee
background: https://timilearning.com/posts/mit-6.824/lecture-9-craq/
Example:
```
curl 127.0.0.1:3000 -X POST --json '{"a":5}'
{
"ok": true,
"id": "0c0661ae9bd09c97957cd04579034d7b",
"rev": "1-8bf226e57847cf49b0d45065383cf323"}
}
curl 127.0.0.1:3000/0c0661ae9bd09c97957cd04579034d7b
{
"_id": "0c0661ae9bd09c97957cd04579034d7b",
"_rev": "1-8bf226e57847cf49b0d45065383cf323",
"a": 5
}
```
*/
// first, we need a HTTP server
const fastify = require('fastify')({
logger: true
})
// some private lib wrapper around the CouchDB HTTP API
const { CouchServer } = require('couch-utils')
const COUCHDB = 'http://admin:admin@127.0.0.1:5984'
let couchServer = null
// define three nodes, for our experiment, these are numbered
// databases on the same CouchDB instance, but they could live
// on their own instance, we just need to adjust the URLs a bit
const nodeSpec = [
`t0`,
`t1`,
`t2`
]
// some global state var(t)s because this is a sloppy demo
let nodes = []
let waiter = null
let store = 0
const seqStore = {
load() {
return store
},
save(seq) {
store = seq
}
}
// set up our CouchDB databases and chain replication
const setup = async () => {
// connect to our one CouchDB server
couchServer = await CouchServer.connect(COUCHDB)
// for each entry in nodeSpec, get a database handle and set up
// one-way replication from db n to n+1
nodes = await Promise.all(nodeSpec.map(async (node, idx, allNodes) => {
// get db handle
const db = await couchServer.db(node)
// create is idempotent, does not fail if db already exists
await db.create()
if (idx < allNodes.length - 1) {
// set up continuous forward replication
await couchServer.replication({
source: `${COUCHDB}/${nodeSpec[idx]}`,
target: `${COUCHDB}/${nodeSpec[idx + 1]}`,
continuous: true
}).run()
}
return db
}))
}
// handle reads
fastify.get('/:id', async (request, reply) => {
// pick the last node to read from
const readFrom = nodes[nodes.length - 1]
// read object with :id from CouchDB
const response = await readFrom.load(request.params.id)
return response
})
// handle writes
fastify.post('/', async (request, reply) => {
// pick first node to write
const writeTo = nodes[0]
// pick last node to make sure the write made it
const readFrom = nodes[nodes.length - 1]
// write the object
const response = await writeTo.save(request.body)
// this returns an object of the type { ok: true, id: {uuid}, rev: {hash} }
// no wait for that id/rev combo to arrive on the last node
// the CouchDB _changes API is an indexed list of all entries
// in a database, here we open an HTTP socket and wait until
// our write appears on the last node
const wait = new Promise((reject, resolve) => {
const waiter = readFrom.changesListener({
// optimisation to avoid looking through all
// changes on the last node on each request
store: seqStore,
handler: (change) => {
// each object stored in CouchDB gets assigned a
// uuid and a content addressable hash called rev
if (change.id === response.json.id && change.changes[0].rev === response.json.rev) {
// we have a match! the last node has our write from the first node
resolve(response)
}
// TODO: handle timeout, fallback to node last-1, -2 and so on
}
}).stream()
})
return wait
})
// Run the server
const start = async () => {
try {
await setup()
await fastify.listen({ port: 3000 })
} catch (err) {
fastify.log.error(err)
process.exit(1)
}
}
start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment