Skip to content

Instantly share code, notes, and snippets.

@gdamjan
Last active June 26, 2023 00:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gdamjan/9432f04ac692f8f3bcffeb6abcee4a44 to your computer and use it in GitHub Desktop.
Save gdamjan/9432f04ac692f8f3bcffeb6abcee4a44 to your computer and use it in GitHub Desktop.
fetch couchdb changes as stream
function asContinuousEventStream() {
const transformer = {
start() {
this.buffer = "";
},
transform(chunk, controller) {
this.buffer += chunk;
while (true) {
// parse full lines only
const [line, sep, rest] = this.buffer.split(/(\n)/, 3);
if (sep !== "\n") break;
this.buffer = rest;
if (line !== "") {
controller.enqueue(JSON.parse(line));
} else {
controller.enqueue({ event: "heartbeat" });
}
if (rest === "") break;
}
},
};
return new TransformStream(transformer);
}
async function fetchContinuous(u) {
const controller = new AbortController();
const signal = controller.signal;
const heartbeat = 60000;
const url = new URL(u);
url.searchParams.set("feed", "continuous");
url.searchParams.set("heartbeat", heartbeat);
url.searchParams.set("since", "now");
url.searchParams.set("include_docs", "true");
const headers = { Accept: "application/json" };
const resp = await fetch(url, { headers, signal });
const reader = resp.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(asContinuousEventStream())
.getReader();
while (true) {
let timeoutID = setTimeout(() => {
console.log("Stream stuck, aborting!");
controller.abort();
}, heartbeat * 2);
const { done, value } = await reader.read();
clearTimeout(timeoutID);
if (done) break;
console.log(value);
}
console.log("Stream done");
}
fetchContinuous("https://db.softver.org.mk/irclog/_changes");
function parseEvent(e) {
const o = {};
const lines = e.split(/\n/);
for (line of lines) {
if (line) {
const [key, value] = line.split(/: /, 2);
o[key] = value;
}
}
return o;
}
function asEventSourceStream() {
const transformer = {
start() {
this.buffer = "";
},
transform(chunk, controller) {
this.buffer += chunk;
while (true) {
const [event, sep, rest] = this.buffer.split(/(\n\n)/, 3);
if (sep !== "\n\n") break;
this.buffer = rest;
controller.enqueue(parseEvent(event));
if (rest === "") break;
}
},
};
return new TransformStream(transformer);
}
async function fetchEventStream(url) {
const controller = new AbortController();
const signal = controller.signal;
const heartbeat = 60000;
const u = new URL(url);
u.searchParams.set("feed", "eventsource");
u.searchParams.set("heartbeat", heartbeat);
u.searchParams.set("since", "now");
const headers = { Accept: "text/event-stream" };
const resp = await fetch(u, { headers, signal });
const reader = resp.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(asEventSourceStream())
.getReader();
while (true) {
let timeoutID = setTimeout(() => controller.abort(), heartbeat * 2);
const { done, value } = await reader.read();
clearTimeout(timeoutID);
if (done) break;
console.log(value);
}
console.log("Stream done");
}
fetchEventStream("https://db.softver.org.mk/irclog/_changes");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment