Skip to content

Instantly share code, notes, and snippets.

@domenic
Last active August 29, 2015 14:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save domenic/65921459ef7a31ec2839 to your computer and use it in GitHub Desktop.
Save domenic/65921459ef7a31ec2839 to your computer and use it in GitHub Desktop.
Byte streams exploration

File streams

According to libuv folks, the most reliable way to do asynchronous file I/O is blocking read(2) in a threadpool. Although Windows has IO Completion Ports, they fall down in enough cases they are largely not used. Regardless, we want to support read(2)-in-a-threadpool models, so this section assumes files are a good instantiation of that model, and analyzes how readable byte streams work in that case.

Reading a file byte stream into an ArrayBuffer

Given a file readable byte stream with a known size, read the entire thing into memory:

async function file_exact(fileRBS, knownFileSize) {
  const ab = new ArrayBuffer(knownFileSize);

  await fileRBS.pull(ab);
  return fileRBS.read();
}

Here fileRBS.pull(ab) transfers ab to newAB, detaching ab itself. The underlying byte source then uses read(2) in a background thread: if we gloss over the mapping between JavaScript ArrayBuffers and C memory regions (void* + size_t), we could write the resulting call as

read(fileRBS@[[fd]], newAB, newAB.byteLength)

Then, the call to fileRBS.read() returns a new Uint8Array view onto all of newAB.

Here we are implicitly relying on a specific implementation detail of our specific file streams: we know that, when told to pull with a given buffer, they will be able to read into the whole buffer (unless they reach EOF first). Another implementation might be tuned to only read in chunks up to a maximum size, in which case this code will not work. This pitfall is very similar to what you would encounter with a socket stream, so we will leave the discussion of how to create a more agnostic stream consumption function until then.

Over-allocation

Here we give a contrived modification of the above to illustrate what happens if we feed a too-large buffer into a file-based readable byte stream. In reality the mechanisms here would only come up in more subtle situations, some of which we'll discuss below.

async function file_overallocate(fileRBS, knownFileSize) {
  const tooBigAB = new ArrayBuffer(knownFileSize * 2);

  await fileRBS.pull(tooBigAB);
  return fileRBS.read();
}

As before, fileRBS.pull(tooBigAB) transfers tooBigAB to newAB, detaching tooBigAB itself. The read(2) call then passes in tooBigAB.byteLength, i.e. knownFileSize * 2, to read(2) as the count argument. However, read(2) returns back that only knownFileSize bytes were read into newAB.

In reaction, the stream machinery will construct a Uint8Array, view, which is a view onto newAB from 0 to knownFileSize. Then, read will return this view. This essentially uses the Uint8Array as a mechanism for packaging up a (buffer, bytesRead) tuple of information. The underlying memory is still retained in view.buffer, and it's still taking up knownFileSize * 2 bytes. It could be re-used for further reads, as we'll show below.

Reading a file chunkwise

Consider the case of reading a file chunkwise, processing chunks as they arrive. This time, let's postulate that we don't know the length of the file. Our goal here is to illustrate how we can do this while only using at most 1 MiB of memory. This is done by feeding the same backing memory into fileRBS.pull repeatedly, even as it is transferred between different JavaScript ArrayBuffers.

async function file_chunkwise(fileRBS, processChunk) {
  let startingBuffer = new ArrayBuffer(1024);

  await fileRBS.pull(startingBuffer);
  while (fileRBS.state !== "closed") {
    const view = fileRBS.read();
    await processChunk(view);
    await fileRBS.pull(view.buffer);
  }
}

There are several things to note in this example:

  • By passing startingBuffer to fileRBS.pull and then passing view.buffer to the next fileRBS.pull, we are continually feeding the stream different ArrayBuffer instances which nevertheless always share the same backing memory.
  • If the stream ever becomes "errored", then fileRBS.read() will throw, causing the file_chunkwise function to return a rejected promise.
  • The last chunk read from the file and subsequently passed to processChunk is likely to be a Uint8Array of length less than 1 MiB. This is automatically done for us by the mechanism described in the previous section.
  • After the last chunk is read and passed to processChunk, the subsequent call to fileRBS.pull will cause read(2) to return zero. The stream will then change its state to "closed", the returned promise will fulfill immediately, and the loop will exit.
  • After reading a given chunk, the readable byte stream does no I/O activity, until you later call pull. Thus, no I/O happens in parallel with processChunk; the program is potentially inefficient in this way. To address this, a pool of several buffers could be used, allowing processChunk to operate on one region of memory while another region is fed back to the stream with fileRBS.pull.

A two-buffer pool for a file stream

This example addresses the issue raised in the previous section with a two-buffer pool.

async function file_pingPong(fileRBS, processChunk) {
  const pool = [new ArrayBuffer(1024), new ArrayBuffer(1024)];

  await fileRBS.pull(pool[0]);
  while (fileRBS.state !== "closed") {
    await Promise.all([
      process(i % 2),
      fileRBS.pull(pool[(i + 1) % 2])
    ]);
    ++i;
  }

  function process(i) {
    const view = fileRBS.read();
    await processChunk(view);
    pool[i] = view.buffer;
  }
}

In this way, we ensure that in parallel with processChunk, at least 1 MiB worth of I/O is going on. Another way of looking at it is that this provides a somewhat crude form of a 1 MiB high water mark: if chunks are processed slower than they can be read, we'll generally have a 1 MiB chunk in reserve, ready to be read out and processed immediately.

Socket streams

When programming against sockets in an evented way, using e.g. epoll(7), the model is less straightforward than with files and read(2). Essentially, one waits to be notified about the socket's fd being ready to read, and then you can read from the socket's file descriptor. However, the notification is about there being any data at all, and not about any given amount being available.

Thus, unlike with the read(2)-in-a-threadpool model we use above for files, with evented sockets we don't get to guarantee that all the requested bytes will be read into our buffer. But as noted above, this is probably a good assumption in general, since even for file streams, the implementation may decide not to translate the user-supplied buffers directly into read(2) calls. Thus, the examples in this section can be seen as more general and robust than the ones above.

Reading a byte stream into ArrayBuffers

Given a readable byte stream, read the entire thing into memory:

async function socket_readAll(rbs) {
  const views = [];

  while (true) {
    const buff = new ArrayBuffer(1024);
    await rbs.pull(buff);

    if (rbs.state === "closed") {
      break;
    }
    views.push(rbs.read());
  }

  return views;
}

The processing model here, for a socket stream, is quite different than for read(2)-in-a-threadpool. Here, when rbs.pull(buff) is called, buff is transferred to newAB as before, but newAB is simply set aside for later. The real work comes in using the epoll(7) mechanism to wait for notification that the socket has data available. Once this signal fires, the underlying source will do several things. First, read the data into newAB, with

recv(rbs@[[fd]], newAB, newAB.byteLength, 0);

(where recv(2) is essentially equivalent to read(2), apart from working only on sockets and accepting other flags which we do not use here). Second, switch on the return value of recv(2):

  • If the return value is EAGAIN or EWOULDBLOCK, restart the process of waiting for a notification from epoll(7), and do nothing else.
  • If the return value is some other negative value, set the stream to errored, and release the backing memory of newAB.
  • If the return value is zero, set the stream to closed, and release the backing memory of newAB.
  • If the return value of recv(2) was less than newAB.byteLength, transfer newAB to newNewAB which has the appropriate byte length (thus releasing the extra memory). The next call to rbs.read() will return newNewAB.
  • If the return value of recv(2) was exactly newAB.byteLength, the next call to rbs.read() will return newAB.

Finally, in all the cases except EAGAIN/EWOULDBLOCK, the source should resolve the promise returned by rbs.pull.

There are several interesting things to note here:

  • Since we cannot take advantage of implementation-specific knowledge about our stream, like we did for file streams, we cannot read all of the data into the same buffer. Even if we knew the total size (for example from a Content-Length header) and preallocated an ArrayBuffer of that size, we wouldn't be able to read into that one ArrayBuffer sequentially, because of the way pull transfers its buffer. Only if we got lucky and the first result.bytesLength was equal to the total length would this be possible, and in general we must assume that epoll(7) will notify us about available data before all Content-Length of the bytes are available.
  • If the chunk size (1024 bytes above) is too small, then extra data will consistently be left in the kernel buffer for at least one more iteration of the loop, and thus what could have been a single chunk gets split across multiple ArrayBuffers.
  • If the chunk size is too large, then memory will be wasted, as each view ends up being a view onto a subset of a larger buffer, some of which goes unused. This could be avoided by manually transfering each view.buffer into a new backing buffer of the right size, since the author of this code knows that they do not want to reuse the extra memory (e.g. as they would in a pooling strategy).

Combining these two latter points, one might imagine a more intelligent version of socket_readAll that will dynamically change the size of the chunks it feeds to rbs. If it notices that the result of rbs.read is consistently smaller than the buffer passed in to rbs.pull, it can start choosing smaller sizes; conversely, if the result is consistently of the same size as the buffer fed in, it might be worth increasing the size in order to leave less in the kernel buffer and reduce the number of ArrayBuffers created.

Reading a byte stream chunkwise

We discused most of the complexity regarding socket streams in the first example, so our knowledge carries over fairly directly to a chunkwise read scenario.

async function socket_chunkwise(rbs, processChunk) {
  let startingBuffer = new ArrayBuffer(1024);

  await rbs.pull(startingBuffer);
  while (rbs.state !== "closed") {
    const view = rbs.read();
    await processChunk(view);
    await rbs.pull(view.buffer);
  }
}

The code here is in fact the same as the code for a readable byte stream representing a file, since unlike in the file_exact example, in file_chunkwise we did not assume details about the file stream's implementation. The main difference here is that while in file_chunkwise all of our chunks except the last one were generally going to be 1 MiB, here that is up to the whims of when epoll(7) notifies us, and so 1 MiB might be over- or under-estimating.

Note that unlike the previous example, here any overestimation will not really "waste" memory in the same sense. The memory will just be re-used on the next iteration, at which point 1 MiB might be a perfectly fine amount.

Agnostic consumers

One of the primary goals of the byte stream design is that byte streams should be consumable by code that does not know it is dealing with a byte stream---generic stream functions and combinators. We give a few example functions here, written to work against any stream, and examine how they behave when presented with both file and socket readable byte streams.

Getting the next chunk

This helper function will return a promise for the next available chunk from a given readable stream. This introduces an artificial delay if there is already data queued, but can provide a convenient interface for simple chunk-by-chunk consumption, as one might do e.g. when streaming database records. It uses an EOF sentinel to signal the end of the stream.

const EOS = Symbol("ReadableStream getNext EOS");

async function generic_getNext(rs) {
  await rs.pull();
  return rs.state === "closed" ? EOS : rs.read();
}

(Note how if the stream is errored, rs.read() will throw, causing the function to return a rejected promise.)

With this simple example, we introduce our guiding principle for these kind of uses cases: we assume that, when pull is called with the argument undefined on a readable byte stream of any sort, then the stream will use its best judgement to allocate and use an appropriate-sized ArrayBuffer. For example, a file stream might pick a fixed size (likely OS-dependent) that has good performance characteristics, whereas a socket stream might try to do something adaptive based on past experience.

Note that the chunk returned will be a Uint8Array. It is probably best if the implementation ensures that chunk.byteLength === chunk.buffer.byteLength by resizing (via transferral) the backing buffer if it is too large.

All in all, this example works pretty well.

Getting all the chunks

async function generic_toArray(rs) {
  const chunks = [];

  while (rs.state !== "closed") {
    await rs.pull();

    do {
      chunks.push(rs.read());
    } while (rs.state === "readable");
  }

  return chunks;
}

(The do-while usage is, again, a trick to ensure rs.read() is called even when the stream is "errored", causing the function to return a rejected promise.)

This example will work for basically the same reason as the previous one. The consumer here doesn't care about the size of their chunks, so it's fine for the stream to set them.

It's interesting that for "normal" readable streams, we allow multiple chunks to result from a single rs.pull() call: that is, the do-while loop could have multiple iterations. However, for readable byte streams, it will always be a single iteration.

Chunkwise processing

Let's do a reprisal of our earlier byte-specific chunkwise processing function, but this time it will be agnostic to the type of stream:

async function generic_chunkwise(rs, processChunk) {
  while (rs.state !== "closed") {
    await rs.pull();

    do {
      await processChunk(rs.read());
    } while (rs.state === "readable");
  }
}

This function will also work, although the differences in it compared to file_chunkwise or socket_chunkwise are illuminating.

The big difference is that, when used on a readable byte stream, generic_chunkwise will create a lot of garbage. Instead of re-using the same memory region over and over, each call to rs.pull() will allocate a new one. Once processChunk has finished with the resulting memory region, then the ArrayBuffer can be garbage-collected, and thus the backing memory freed. But there's some definite inefficiency there.

Also, as before, the do-while loop is always one iteration with readable byte streams.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment