Skip to content

Instantly share code, notes, and snippets.

@cowboyd
Created March 10, 2024 16:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cowboyd/9350650be830bf4648f60ed8aadc5e5a to your computer and use it in GitHub Desktop.
Save cowboyd/9350650be830bf4648f60ed8aadc5e5a to your computer and use it in GitHub Desktop.
Model a remote iterable using `EventSource` and Effection
import { main } from "npm:effection@3.0.3";
import { useEventStream } from "./use-event-stream.ts";
// Consume the remote event stream as an iterable: print every value it
// yields, then print the final value it returns once it is done.
await main(function* () {
  const subscription = yield* useEventStream<number, string>("http://localhost:8000");
  let result = yield* subscription.next();
  for (; !result.done; result = yield* subscription.next()) {
    console.log(result.value);
  }
  // the loop ended on a { done: true } result, so this is the return value
  console.log(result.value);
});
import {
createSignal,
once,
race,
resource,
spawn,
type Stream,
} from "npm:effection@3.0.3";
import { EventSource } from "https://deno.land/x/eventsource@v0.0.3/mod.ts";
/**
 * Model a remote iterable as an Effection stream backed by an `EventSource`.
 *
 * The JSON payload of every "yield" event is sent to the stream as an item,
 * and the JSON payload of the "return" event closes the stream and becomes
 * its final value. Payloads are parsed with `JSON.parse` and cast to `T` /
 * `TDone` without any validation.
 *
 * @param url - address of the server-sent event endpoint to connect to
 * @returns a stream of the remote "yield" values that closes with the remote
 *   "return" value
 * @throws whatever `once(source, "error")` produces when the event source
 *   dispatches an "error" event (raised inside the resource's scope)
 */
export function useEventStream<T, TDone>(url: string): Stream<T, TDone> {
  return resource(function* (provide) {
    let source = new EventSource(url);
    let signal = createSignal<T, TDone>();

    // send { done: false, value } to the stream
    let onYield = function (event: Event) {
      let data = JSON.parse((event as MessageEvent).data);
      signal.send(data as T);
    };

    // close the stream with { done: true, value }
    let onReturn = function (event: Event) {
      let data = JSON.parse((event as MessageEvent).data);
      signal.close(data as TDone);
    };

    try {
      // blow up if you get an error event. this will be automatically halted
      // if the stream exits successfully.
      yield* spawn(function* () {
        throw yield* once(source, "error");
      });

      source.addEventListener("yield", onYield);
      source.addEventListener("return", onReturn);

      // close out this resource when the computation finishes
      // or we receive the "return" event
      yield* race([once(source, "return"), provide(yield* signal)]);
    } finally {
      source.removeEventListener("yield", onYield);
      source.removeEventListener("return", onReturn);
      // Without an explicit close() the connection stays open after the
      // resource is gone, and an EventSource re-establishes a dropped
      // connection on its own, so it would keep reconnecting to the server.
      source.close();
    }
  });
}
import { ServerSentEventStream } from "jsr:@std/http";
import { call, main, sleep, suspend, useScope } from "npm:effection@3.0.3";
// Serve a countdown as server-sent events until this main operation is halted.
await main(function* () {
  const controller = new AbortController();
  const scope = yield* useScope();

  // Write the countdown for a single response: one "yield" event per number
  // from 5 down to 1 with a one second pause after each, followed by a final
  // "return" event. The writer is closed however the operation ends.
  function* countdown(events: ServerSentEventStream) {
    const writer = events.writable.getWriter();
    try {
      let remaining = 5;
      while (remaining >= 1) {
        const data = JSON.stringify(remaining);
        yield* call(() => writer.write({ event: "yield", data }));
        yield* sleep(1000);
        remaining--;
      }
      const farewell = JSON.stringify("blast off!");
      yield* call(() => writer.write({ event: "return", data: farewell }));
    } finally {
      yield* call(() => writer.close());
    }
  }

  const server = Deno.serve({
    signal: controller.signal,
    handler: (_req) => {
      const events = new ServerSentEventStream();
      // run the countdown in main's scope so it is tied to main's lifetime
      scope.run(() => countdown(events));
      return new Response(events.readable, {
        headers: {
          "content-type": "text/event-stream",
          "cache-control": "no-cache",
        },
      });
    },
  });

  try {
    // keep serving until main is halted
    yield* suspend();
  } finally {
    // abort the server, then wait for it to finish shutting down
    controller.abort();
    yield* call(() => server.finished);
  }
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment