Skip to content

Instantly share code, notes, and snippets.

@akirco
Last active August 11, 2023 12:49
Show Gist options
  • Save akirco/a3d540b000a9b67b148f92efe201b6e5 to your computer and use it in GitHub Desktop.
Save akirco/a3d540b000a9b67b148f92efe201b6e5 to your computer and use it in GitHub Desktop.
event source stream
tags categories cover
Javascript
development
<!DOCTYPE html>

<html>
  <head> </head>

  <body>
    <button id="start">Start event stream</button>
    <button id="stop">Stop event stream</button>
    <script type="module">
      /**
       * Exposes the events of an EventSource as a WHATWG stream.
       *
       * For every matching event, `e.data` is run through `JSON.parse` and the
       * result is handed to the `Uint8Array` constructor; the resulting chunk is
       * written to `writable` and can be consumed from `readable`.
       *
       * Public properties: `readable`, `writable`, `writer`, `eventSource`.
       */
      class EventSourceStream {
        // Guards close() so a second call does not close an already-closed writer.
        #closed = false;

        /**
         * @param {string} url - Passed to the EventSource constructor.
         * @param {object} [options]
         * @param {boolean} [options.withCredentials=false] - Forwarded to EventSource.
         * @param {string} [options.eventName=url] - Event type to listen for.
         *   Defaults to `url`, which keeps callers working that rely on the URL
         *   and the event name being the same string.
         */
        constructor(url, { withCredentials = false, eventName = url } = {}) {
          const { readable, writable } = new TransformStream();
          this.readable = readable;
          this.writable = writable;
          this.writer = writable.getWriter();
          this.eventSource = new EventSource(url, { withCredentials });

          this.eventSource.addEventListener(eventName, (e) => {
            // The write is intentionally not awaited (events keep arriving), but
            // a rejection must not go unhandled.
            this.writer
              .write(new Uint8Array(JSON.parse(e.data)))
              .catch(console.warn);
          });
          this.eventSource.addEventListener('open', (e) => {
            console.log(e);
          });
          this.eventSource.addEventListener('error', (e) => {
            console.log(e);
          });
        }

        /**
         * Closes the EventSource and then the writable side of the stream.
         * Calling it more than once is a no-op.
         */
        close() {
          if (this.#closed) {
            return;
          }
          this.#closed = true;
          this.eventSource.close();
          this.writer.close().catch(console.warn);
        }
      }

      // Unregister every registration returned by getRegistrations() before
      // registering again. A failed unregister is logged and does not stop
      // the loop.
      const staleRegistrations = await navigator.serviceWorker.getRegistrations();
      for (const stale of staleRegistrations) {
        try {
          await stale.unregister();
        } catch (err) {
          console.log(err);
        }
      }

      // The current timestamp in the query string gives every page load a
      // distinct script URL.
      const workerUrl = `./sw.js?=${Date.now()}`;
      const workerOptions = { scope: './', updateViaCache: 'none' };
      const sw = await navigator.serviceWorker.register(workerUrl, workerOptions);

      // Do not wire up the buttons until navigator.serviceWorker.ready settles.
      await navigator.serviceWorker.ready;

      // The EventSourceStream currently being consumed, or undefined when no
      // stream is running.
      let es;

      document.getElementById('start').addEventListener('click', () => {
        // Starting again must not orphan a still-open EventSource.
        if (es) {
          es.close();
        }
        es = new EventSourceStream('stream');
        es.readable
          .pipeTo(
            new WritableStream({
              write(chunk) {
                console.log(chunk.length);
              },
              close() {
                console.log('EventSource closed in Client');
              },
            })
          )
          .catch(console.warn);
      });

      document.getElementById('stop').addEventListener('click', () => {
        // Nothing to stop when "start" was never clicked or the stream was
        // already stopped; without this guard es.close() throws on undefined
        // and a repeated stop closes the same stream twice.
        if (!es) {
          return;
        }
        es.close();
        es = undefined;
        // Tell the service worker to stop producing events for the stream.
        sw.active.postMessage('abort');
      });
    </script>
  </body>
</html>
// Scratch buffer of 1764 bytes; the fetch handler refills it with random
// values before serialising each event.
const arr = new Uint8Array(1764);

// Written into the `id:` field of each event and incremented after the write.
let lastEventId = 0;

// `interrupt` is the resolver of `abortable`. The streaming loop races
// `abortable` against its one-second timer, so calling `interrupt()` ends the
// wait early.
let interrupt;
let abortable = new Promise((resolve) => {
  interrupt = resolve;
});

// Install: log the event and hold the install phase open until
// skipWaiting() settles.
const handleInstall = (event) => {
  console.log(event);
  event.waitUntil(self.skipWaiting());
};

// Activate: hold the activate phase open until clients.claim() settles.
const handleActivate = (event) => {
  event.waitUntil(clients.claim());
};

// Message: the payload is not inspected; any message settles `abortable`
// by calling its resolver.
const handleMessage = () => {
  interrupt();
};

self.addEventListener('install', handleInstall);
self.addEventListener('activate', handleActivate);
self.addEventListener('message', handleMessage);

/**
 * Writes one `stream` server-sent event to `writer` roughly every second until
 * `abortable` settles or a write fails, then closes the writer.
 *
 * Each event carries the current `lastEventId` and a JSON array of the random
 * bytes in `arr`. Errors (for example a write rejected because the consumer
 * went away) are logged, never rethrown, so the returned promise always
 * fulfils.
 *
 * @param {WritableStreamDefaultWriter} writer - Writer for the response body.
 * @returns {Promise<void>}
 */
async function pumpRandomEvents(writer) {
  try {
    while (true) {
      crypto.getRandomValues(arr);
      await writer.write(
        `event: stream\n` +
          `id: ${lastEventId}\n` +
          `data: ${JSON.stringify([...arr])}\n\n`
      );
      ++lastEventId;

      // Wait one second, or less if an abort is requested. The timer resolves
      // with `false` and the abort branch with `true`, so the loop can tell
      // which one won and stop instead of spinning on a settled promise.
      let timer;
      const aborted = await Promise.race([
        new Promise((resolve) => {
          timer = setTimeout(resolve, 1000, false);
        }),
        abortable.then(() => true),
      ]);
      clearTimeout(timer);
      if (aborted) {
        break;
      }

      if (globalThis.gc) {
        gc();
      }
    }
    // End the response body cleanly after an abort.
    await writer.close();
    console.log('EventSource closed in ServiceWorker');
  } catch (error) {
    console.log(error || 'EventSource closed in ServiceWorker');
  }
}

// Serves `index.html` uncached from the network and answers requests whose
// last path segment is `stream` with a synthetic text/event-stream response.
// All other requests are left to the browser's default handling.
self.addEventListener('fetch', (e) => {
  if (e.request.url.includes('index.html')) {
    e.respondWith(
      fetch(e.request.url, {
        cache: 'no-store',
      })
    );
    // respondWith() may only be called once per event.
    return;
  }

  if (e.request.url.split('/').pop() !== 'stream') {
    return;
  }

  console.log(e.request);

  // Start every stream with an unsettled `abortable`, so an abort requested
  // while no stream was running cannot end this one immediately.
  abortable = new Promise((resolve) => {
    interrupt = resolve;
  });

  const { readable, writable } = new TransformStream();
  const writer = writable.getWriter();

  e.respondWith(
    new Response(readable.pipeThrough(new TextEncoderStream()), {
      status: 200,
      headers: {
        Connection: 'keep-alive',
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-store',
        'Access-Control-Allow-Credentials': 'true',
        'Access-Control-Allow-Origin': '*',
      },
    })
  );

  // Keep ServiceWorker active during stream
  e.waitUntil(pumpRandomEvents(writer));
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment