Created
July 21, 2019 01:05
-
-
Save ThisIsMissEm/3696514698edc469c5eb7a63e81f1e1e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const fs = require("fs").promises; | |
const createReadStream = require("fs").createReadStream; | |
const debug = require("debug"); | |
/**
 * Append-only message log persisted to a single flat file.
 *
 * Every record is written as `<id><json><separator>`: the raw `message.id`,
 * immediately followed by the JSON-serialised message and then the record
 * separator. Reading replays the whole file and memoises the result in
 * `this.cache` until the next `append()`.
 */
class MessageLogDB {
  /**
   * Width, in bytes, of the id prefix that `parse()` consumes for each record.
   * `message.id` must therefore serialise to exactly this many UTF-8 bytes.
   * NOTE(review): 36 is the length of a canonical UUID string, which is
   * presumably what callers use as ids; confirm against callers.
   *
   * @returns {number}
   */
  static get ID_BYTE_LENGTH() {
    return 36;
  }

  /**
   * @param {string} filename - Path of the log file.
   * @param {string} [separator="\r\n"] - Record terminator written after every
   *   message and searched for when reading the log back.
   */
  constructor(filename, separator) {
    this.separator = separator || "\r\n";
    this.separator_length = Buffer.byteLength(this.separator, "utf8");
    this.filename = filename;
    // Memoised result of history(); null means "not loaded or stale".
    this.cache = null;
  }

  /**
   * Appends one message to the log file. Falsy messages are ignored.
   *
   * @param {{id: string}} message - JSON-serialisable message; `id` is written
   *   verbatim in front of the JSON payload.
   * @returns {Promise<void>}
   */
  async append(message) {
    if (!message) return;

    await fs.appendFile(
      this.filename,
      `${message.id}${JSON.stringify(message)}${this.separator}`,
      {
        encoding: "utf8",
        flag: "a+"
      }
    );

    // The file on disk changed, so a previously memoised snapshot is stale.
    this.cache = null;
  }

  /**
   * Looks up the most recently logged message carrying the given id.
   *
   * @param {string} id
   * @returns {Promise<object|undefined>} The message, or undefined when the id
   *   was never logged.
   */
  async find(id) {
    const { latest } = await this.history();
    return latest[id];
  }

  /**
   * @returns {Promise<string[]>} Distinct ids in order of first appearance.
   */
  async ids() {
    const { ids } = await this.history();
    return ids;
  }

  /**
   * Loads the whole log (or returns the memoised copy).
   *
   * @returns {Promise<{history: object[], latest: Object<string, object>, ids: string[]}>}
   *   `history` holds every message in file order, `latest` maps `message.id`
   *   to the last message logged with that id, and `ids` lists the distinct
   *   id prefixes. A missing log file yields empty collections.
   * @throws {Error} Rejects on I/O errors other than a missing file, and with
   *   a SyntaxError when a record's JSON payload cannot be parsed.
   */
  async history() {
    if (this.cache) {
      return this.cache;
    }

    let stats = null;
    try {
      stats = await fs.stat(this.filename);
    } catch (err) {
      // A log that has not been created yet is an expected state, not an error.
      if (err.code !== "ENOENT") throw err;
    }

    if (stats === null || !stats.isFile()) {
      debug("history")("Messages database does not exist");
      return {
        history: [],
        latest: {},
        ids: []
      };
    }

    const state = await this.scan();

    const latest = {};
    for (const message of state.messages) {
      // Later records overwrite earlier ones, leaving the newest per id.
      latest[message.id] = message;
    }

    this.cache = {
      history: state.messages,
      latest,
      // A Set keeps first-insertion order while dropping repeated ids.
      ids: [...new Set(state.ids)]
    };
    return this.cache;
  }

  /**
   * Internal helper: streams the log file through `parse()`.
   *
   * @returns {Promise<object>} The final parser state, including the
   *   collected `messages` and `ids` arrays.
   */
  scan() {
    return new Promise((resolve, reject) => {
      const stream = createReadStream(this.filename, {
        flags: "r",
        highWaterMark: 1024
      });

      let state = {
        phase: "START",
        prev: null,
        offset: 0,
        messages: [],
        ids: [],
        parseCalls: 0
      };

      stream.on("data", chunk => {
        debug("history:silly")("\n>>>>>> DATA >>>>>\n");
        state.offset = 0;

        // Prepend whatever the previous chunk left unconsumed so that records
        // (and separators) straddling a chunk boundary are seen whole.
        let data = chunk;
        if (state.prev !== null) {
          data = Buffer.concat([state.prev, chunk]);
          state.prev = null;
        }

        try {
          while (state.offset < data.length) {
            debug("history:offsets")(
              `\noffset: ${state.offset}, length: ${data.length}\n`
            );
            debug("history:state")("\n>", state);
            state = this.parse(data, state);
            debug("history:state")("\n<", state);
          }
        } catch (err) {
          // A corrupt record must fail this read, not crash the process:
          // destroying the stream with the error routes it to "error" below.
          stream.destroy(err);
        }
      });

      stream.on("error", reject);

      stream.on("end", () => {
        state.phase = "END";
        if (state.prev !== null) {
          debug("history")(
            "Ignoring %d trailing bytes of an incomplete record",
            state.prev.length
          );
        }
        resolve(state);
      });
    });
  }

  /**
   * Advances the record parser by one step over `data`.
   *
   * The parser alternates between the "ID" phase (consume the fixed-width id
   * prefix) and the "DATA" phase (consume JSON up to the separator). When the
   * buffer does not yet hold a complete token, the unread tail is stored in
   * `state.prev` and `state.offset` is moved to the end of the buffer.
   *
   * @param {Buffer} data - Bytes currently available to the parser.
   * @param {object} prevState - State returned by the previous call.
   * @returns {object} A shallow copy of the state, advanced by one step. The
   *   `messages` and `ids` arrays are shared with `prevState` and appended to
   *   in place.
   * @throws {SyntaxError} When a record's payload is not valid JSON.
   */
  parse(data, prevState) {
    const state = { ...prevState };
    const idLength = MessageLogDB.ID_BYTE_LENGTH;

    debug("history:silly")(
      `\nDATA: ${data.subarray(state.offset).toString("utf8")}\n`
    );

    state.parseCalls += 1;

    switch (state.phase) {
      case "START":
        state.phase = "ID";
        return state;

      case "ID": {
        if (data.byteLength - state.offset >= idLength) {
          const id = data.subarray(state.offset, state.offset + idLength);
          state.phase = "DATA";
          state.offset += idLength;
          state.ids.push(id.toString("utf8"));
          debug("history:silly")("Added", state.ids[state.ids.length - 1]);
        } else {
          // Not enough bytes for a whole id yet; wait for the next chunk.
          state.prev = data.subarray(state.offset);
          state.offset = data.length;
        }
        break;
      }

      case "DATA": {
        const delimPosition = data.indexOf(this.separator, state.offset);
        debug("history:silly")("Delim Position", delimPosition);

        if (delimPosition > -1) {
          const payload = data.subarray(state.offset, delimPosition);
          state.messages.push(JSON.parse(payload.toString("utf8")));
          state.offset = delimPosition + this.separator_length;
          state.phase = "ID";
          debug("history:messages")(
            "Added",
            state.messages[state.messages.length - 1]
          );
        } else {
          // Separator not seen yet; keep the partial payload for later.
          state.prev = data.subarray(state.offset);
          state.offset = data.length;
        }
        break;
      }
    }

    if (state.prev) {
      debug("history:silly")("prev: ", state.prev.toString("utf8"));
    }

    return state;
  }

  /**
   * Sums the byte lengths of whichever of the two arguments are Buffers.
   *
   * @param {*} a
   * @param {*} b
   * @returns {number} Combined byte length; non-Buffer arguments count as 0.
   */
  totalByteLength(a, b) {
    return [a, b]
      .filter(Buffer.isBuffer)
      .reduce((total, buffer) => total + buffer.byteLength, 0);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I like!!