Skip to content

Instantly share code, notes, and snippets.

@ThisIsMissEm
Created July 21, 2019 01:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ThisIsMissEm/3696514698edc469c5eb7a63e81f1e1e to your computer and use it in GitHub Desktop.
Save ThisIsMissEm/3696514698edc469c5eb7a63e81f1e1e to your computer and use it in GitHub Desktop.
const fs = require("fs").promises;
const createReadStream = require("fs").createReadStream;
const debug = require("debug");
/**
 * Append-only message log persisted to a single file.
 *
 * Every record is written as `<id><json><separator>`:
 *   - `<id>` is `message.id`; the reader consumes exactly
 *     `MessageLogDB.ID_BYTE_LENGTH` (36) bytes for it, so ids must encode to
 *     36 UTF-8 bytes (e.g. a canonical UUID string) for the log to be readable,
 *   - `<json>` is `JSON.stringify(message)`,
 *   - `<separator>` defaults to "\r\n".
 *
 * The parsed contents are cached in memory after the first successful read
 * and the cache is dropped whenever `append` writes to the file.
 */
class MessageLogDB {
  /** Number of bytes read for the id prefix of every record. */
  static get ID_BYTE_LENGTH() {
    return 36;
  }

  /**
   * @param {string} filename - Path of the log file.
   * @param {string} [separator="\r\n"] - Record terminator. An empty string
   *   falls back to the default because records could not be delimited.
   */
  constructor(filename, separator) {
    this.separator = separator || "\r\n";
    this.separator_length = Buffer.byteLength(this.separator, "utf8");
    this.filename = filename;
    this.cache = null;
  }

  /**
   * Appends one message to the log file. Falsy messages are ignored.
   *
   * @param {{id: string}} message - JSON-serialisable message; see the class
   *   comment for the id length the reader expects.
   * @returns {Promise<void>}
   */
  async append(message) {
    if (!message) return;
    try {
      await fs.appendFile(
        this.filename,
        `${message.id}${JSON.stringify(message)}${this.separator}`,
        {
          encoding: "utf8",
          flag: "a+"
        }
      );
    } finally {
      // The file may have changed (even on a failed, partial write), so a
      // previously cached snapshot can no longer be trusted.
      this.cache = null;
    }
  }

  /**
   * Looks up the most recently logged message with the given id.
   *
   * @param {string} id
   * @returns {Promise<object|undefined>} The message, or `undefined` if the id
   *   was never logged.
   */
  async find(id) {
    const { latest } = await this.history();
    // Own-property check so ids such as "constructor" do not resolve to
    // members inherited from Object.prototype.
    return Object.prototype.hasOwnProperty.call(latest, id)
      ? latest[id]
      : undefined;
  }

  /**
   * @returns {Promise<string[]>} Distinct id prefixes in first-seen order.
   */
  async ids() {
    const { ids } = await this.history();
    return ids;
  }

  /**
   * Reads and parses the whole log file (or returns the cached result).
   *
   * @returns {Promise<{history: object[], latest: Object<string, object>, ids: string[]}>}
   *   `history` holds every message in file order, `latest` maps
   *   `message.id` to the last message carrying it, and `ids` lists the
   *   distinct id prefixes. A missing file yields empty collections.
   *   Rejects on I/O errors and on records whose JSON cannot be parsed.
   */
  async history() {
    if (this.cache) {
      return this.cache;
    }

    let stats = null;
    try {
      stats = await fs.stat(this.filename);
    } catch (err) {
      // A log that has not been created yet is an empty log, not an error.
      if (err.code !== "ENOENT") throw err;
    }
    if (stats === null || !stats.isFile()) {
      debug("history")("Messages database does not exist");
      return {
        history: [],
        latest: {},
        ids: []
      };
    }

    return new Promise((resolve, reject) => {
      const stream = createReadStream(this.filename, {
        flags: "r",
        highWaterMark: 1024
      });
      let state = {
        phase: "START",
        prev: null,
        offset: 0,
        messages: [],
        ids: [],
        parseCalls: 0
      };
      let failed = false;

      // Stop reading and reject once, for both stream and parse errors.
      const fail = err => {
        if (failed) return;
        failed = true;
        stream.destroy();
        reject(err);
      };

      stream.on("error", fail);

      stream.on("data", newData => {
        if (failed) return;
        debug("history:silly")("\n>>>>>> DATA >>>>>\n");
        state.offset = 0;
        // Prepend whatever the previous chunk left unparsed (a partial id or
        // a record whose separator had not arrived yet).
        const data =
          state.prev === null
            ? newData
            : Buffer.concat([state.prev, newData]);
        state.prev = null;
        try {
          while (state.offset < data.length) {
            debug("history:offsets")(
              `\noffset: ${state.offset}, length: ${data.length}\n`
            );
            debug("history:state")("\n>", state);
            state = this.parse(data, state);
            debug("history:state")("\n<", state);
          }
        } catch (err) {
          // Typically a SyntaxError from JSON.parse on a corrupt record.
          fail(err);
        }
      });

      stream.on("end", () => {
        state.phase = "END";
        const latest = {};
        for (const message of state.messages) {
          // Later records overwrite earlier ones with the same id.
          latest[message.id] = message;
        }
        this.cache = {
          history: state.messages,
          latest,
          ids: [...new Set(state.ids)]
        };
        resolve(this.cache);
      });
    });
  }

  /**
   * Advances the record parser by one step over `data`.
   *
   * Phases: "START" switches to "ID"; "ID" consumes the fixed-length id
   * prefix; "DATA" consumes JSON up to the separator and goes back to "ID".
   * When the buffer does not yet contain a full id or a separator, the
   * unread tail is stored in `state.prev` and `offset` is moved to the end of
   * the buffer so the caller waits for more data.
   *
   * @param {Buffer} data - Current buffer (previous tail + new chunk).
   * @param {object} prevState - Parser state as created in `history`.
   * @returns {object} A shallow copy of the state with updated fields; the
   *   `messages` and `ids` arrays are shared with `prevState` and appended to.
   * @throws {SyntaxError} If a record's JSON payload is malformed.
   */
  parse(data, prevState) {
    const state = { ...prevState };
    const silly = debug("history:silly");
    // Only stringify the remaining buffer when that namespace is enabled.
    if (silly.enabled) {
      silly(`\nDATA: ${data.subarray(state.offset).toString("utf8")}\n`);
    }
    state.parseCalls += 1;

    switch (state.phase) {
      case "START":
        state.phase = "ID";
        return state;

      case "ID": {
        const idEnd = state.offset + MessageLogDB.ID_BYTE_LENGTH;
        if (idEnd <= data.byteLength) {
          const id = data.subarray(state.offset, idEnd);
          state.phase = "DATA";
          state.offset = idEnd;
          state.ids.push(id.toString("utf8"));
          silly("Added", state.ids[state.ids.length - 1]);
        } else {
          state.prev = data.subarray(state.offset);
          state.offset = data.length;
        }
        break;
      }

      case "DATA": {
        const delimPosition = data.indexOf(this.separator, state.offset);
        silly("Delim Position", delimPosition);
        if (delimPosition > -1) {
          const obj = data.subarray(state.offset, delimPosition);
          state.messages.push(JSON.parse(obj.toString("utf8")));
          state.offset = delimPosition + this.separator_length;
          state.phase = "ID";
          debug("history:messages")(
            "Added",
            state.messages[state.messages.length - 1]
          );
        } else {
          state.prev = data.subarray(state.offset);
          state.offset = data.length;
        }
        break;
      }
    }

    if (state.prev) {
      silly("prev: ", state.prev.toString("utf8"));
    }
    return state;
  }

  /**
   * Sums the byte lengths of the arguments that are Buffers; anything else
   * (e.g. `null`) counts as zero.
   *
   * @param {*} a
   * @param {*} b
   * @returns {number}
   */
  totalByteLength(a, b) {
    return [a, b]
      .filter(Buffer.isBuffer)
      .reduce((total, buffer) => total + buffer.byteLength, 0);
  }
}
@luisenriquecorona
Copy link

I like!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment