Skip to content

Instantly share code, notes, and snippets.

@substack substack/kv.js
Created Oct 29, 2018

Embed
What would you like to do?
trie k/v but it's slow
// Node's built-in hash factory; hash() below uses it to derive fixed-size keys.
var createHash = require('crypto').createHash
// The module's single export is the KV constructor.
module.exports = KV
// Number of leading sha256 digest bytes kept as a record's key (see hash()).
var KEY_SIZE = 16
// Number of uint32 slots in each per-record trie table (see the
// `new Uint32Array(TRIE_SIZE)` allocations below).
var TRIE_SIZE = KEY_SIZE*4
/**
 * Key/value store constructor. May be called with or without `new`.
 *
 * Sets every piece of instance state to its "not loaded yet" value and then
 * starts loading via `this._init()`.
 *
 * @param {object} storage - backing store; kept as `this.storage`.
 * @param {object} [opts]
 * @param {Array} [opts.heads] - head offsets to use; when absent (or falsy)
 *   `_heads` starts as null.
 */
function KV (storage, opts) {
  if (!(this instanceof KV)) {
    return new KV(storage, opts)
  }
  var settings = opts || {}
  Object.assign(this, {
    storage: storage,
    _isReady: false,
    _heads: settings.heads || null,
    _tries: null,
    _headerBlock: null,
    _nextFree: -1,
    _readyQueue: []
  })
  this._init()
}
/**
 * Report the current head offsets once initialization has finished.
 *
 * NOTE: KV.prototype.heads is assigned a second time further down in this
 * file, and that later assignment is the one in effect after load. This copy
 * now forwards the initialization error the same way instead of ignoring it.
 *
 * @param {function} cb - called as cb(err) if initialization failed,
 *   otherwise cb(null, heads).
 */
KV.prototype.heads = function (cb) {
  var self = this
  self._ready(function (err) {
    if (err) return cb(err)
    cb(null, self._heads)
  })
}
/**
 * Run `cb` once the store has finished initializing.
 *
 * - Already initialized: `cb()` is invoked synchronously.
 * - Still initializing: `cb` is queued; _init invokes it later (with an error
 *   argument if loading failed).
 * - Initialization already failed: _init has drained the queue and set
 *   `_readyQueue` to null, so `cb` is invoked on the next tick with an Error
 *   instead of crashing on `null.push`.
 *
 * @param {function} cb - called as cb() or cb(err).
 */
KV.prototype._ready = function (cb) {
  if (this._isReady) return cb()
  if (!this._readyQueue) {
    process.nextTick(cb, new Error('KV initialization failed'))
    return
  }
  this._readyQueue.push(cb)
}
/**
 * Hand the list of head offsets to `cb` after initialization completes.
 *
 * @param {function} cb - called as cb(err) when `_ready` reports an error,
 *   otherwise cb(null, heads).
 */
KV.prototype.heads = function (cb) {
  var kv = this
  kv._ready(function onReady (err) {
    if (err) {
      cb(err)
      return
    }
    cb(null, kv._heads)
  })
}
/**
 * Load the header block and the trie table of every head, then release the
 * callbacks queued by `_ready`.
 *
 * Header block layout (first 4096 bytes of storage, all uint32 big-endian):
 *   bytes 0-3   next free offset
 *   bytes 4-7   number of heads
 *   bytes 8...  one offset per head
 *
 * The finishing step runs at most once. Previously a second failing head read
 * re-entered it after `_readyQueue` had been set to null and threw.
 */
KV.prototype._init = function () {
  var self = this
  var settled = false
  self.storage.read(0, 4096, onread)

  function onread (err, buf) {
    // `err` is not inspected here: a read that yields no buffer is treated as
    // brand-new storage. In that case any heads supplied via opts are replaced
    // by an empty list.
    if (!buf) {
      self._headerBlock = Buffer.alloc(4096)
      self._heads = []
      self._tries = []
      self._nextFree = 4096
    } else {
      var i
      self._headerBlock = buf
      self._nextFree = buf.readUInt32BE(0)
      var headLen = buf.readUInt32BE(4)
      self._tries = []
      if (self._heads === null) {
        // No heads were supplied by the caller: take them from the header.
        self._heads = []
        for (i = 0; i < headLen; i++) {
          self._heads.push(buf.readUInt32BE(8+i*4))
          self._tries.push(new Uint32Array(TRIE_SIZE))
        }
      } else {
        // Caller-supplied heads: just make room for one trie table per head.
        for (i = 0; i < self._heads.length; i++) {
          self._tries.push(new Uint32Array(TRIE_SIZE))
        }
      }
    }
    // One unit per head read, plus one for this function itself so that
    // ready() cannot fire before every read has been issued.
    var pending = 1 + self._heads.length
    self._heads.forEach(function (head, index) {
      self.storage.read(head, 4+KEY_SIZE+TRIE_SIZE*4, function (err, node) {
        if (err) return ready(err)
        // The trie table follows the 4-byte size field and the packed key.
        for (var j = 0; j < TRIE_SIZE; j++) {
          self._tries[index][j] = node.readUInt32BE(4+KEY_SIZE+j*4)
        }
        if (--pending === 0) ready()
      })
    })
    if (--pending === 0) ready()
  }

  function ready (err) {
    // Several head reads can fail independently; only the first outcome counts.
    if (settled) return
    settled = true
    // The length is re-read every iteration so callbacks queued while
    // draining are still invoked.
    for (var i = 0; i < self._readyQueue.length; i++) {
      if (err) self._readyQueue[i](err)
      else self._readyQueue[i]()
    }
    self._readyQueue = null
    self._isReady = !err
  }
}
/**
 * Reserve `bytes` bytes at the current end of the data area.
 *
 * Waits for initialization, then returns the current `_nextFree` offset and
 * advances `_nextFree` by `bytes`. The reservation is in-memory only; nothing
 * is written to storage here.
 *
 * @param {number} bytes - size of the region to reserve.
 * @param {function} cb - called as cb(err) if initialization failed
 *   (previously that error was dropped and an offset was returned anyway),
 *   otherwise cb(null, offset).
 */
KV.prototype._alloc = function (bytes, cb) {
  var self = this
  self._ready(function (err) {
    if (err) return cb(err)
    var offset = self._nextFree
    self._nextFree += bytes
    cb(null, offset)
  })
}
/**
 * Serialize `_nextFree` and the head list into the header block and write the
 * block to offset 0 of storage.
 *
 * Fields are uint32 big-endian: next free offset at byte 0, head count at
 * byte 4, head offsets from byte 8 onward.
 *
 * @param {function} cb - receives an Error on the next tick when the header
 *   block is not exactly 4096 bytes; otherwise it is passed straight to
 *   `storage.write`.
 */
KV.prototype._writeHeader = function (cb) {
  var header = this._headerBlock
  if (header.length !== 4096) {
    process.nextTick(cb, new Error('headerBlock.length !== 4096'))
    return
  }
  var heads = this._heads
  header.writeUInt32BE(this._nextFree, 0)
  header.writeUInt32BE(heads.length, 4)
  for (var n = 0, at = 8; n < heads.length; n++, at += 4) {
    header.writeUInt32BE(heads[n], at)
  }
  this.storage.write(0, header, cb)
}
/**
 * Append a record for `rkey` and make it the new head.
 *
 * Record layout written at the allocated offset:
 *   bytes 0-3                  total record size (uint32 BE)
 *   next KEY_SIZE bytes        key symbols, four 2-bit symbols per byte
 *   next TRIE_SIZE*4 bytes     trie table (uint32 BE offsets)
 *   remainder                  value bytes
 *
 * @param {string|Buffer} rkey - key; hashed by toField().
 * @param {Buffer|string} value - value bytes. Strings are converted with
 *   Buffer.from (previously a string crashed on `value.copy`).
 * @param {function} cb - called as cb(err) on allocation/write failure,
 *   otherwise cb(null, offset) with the offset of the new record.
 */
KV.prototype.put = function (rkey, value, cb) {
  var self = this
  var key = toField(rkey)
  if (typeof value === 'string') value = Buffer.from(value)
  var size = 4 + KEY_SIZE + TRIE_SIZE*4 + value.length
  self._alloc(size, function (err, offset) {
    // Do not build or write a record when no offset could be allocated.
    if (err) return cb(err)
    var i
    // Work on a copy of the previous head's table (or a fresh one).
    if (self._tries[0]) {
      self._tries[0] = Uint32Array.from(self._tries[0])
    } else {
      self._tries[0] = new Uint32Array(TRIE_SIZE)
    }
    // Each symbol position owns 4 slots, so only the first TRIE_SIZE/4
    // positions fit in the table. Writes beyond the end of the typed array
    // were silent no-ops before, so bounding the loop stores exactly the same
    // table.
    // NOTE(review): the remaining key symbols take no part in the trie;
    // confirm this is intended.
    var depth = Math.min(key.length, TRIE_SIZE / 4)
    for (i = 0; i < depth; i++) {
      self._tries[0][i*4+key[i]] = offset
    }
    self._heads[0] = offset
    var buf = Buffer.alloc(size)
    buf.writeUInt32BE(size, 0)
    var o = 4
    // Pack four 2-bit symbols per byte, lowest bits first (inverse of toField).
    for (i = 0; i < key.length; i += 4) {
      buf[o++] = (key[i+0]<<0)|(key[i+1]<<2)|(key[i+2]<<4)|(key[i+3]<<6)
    }
    for (i = 0; i < self._tries[0].length; i++) {
      buf.writeUInt32BE(self._tries[0][i], o)
      o += 4
    }
    value.copy(buf, o)
    self.storage.write(offset, buf, function (err) {
      if (err) return cb(err)
      // Persist the new head and next-free offset only after the record is
      // written.
      self._writeHeader(function (err) {
        if (err) cb(err)
        else cb(null, offset)
      })
    })
  })
}
/**
 * Look up the value stored for `rkey`, starting from the first head.
 *
 * Reads a record's fixed-size prefix (size + packed key + trie table) and
 * compares its key symbols with the requested key. At the first differing
 * symbol it follows that record's trie slot for the requested symbol, or
 * reports "not found" when the slot is 0.
 *
 * Fixes over the previous version:
 *  - `offset` now follows each jump, so the value is read from the record
 *    that matched rather than always from the head record;
 *  - the initialization error from `_ready` is forwarded;
 *  - the leftover `console.log('visited=', ...)` debug output is gone.
 *
 * NOTE(review): only the first KEY_SIZE symbols (not all symbols produced by
 * toField) are compared, matching the TRIE_SIZE/4 positions the table can
 * hold. Confirm that partial comparison is intended.
 *
 * @param {string|Buffer} rkey - key; hashed by toField().
 * @param {function} cb - called as cb(err), cb(null, undefined) when the key
 *   is absent, or cb(null, Buffer) with the value bytes.
 */
KV.prototype.get = function (rkey, cb) {
  var self = this
  var key = toField(rkey)
  var N = 4+KEY_SIZE+TRIE_SIZE*4
  self._ready(function (err) {
    if (err) return cb(err)
    if (self._heads.length === 0) return cb(null, undefined)
    // Offset of the record currently being examined.
    var offset = self._heads[0]
    self._read(offset, N, onread)
    function onread (err, buf) {
      if (err) return cb(err)
      if (buf.length === 0) return cb(null, undefined)
      for (var i = 0; i < KEY_SIZE; i++) {
        // Unpack the i-th 2-bit symbol of the stored key.
        var b = (buf[4+Math.floor(i/4)] >> (i%4*2)) % 4
        if (key[i] !== b) {
          // Slot index is i*4 + key[i]; each slot is 4 bytes wide.
          var jump = buf.readUInt32BE(4+KEY_SIZE+i*4*4+key[i]*4)
          if (jump === 0) return cb(null, undefined)
          offset = jump
          return self._read(offset, N, onread)
        }
      }
      var len = buf.readUInt32BE(0)
      if (len <= N) return cb(null, buf.slice(N, len))
      // The value lives past the prefix of the matched record.
      self._read(offset+N, len-N, function (err, sbuf) {
        if (err) return cb(err)
        cb(null, Buffer.concat([buf.slice(N), sbuf]))
      })
    }
  })
}
/**
 * Create another KV over the same storage, pinned to the given head(s).
 *
 * @param {*} heads - a single head or an array of heads; a non-array value is
 *   wrapped in a one-element array.
 * @returns {KV} a new instance constructed with `{ heads: [...] }`.
 */
KV.prototype.checkout = function (heads) {
  var pinned = Array.isArray(heads) ? heads : [heads]
  return new KV(this.storage, { heads: pinned })
}
/**
 * Read up to `j` bytes at offset `i`, never asking storage for bytes past
 * the known end of data.
 *
 * The end of data is `storage.length` when the storage exposes it, else
 * `_nextFree` once it has been loaded (>= 0). With neither known, the request
 * is passed through unclamped.
 *
 * The clamped length is floored at 0. Previously an offset beyond the end
 * produced a negative length.
 *
 * @param {number} i - byte offset to read from.
 * @param {number} j - maximum number of bytes wanted.
 * @param {function} cb - forwarded to `storage.read`.
 */
KV.prototype._read = function (i, j, cb) {
  var end
  if (this.storage.length !== undefined) {
    end = this.storage.length
  } else if (this._nextFree >= 0) {
    end = this._nextFree
  }
  if (end === undefined) {
    this.storage.read(i, j, cb)
  } else {
    this.storage.read(i, Math.max(0, Math.min(end - i, j)), cb)
  }
}
/**
 * Turn a key into its list of 2-bit symbols.
 *
 * The key is converted with Buffer.from, passed through hash(), and each
 * resulting byte is split into four values in 0..3, lowest bit pair first.
 *
 * @param {string|Buffer} key
 * @returns {number[]} four symbols per hashed byte.
 */
function toField (key) {
  var digest = hash(Buffer.from(key))
  var symbols = []
  for (var n = 0; n < digest.length; n++) {
    for (var shift = 0; shift < 8; shift += 2) {
      symbols.push((digest[n] >> shift) % 4)
    }
  }
  return symbols
}
/**
 * Hash `buf` with sha256 and keep the first KEY_SIZE bytes of the digest.
 *
 * @param {Buffer|string} buf - data fed to the hash.
 * @returns {Buffer} the truncated digest.
 */
function hash (buf) {
  return createHash('sha256').update(buf).digest().slice(0, KEY_SIZE)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.