Skip to content

Instantly share code, notes, and snippets.

@neet
Forked from assaf/stream.ts
Created December 23, 2022 03:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save neet/218c59d25489156e5e4364e20925b36f to your computer and use it in GitHub Desktop.
Save neet/218c59d25489156e5e4364e20925b36f to your computer and use it in GitHub Desktop.
Read status updates from Mastodon streaming API
/**
 * One event parsed from the streaming response.
 *
 * - `"update"` and `"status.update"` carry the affected `status`.
 * - `"delete"` carries only the `id` of the removed status.
 *
 * The `?: never` members mark the field that a variant does NOT have, so a
 * consumer can destructure `{ id, status }` from any event and check which
 * one is set, while still getting full narrowing via the `event` tag.
 */
type StreamEvent =
  | { event: "update"; status: Status; id?: never }
  | { event: "status.update"; status: Status; id?: never }
  | { event: "delete"; id: string; status?: never };
/**
 * Opens a streaming endpoint and exposes its events as an async generator.
 *
 * Use like this:
 *
 *   const { events, close } = await stream(`https://${instance}/api/v1/streaming/public`);
 *   for await (const event of events) {
 *     if (event.event === "delete") … event.id was deleted …
 *     else … do something with event.status …
 *   }
 *
 * @param url - Streaming endpoint to fetch.
 * @returns `events`, an async generator of parsed events, and `close`, which
 *   cancels the underlying reader and stops the heartbeat watchdog.
 * @throws Error when the response is not OK or has no body. The generator
 *   itself throws `Error("Stream closed")` once the reader reports that the
 *   stream is done.
 */
async function stream(url: string): Promise<{
  events: AsyncGenerator<StreamEvent, void, unknown>;
  close: () => void;
}> {
  const response = await fetch(url);
  if (!response.ok) throw new Error(`Stream failed: ${response.statusText}`);
  if (!response.body) throw new Error("Stream failed: response has no body");

  const reader = response.body.getReader();
  const utf8decoder = new TextDecoder();
  // Messages are framed by a blank line.
  const messageSeparator = "\n\n";
  // Server sends heartbeat every 15 seconds; give it some slack before giving up.
  const heartbeatTimeoutMs = 20_000;

  function close() {
    clearTimeout(timeout);
    // Best-effort teardown: cancelling an already errored stream rejects, and
    // there is nothing useful a caller of close() could do with that error.
    void reader.cancel().catch(() => {});
  }

  let timeout = setTimeout(close, heartbeatTimeoutMs);

  async function* iterator(): AsyncGenerator<StreamEvent, void, unknown> {
    try {
      let buffer = "";
      while (true) {
        const { done, value } = await reader.read();
        if (done) throw new Error("Stream closed");

        // `stream: true` keeps a multi-byte character that is split across two
        // chunks intact instead of decoding each half to U+FFFD.
        buffer += utf8decoder.decode(value, { stream: true });

        // Peel off one complete message at a time so that several messages
        // arriving in the same chunk are each parsed on their own.
        let boundary = buffer.indexOf(messageSeparator);
        while (boundary !== -1) {
          const end = boundary + messageSeparator.length;
          const message = buffer.slice(0, end);
          buffer = buffer.slice(end);

          yield* yieldEvent(message);

          // Any complete message (heartbeats included) proves the server is alive.
          clearTimeout(timeout);
          timeout = setTimeout(close, heartbeatTimeoutMs);

          boundary = buffer.indexOf(messageSeparator);
        }
      }
    } finally {
      // Runs on error, on end of stream, and when the consumer leaves the
      // `for await` loop early; release the connection in every case.
      close();
    }
  }

  return { events: iterator(), close };
}
// Shared instance used by yieldEvent to deserialize "application/json"
// payloads into Status objects.
// NOTE(review): SerializerNativeImpl is not imported in the visible source —
// presumably it comes from the Mastodon client library; confirm the import.
const serializer = new SerializerNativeImpl();
/**
 * Parses a single stream message and yields the corresponding event, if any.
 *
 * A message may or may not contain an event; e.g. `:thump` is just a
 * heartbeat and produces nothing. Messages with an unrecognised event name,
 * or without a `data:` line, are ignored as well.
 *
 * @param message - Raw text of one message, including its `event:` and
 *   `data:` lines.
 * @returns A generator yielding zero or one StreamEvent.
 */
function* yieldEvent(message: string): Generator<StreamEvent, void, unknown> {
  const event = message.match(/^event: (.*)$/m)?.[1];
  const data = message.match(/^data: (.*)$/m)?.[1];
  if (!data) return;

  if (event === "update" || event === "status.update") {
    const status = serializer.deserialize<Status>("application/json", data);
    yield { event, status };
    return;
  }

  if (event === "delete") {
    // The payload is the status ID. Keep it as text: running a bare numeric ID
    // through JSON.parse would turn it into a `number` and silently lose
    // precision for IDs above Number.MAX_SAFE_INTEGER. Only unwrap the value
    // when it is an explicitly quoted JSON string.
    const parsed: unknown = data.startsWith('"') ? JSON.parse(data) : data;
    const id = typeof parsed === "string" ? parsed : data;
    yield { event, id };
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment