
@segphault
Created February 25, 2018 00:43
Async iterator issue

I recently began experimenting with async iterators, using the --harmony flag in Node 9.5. After reading through the MDN docs, I figured that it might be interesting to make a function that takes the values yielded by an async generator function and serves them as an SSE event stream. The following is a simple, contrived example:

const http = require("http");

const timer = time =>
  new Promise(resolve => setTimeout(resolve, time));

async function* counter() {
  let n = 0;
  while (true) {
    await timer(5000);
    yield n++;
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  for await (let item of iter)
    res.write(`event: counter\ndata: ${item}\n\n`);
}

http.createServer((req, res) =>
  iterStream(res, counter())).listen(8000);

The iterator yields a new value from the counter every five seconds. The iterStream function takes each new value and broadcasts it to the connected user. With an EventSource on the client side, I receive each incrementing value as it arrives:

let events = new EventSource("/");
events.addEventListener("counter", ({data}) => console.log(data));

Next, a slightly less contrived example. Instead of using a timer to introduce an artificial delay between items, I'm going to use a RethinkDB query with a changefeed, which broadcasts live updates from the database:

const http = require("http");
const r = require("rethinkdbdash")();

async function* query() {
  let cursor = await r.db("rethinkdb").table("stats").changes();
  while (true)
    yield cursor.next();
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  for await (let item of iter)
    res.write(`event: change\ndata: ${JSON.stringify(item)}\n\n`);
}

http.createServer((req, res) =>
  iterStream(res, query())).listen(8000);

The query function waits for and yields new values from the database in a loop. For reference, the client library's cursor does not conform to the iteration protocol; it just (coincidentally) happens to use next as the name of the method that retrieves a new item. The cursor.next method returns a promise that resolves when a new item is available from the database.
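As an aside, a cursor-like object with a promise-returning next method can be adapted into a standard async iterable by hand. This is a minimal sketch with a mock cursor standing in for the real client library (mockCursor and cursorIterable are my own illustrative names, not part of rethinkdbdash):

```javascript
// Mock stand-in for the client library's cursor: a promise-returning
// next method that is *not* part of the iteration protocol.
function mockCursor(items) {
  let index = 0;
  return {
    next: () => Promise.resolve(items[index++]),
    close: () => {}
  };
}

// Adapt a cursor-like object into a standard async iterable.
function cursorIterable(cursor, limit) {
  return {
    async *[Symbol.asyncIterator]() {
      for (let count = 0; count < limit; count++)
        yield cursor.next();
    }
  };
}

(async () => {
  for await (const item of cursorIterable(mockCursor([1, 2, 3]), 3))
    console.log(item); // 1, 2, 3
})();
```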

There's a minor wrinkle, however: the database connection continues to stream cursor results even after the user disconnects. I figured that I could address the problem by invoking iter.return in my iterStream function when the connection closes. When the iterator terminates, I can use a finally block in my query function to close the database cursor:

async function* query() {
  let cursor = await r.db("test").table("test").changes();

  try {
    while (true)
      yield cursor.next();
  }
  finally {
    console.log("Closing the cursor");
    cursor.close();
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  res.connection.on("close", () => iter.return());
  for await (let item of iter)
    res.write(`event: change\ndata: ${JSON.stringify(item)}\n\n`);
}

This approach seems to work, but not exactly as I expected. When the user closes their connection and iter.return forces the generator to finish, the pending cursor.next call sits there and waits until one more item comes in before the finally block executes; it doesn't interrupt the pending promise.

I tried using iter.throw instead of iter.return to see if it would give me the desired behavior, but it does basically the same thing: with iter.throw, a catch block in the query function doesn't execute until the next yield.
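Here's a stripped-down reduction of that iter.throw behavior, with a plain timer in place of the database (my own reduction, not the server example above):

```javascript
const timer = time => new Promise(resolve => setTimeout(resolve, time));

async function* gen() {
  try {
    while (true) {
      await timer(100); // the promise that is pending when throw is called
      yield "tick";
    }
  } catch (err) {
    console.log("Caught:", err.message);
  }
}

(async () => {
  const iter = gen();
  const first = iter.next(); // starts the generator; it awaits the timer
  setTimeout(() => iter.throw(new Error("stop")), 10);
  // The throw is queued; the catch block only runs once the pending
  // timer resolves and the generator reaches the next yield.
  console.log("First item:", (await first).value);
})();
```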

This same behavior is reproducible with the timed counter example:

async function* test() {
  let n = 0;

  try {
    while (true) {
      console.log("Awaiting the timer");
      await timer(5000);
      console.log("Yielding a new item");
      yield n++;
    }
  }
  finally {
    console.log("Iterator finished");
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  res.connection.on("close", () => {
    console.log("Connection closed");
    iter.return();
  });

  for await (let item of iter) {
    console.log("Sending item:", item);
    res.write(`event: item\ndata: ${item}\n\n`);
  }
}

When I run the example above and disconnect after receiving the second item, the resulting output looks like this:

Awaiting the timer
Yielding a new item
Sending item: 0
Awaiting the timer
Yielding a new item
Sending item: 1
Awaiting the timer
Yielding a new item
Sending item: 2
Awaiting the timer
Connection closed
Yielding a new item
Iterator finished
Sending item: 3

After the connection closes, the finally block doesn't execute until the pending timer completes.

Am I missing something? Is there another approach that I could use to address connection termination in this example while still using async iterators?
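For what it's worth, one idea I've sketched (against the timer example only, not the real changefeed) is to race each awaited promise against a cancellation promise, so a pending await can be interrupted as soon as the connection closes. The cancellation helper here is my own invention, not a library API:

```javascript
const timer = time => new Promise(resolve => setTimeout(resolve, time));

// A simple cancellation token: the promise rejects when cancel() is called.
function cancellation() {
  let cancel;
  const promise = new Promise((resolve, reject) => {
    cancel = () => reject(new Error("cancelled"));
  });
  return {promise, cancel};
}

async function* counter(token) {
  let n = 0;
  try {
    while (true) {
      // Racing against the token lets cancellation interrupt the
      // pending timer instead of waiting for it to resolve.
      await Promise.race([timer(50), token.promise]);
      yield n++;
    }
  } finally {
    console.log("Iterator finished");
  }
}

(async () => {
  const token = cancellation();
  setTimeout(token.cancel, 120); // simulates the connection closing
  try {
    for await (const item of counter(token))
      console.log("Item:", item);
  } catch (err) {
    console.log("Loop ended:", err.message);
  }
})();
```

In the server examples, the same token's cancel could presumably be wired to the connection's close event in iterStream instead of the setTimeout used here.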
