Skip to content

Instantly share code, notes, and snippets.

@Raynos
Created August 22, 2013 00:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Raynos/6302093 to your computer and use it in GitHub Desktop.
Save Raynos/6302093 to your computer and use it in GitHub Desktop.
type EventedRepository<T> := {
store: (Array<T>, Callback<Array<T>>),
update: (id: String, delta: Object, Callback<T>),
remove: (id: String, Callback<void>),
drop: (Callback<void>),
sub: (namespace: String, opts: Object) => EventedRepository<X>,
getById: (id: String, Callback<T>),
getAll: (Callback<Array<T>>),
getFor: (key: String, value: Any, Callback<Array<T>>)
}
evented-repository := (db: Level | MongoDB, opts: {
namespace: String,
missingCallback: (err) => void,
encode: (item: T) => dataItem: S,
decode: (dataItem: S) => item: T
primaryKey: "id"
}) => EventedRepository<T>
var uuid = require("uuid")
var extend = require("xtend")
var SubLevel = require("level-sublevel")
module.exports = EventedRepository
function EventedRepository(db, opts) {
db = SubLevel(db).sublevel(opts.namespace)
var eventsDb = db.sublevel("events")
opts = opts || {}
var encoder = opts.encoder || identity
var decoder = opts.decoder || identity
var missingCallback = opts.missingCallback || noop
var primaryKey = opts.primaryKey || "id"
return {
store: store,
update: update,
remove: remove,
getAll: getAll,
getFor: getFor,
getById: getByPrimaryKey,
drop: drop,
sub: sub
}
function store(records, callback) {
records = records.map(encoder)
callback = callback || missingCallback
records.forEach(function (record) {
if (!record[primaryKey]) {
record[primaryKey] = uuid()
}
})
eventsDb.batch(records.map(asEvent), function (err) {
if (err) {
return callback(err)
}
db.batch(records.map(function (record) {
return { type: "put", value: record, key: record[primaryKey] }
}), function (err, records) {
if (err) {
return callback(err)
}
callback(null, decoder(records[0]))
})
})
function asEvent(record) {
return {
key: record[primaryKey] + "~" + uuid(),
type: "put",
value: {
name: "record created",
record: record,
id: record[primaryKey],
time: Date.now()
}
}
}
}
function remove(id, callback) {
callback = callback || missingCallback
eventsDb.put(id + "~" + uuid(), {
name: "record removed",
id: id,
time: Date.now()
}, function (err) {
if (err) {
return callback(err)
}
db.del(id, callback)
})
}
function update(id, delta, callback) {
callback = callback || missingCallback
eventsDb.put(id + "~" + uuid(), {
name: "record updated",
id: id,
delta: delta,
time: Date.now()
}, function (err) {
if (err) {
return callback(err)
}
db.get(id, function (err, doc) {
if (err) {
return callback(err)
}
var newDoc = extend(doc, delta)
db.put(id, newDoc, function (err) {
if (err) {
return callback(err)
}
callback(null, decoder(newDoc))
})
})
})
}
function getByPrimaryKey(key, callback) {
db.get(key, function (err, record) {
if (err) {
return callback(err)
}
callback(null, decoder(record))
})
}
function getAll(callback) {
var results = []
var stream = db.createReadStream()
stream.on("data", function (chunk) {
results.push(decoder(chunk.value))
})
stream.once("end", function () {
callback(null, results)
})
stream.once("error", callback)
}
function getFor(key, value, callback) {
getAll(function (err, results) {
if (err) {
return callback(err)
}
callback(null, results.filter(function (record) {
return record[key] === value
}))
})
}
function drop(callback) {
var count = 2
var done = false
purgeAllValues(db, function (err) {
if (err && !done) {
done = true
return callback(err)
}
if (--count === 0) {
callback(null)
}
})
purgeAllValues(eventsDb, function (err) {
if (err && !done) {
done = true
return callback(err)
}
if (--count === 0) {
callback(null)
}
})
}
function purgeAllValues(db, callback) {
var stream = db.keyStream()
var remaining = 0
var ended = false
var done = false
stream.on("data", function (key) {
remaining++
db.del(key, function (err) {
if (err && !done) {
done = true
stream.destroy()
return callback(err)
}
remaining--
if (ended) {
checkEnd()
}
})
})
stream.once("error", function (err) {
if (!done) {
done = true
callback(err)
}
})
stream.on("end", function () {
ended = true
checkEnd()
})
function checkEnd() {
if (remaining === 0 && !done) {
done = true
callback()
}
}
}
function sub(options) {
return EventedRepository(db, options)
}
}
function identity(x) { return x }
function noop() {}
var uuid = require("uuid")
module.exports = EventedRepository
function EventedRepository(db, opts) {
var collection = db.collection(opts.namespace)
var eventCollection = db.collection(opts.namespace + "~events")
opts = opts || {}
var encoder = opts.encoder || identity
var decoder = opts.decoder || identity
var missingCallback = opts.missingCallback || noop
var primaryKey = opts.primaryKey || "id"
return {
store: store,
update: update,
remove: remove,
getAll: getAll,
getFor: getFor,
getById: getByPrimaryKey,
drop: drop,
sub: sub
}
function store(records, callback) {
records = records.map(encoder)
callback = callback || missingCallback
records.forEach(function (record) {
if (!record[primaryKey]) {
record[primaryKey] = uuid()
}
})
eventCollection.insert(records.map(asEvent), function (err) {
if (err) {
return callback(err)
}
collection.insert(records, function (err, records) {
if (err) {
return callback(err)
}
callback(null, decoder(records[0]))
})
})
function asEvent(record) {
return {
name: "record created",
record: record,
id: record[primaryKey],
time: Date.now()
}
}
}
function remove(id, callback) {
callback = callback || missingCallback
eventCollection.insert([{
name: "record removed",
id: id,
time: Date.now()
}], function (err) {
if (err) {
return callback(err)
}
var query = {}
query[primaryKey] = id
collection.remove(query, function (err, record) {
if (err) {
return callback(err)
}
callback(null, null)
})
})
}
function update(id, delta, callback) {
callback = callback || missingCallback
eventCollection.insert([{
name: "record updated",
id: id,
delta: delta,
time: Date.now()
}], function (err) {
if (err) {
return callback(err)
}
var query = {}
query[primaryKey] = id
collection.findAndModify(query, [[primaryKey, -1]], {
$set: delta
}, {
"new": true
}, function (err, record) {
if (err) {
return callback(err)
}
callback(null, decoder(record))
})
})
}
function getByPrimaryKey(key, callback) {
var query = {}
query[primaryKey] = key
collection.findOne(query, function (err, record) {
if (err) {
return callback(err)
}
callback(null, decoder(record))
})
}
function getAll(callback) {
collection.find().toArray(function (err, records) {
if (err) {
return callback(err)
}
callback(null, records.map(decoder))
})
}
function getFor(key, value, callback) {
var query = {}
query[key] = value
collection.find(query).toArray(function (err, records) {
if (err) {
return callback(err)
}
callback(null, records.map(decoder))
})
}
function drop(callback) {
var count = 2
var done = false
collection.drop(function (err) {
if (err && !done) {
done = true
return callback(err)
}
if (--count === 0) {
callback(null)
}
})
eventCollection.drop(function (err) {
if (err && !done) {
done = true
return callback(err)
}
if (--count === 0) {
callback(null)
}
})
}
function sub(options) {
options.namespace = opts.namespace + "." + options.namespace
return EventedRepository(db, options)
}
}
function identity(x) { return x }
function noop() {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment