Created
August 22, 2013 00:59
-
-
Save Raynos/6302093 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type EventedRepository<T> := {
    store: (Array<T>, Callback<Array<T>>),
    update: (id: String, delta: Object, Callback<T>),
    remove: (id: String, Callback<void>),
    drop: (Callback<void>),
    sub: (namespace: String, opts: Object) => EventedRepository<X>,
    getById: (id: String, Callback<T>),
    getAll: (Callback<Array<T>>),
    getFor: (key: String, value: Any, Callback<Array<T>>)
}
evented-repository := (db: Level | MongoDB, opts: {
    namespace: String,
    missingCallback: (err) => void,
    encode: (item: T) => dataItem: S,
    decode: (dataItem: S) => item: T,
    primaryKey: "id"
}) => EventedRepository<T>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Dependencies: uuid generates record and event ids, extend merges an update
// delta into a stored document, SubLevel provides namespaced sub-databases.
var uuid = require("uuid")
var extend = require("xtend")
var SubLevel = require("level-sublevel")

// The module's single export is the repository factory.
module.exports = EventedRepository
/**
 * Builds an evented repository on top of a level database.
 *
 * Records are kept in the `opts.namespace` sublevel under their primary key.
 * Each mutation is first written to a nested "events" sublevel under the key
 * `<id>~<uuid>` and only then applied to the record store.
 *
 * @param {Object} db - level database handle; passed through SubLevel().
 * @param {Object} [opts]
 * @param {String} [opts.namespace] - sublevel name used for this repository.
 * @param {Function} [opts.encoder] - item -> stored form (alias: `opts.encode`).
 *      Defaults to identity.
 * @param {Function} [opts.decoder] - stored form -> item (alias: `opts.decode`).
 *      Defaults to identity.
 * @param {Function} [opts.missingCallback] - used by the write methods when
 *      the caller passes no callback. Defaults to noop.
 * @param {String} [opts.primaryKey="id"] - property holding a record's key.
 * @returns {Object} repository exposing store, update, remove, getAll,
 *      getFor, getById, drop and sub.
 */
function EventedRepository(db, opts) {
    // Default opts before its first property read; a missing opts used to
    // throw on the namespace lookup.
    opts = opts || {}

    db = SubLevel(db).sublevel(opts.namespace)
    var eventsDb = db.sublevel("events")

    // Accept the original option names as well as the `encode` / `decode`
    // names used by the module's type signature.
    var encoder = opts.encoder || opts.encode || identity
    var decoder = opts.decoder || opts.decode || identity
    var missingCallback = opts.missingCallback || noop
    var primaryKey = opts.primaryKey || "id"

    return {
        store: store,
        update: update,
        remove: remove,
        getAll: getAll,
        getFor: getFor,
        getById: getByPrimaryKey,
        drop: drop,
        sub: sub
    }

    /**
     * Encodes and persists `records`, assigning a uuid to any record that has
     * no primary key. Calls back with the first stored record, decoded.
     *
     * NOTE(review): the type signature promises Callback<Array<T>>; only the
     * first record is returned here, matching the MongoDB backend. Confirm
     * which contract callers rely on before widening it.
     */
    function store(records, callback) {
        callback = callback || missingCallback
        var encoded = records.map(encoder)

        encoded.forEach(function (record) {
            if (!record[primaryKey]) {
                record[primaryKey] = uuid()
            }
        })

        eventsDb.batch(encoded.map(asEvent), function (err) {
            if (err) {
                return callback(err)
            }

            db.batch(encoded.map(asPut), function (err) {
                if (err) {
                    return callback(err)
                }

                // batch() reports only an error, so answer from the records
                // that were just written instead of a callback argument.
                callback(null, decoder(encoded[0]))
            })
        })

        // Batch operation that writes the record under its primary key.
        function asPut(record) {
            return { type: "put", value: record, key: record[primaryKey] }
        }

        // Batch operation that logs the creation of `record`.
        function asEvent(record) {
            return {
                key: record[primaryKey] + "~" + uuid(),
                type: "put",
                value: {
                    name: "record created",
                    record: record,
                    id: record[primaryKey],
                    time: Date.now()
                }
            }
        }
    }

    /**
     * Logs a "record removed" event, then deletes the record stored under
     * `id`. `callback` is handed straight to db.del.
     */
    function remove(id, callback) {
        callback = callback || missingCallback

        eventsDb.put(id + "~" + uuid(), {
            name: "record removed",
            id: id,
            time: Date.now()
        }, function (err) {
            if (err) {
                return callback(err)
            }

            db.del(id, callback)
        })
    }

    /**
     * Logs a "record updated" event, then merges `delta` into the stored
     * document and writes it back. Calls back with the decoded result.
     */
    function update(id, delta, callback) {
        callback = callback || missingCallback

        eventsDb.put(id + "~" + uuid(), {
            name: "record updated",
            id: id,
            delta: delta,
            time: Date.now()
        }, function (err) {
            if (err) {
                return callback(err)
            }

            db.get(id, function (err, doc) {
                if (err) {
                    return callback(err)
                }

                var newDoc = extend(doc, delta)

                db.put(id, newDoc, function (err) {
                    if (err) {
                        return callback(err)
                    }

                    callback(null, decoder(newDoc))
                })
            })
        })
    }

    /** Fetches the record stored under `key` and calls back with it decoded. */
    function getByPrimaryKey(key, callback) {
        db.get(key, function (err, record) {
            if (err) {
                return callback(err)
            }

            callback(null, decoder(record))
        })
    }

    /** Streams every record in the store and calls back with them decoded. */
    function getAll(callback) {
        var results = []
        var stream = db.createReadStream()

        stream.on("data", function (chunk) {
            results.push(decoder(chunk.value))
        })
        stream.once("end", function () {
            callback(null, results)
        })
        stream.once("error", callback)
    }

    /**
     * Calls back with every decoded record whose `key` property is strictly
     * equal to `value`. Implemented as a filter over getAll().
     */
    function getFor(key, value, callback) {
        getAll(function (err, results) {
            if (err) {
                return callback(err)
            }

            callback(null, results.filter(function (record) {
                return record[key] === value
            }))
        })
    }

    /**
     * Deletes every key from the record store and from the events store.
     * Calls back once: with the first error, or with null after both purges
     * have finished.
     */
    function drop(callback) {
        // Same fallback as the other write methods.
        callback = callback || missingCallback
        var count = 2
        var done = false

        purgeAllValues(db, finish)
        purgeAllValues(eventsDb, finish)

        // Shared completion handler for the two purges.
        function finish(err) {
            if (err && !done) {
                done = true
                return callback(err)
            }

            if (--count === 0) {
                callback(null)
            }
        }
    }

    /**
     * Deletes every key emitted by `target.keyStream()`. Calls back once:
     * with the first error, or with no arguments after the stream has ended
     * and every pending delete has completed.
     */
    function purgeAllValues(target, callback) {
        var stream = target.keyStream()
        var remaining = 0
        var ended = false
        var done = false

        stream.on("data", function (key) {
            remaining++

            target.del(key, function (err) {
                if (err && !done) {
                    done = true
                    stream.destroy()
                    return callback(err)
                }

                remaining--

                // Before "end" more keys may still arrive, so completion is
                // only checked once the stream has finished.
                if (ended) {
                    checkEnd()
                }
            })
        })

        stream.once("error", function (err) {
            if (!done) {
                done = true
                callback(err)
            }
        })

        stream.on("end", function () {
            ended = true
            checkEnd()
        })

        function checkEnd() {
            if (remaining === 0 && !done) {
                done = true
                callback()
            }
        }
    }

    /**
     * Creates a repository nested inside this repository's sublevel.
     *
     * Supports sub(namespace, options) as described by the type signature
     * and the original sub(options) form, where options.namespace names the
     * child.
     */
    function sub(namespace, options) {
        if (typeof namespace !== "string") {
            // Original call shape: the first argument is the options object.
            return EventedRepository(db, namespace)
        }

        // Inherit from the caller's options instead of writing to them.
        var subOpts = Object.create(options || null)
        subOpts.namespace = namespace
        return EventedRepository(db, subOpts)
    }
}
/** Default encoder/decoder: returns its argument unchanged. */
function identity(x) {
    return x
}
/** Default missingCallback: ignores its arguments and does nothing. */
function noop() {}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// uuid generates primary keys for records stored without one.
var uuid = require("uuid")

// The module's single export is the repository factory.
module.exports = EventedRepository
/**
 * Builds an evented repository on top of a MongoDB database handle.
 *
 * Records are kept in the collection named `opts.namespace`. Each mutation
 * is first inserted into the `<namespace>~events` collection and only then
 * applied to the record collection.
 *
 * @param {Object} db - database handle exposing collection(name).
 * @param {Object} [opts]
 * @param {String} [opts.namespace] - name of the record collection.
 * @param {Function} [opts.encoder] - item -> stored form (alias: `opts.encode`).
 *      Defaults to identity.
 * @param {Function} [opts.decoder] - stored form -> item (alias: `opts.decode`).
 *      Defaults to identity.
 * @param {Function} [opts.missingCallback] - used by the write methods when
 *      the caller passes no callback. Defaults to noop.
 * @param {String} [opts.primaryKey="id"] - property holding a record's key.
 * @returns {Object} repository exposing store, update, remove, getAll,
 *      getFor, getById, drop and sub.
 */
function EventedRepository(db, opts) {
    // Default opts before its first property read; a missing opts used to
    // throw on the namespace lookup.
    opts = opts || {}

    var collection = db.collection(opts.namespace)
    var eventCollection = db.collection(opts.namespace + "~events")

    // Accept the original option names as well as the `encode` / `decode`
    // names used by the module's type signature.
    var encoder = opts.encoder || opts.encode || identity
    var decoder = opts.decoder || opts.decode || identity
    var missingCallback = opts.missingCallback || noop
    var primaryKey = opts.primaryKey || "id"

    return {
        store: store,
        update: update,
        remove: remove,
        getAll: getAll,
        getFor: getFor,
        getById: getByPrimaryKey,
        drop: drop,
        sub: sub
    }

    /**
     * Encodes and inserts `records`, assigning a uuid to any record that has
     * no primary key. Calls back with the first inserted record, decoded.
     *
     * NOTE(review): the type signature promises Callback<Array<T>>; only the
     * first record is returned. Kept as-is so existing callers see the same
     * shape - confirm before widening it.
     */
    function store(records, callback) {
        callback = callback || missingCallback
        var encoded = records.map(encoder)

        encoded.forEach(function (record) {
            if (!record[primaryKey]) {
                record[primaryKey] = uuid()
            }
        })

        eventCollection.insert(encoded.map(asEvent), function (err) {
            if (err) {
                return callback(err)
            }

            collection.insert(encoded, function (err, inserted) {
                if (err) {
                    return callback(err)
                }

                callback(null, decoder(inserted[0]))
            })
        })

        // Event document that logs the creation of `record`.
        function asEvent(record) {
            return {
                name: "record created",
                record: record,
                id: record[primaryKey],
                time: Date.now()
            }
        }
    }

    /**
     * Logs a "record removed" event, then removes the records whose primary
     * key equals `id`. Calls back with (null, null) on success.
     */
    function remove(id, callback) {
        callback = callback || missingCallback

        eventCollection.insert([{
            name: "record removed",
            id: id,
            time: Date.now()
        }], function (err) {
            if (err) {
                return callback(err)
            }

            collection.remove(byPrimaryKey(id), function (err) {
                if (err) {
                    return callback(err)
                }

                callback(null, null)
            })
        })
    }

    /**
     * Logs a "record updated" event, then applies `delta` with $set to the
     * record whose primary key equals `id`. Calls back with the decoded
     * document handed back by findAndModify.
     */
    function update(id, delta, callback) {
        callback = callback || missingCallback

        eventCollection.insert([{
            name: "record updated",
            id: id,
            delta: delta,
            time: Date.now()
        }], function (err) {
            if (err) {
                return callback(err)
            }

            collection.findAndModify(byPrimaryKey(id), [[primaryKey, -1]], {
                $set: delta
            }, {
                "new": true
            }, function (err, record) {
                if (err) {
                    return callback(err)
                }

                callback(null, decoder(record))
            })
        })
    }

    /** Finds one record whose primary key equals `key`; calls back with it decoded. */
    function getByPrimaryKey(key, callback) {
        collection.findOne(byPrimaryKey(key), function (err, record) {
            if (err) {
                return callback(err)
            }

            callback(null, decoder(record))
        })
    }

    /** Calls back with every record in the collection, decoded. */
    function getAll(callback) {
        collection.find().toArray(function (err, records) {
            if (err) {
                return callback(err)
            }

            callback(null, records.map(decoder))
        })
    }

    /** Calls back with the decoded records matching the query { [key]: value }. */
    function getFor(key, value, callback) {
        var query = {}
        query[key] = value

        collection.find(query).toArray(function (err, records) {
            if (err) {
                return callback(err)
            }

            callback(null, records.map(decoder))
        })
    }

    /**
     * Drops the record collection and the events collection. Calls back
     * once: with the first error, or with null after both drops finished.
     */
    function drop(callback) {
        // Same fallback as the other write methods.
        callback = callback || missingCallback
        var count = 2
        var done = false

        collection.drop(finish)
        eventCollection.drop(finish)

        // Shared completion handler for the two drops.
        function finish(err) {
            if (err && !done) {
                done = true
                return callback(err)
            }

            if (--count === 0) {
                callback(null)
            }
        }
    }

    /**
     * Creates a repository whose namespace is `<this namespace>.<child>`.
     *
     * Supports sub(namespace, options) as described by the type signature
     * and the original sub(options) form, where options.namespace names the
     * child.
     */
    function sub(namespace, options) {
        if (typeof namespace !== "string") {
            // Original call shape: the first argument is the options object.
            options = namespace
            namespace = options.namespace
        }

        // Inherit from the caller's options instead of overwriting their
        // namespace; reusing one options object used to compound the prefix.
        var subOpts = Object.create(options || null)
        subOpts.namespace = opts.namespace + "." + namespace
        return EventedRepository(db, subOpts)
    }

    // Builds the query object { <primaryKey>: id }.
    function byPrimaryKey(id) {
        var query = {}
        query[primaryKey] = id
        return query
    }
}
/** Default encoder/decoder: returns its argument unchanged. */
function identity(x) {
    return x
}
/** Default missingCallback: ignores its arguments and does nothing. */
function noop() {}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment