According to the libuv folks, the most reliable way to do asynchronous file I/O is blocking read(2) in a threadpool. Although Windows has IO Completion Ports, they fall down in enough cases that they are largely not used. Regardless, we want to support read(2)-in-a-threadpool models, so this section takes files as a good instantiation of that model, and analyzes how readable byte streams work in that case.
Given a file readable byte stream with a known size, read the entire thing into memory:
```js
async function file_exact(fileRBS, knownFileSize) {
  const ab = new ArrayBuffer(knownFileSize);
  await fileRBS.pull(ab);
  return fileRBS.read();
}
```
Here `fileRBS.pull(ab)` transfers `ab` to `newAB`, detaching `ab` itself. The underlying byte source then uses read(2) in a background thread: if we gloss over the mapping between JavaScript `ArrayBuffer`s and C memory regions (`void*` + `size_t`), we could write the resulting call as

```
read(fileRBS@[[fd]], newAB, newAB.byteLength)
```

Then, the call to `fileRBS.read()` returns a new `Uint8Array` view onto all of `newAB`.
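To make this sequence concrete, here is `file_exact` run against a minimal mock of the `pull()`/`read()`/`state` surface assumed in this document. The mock (`mockFileStream` is our own name, not part of any design) stands in for transferral plus the threadpool read(2) by simply allocating and filling a fresh region:

```js
async function file_exact(fileRBS, knownFileSize) {
  const ab = new ArrayBuffer(knownFileSize);
  await fileRBS.pull(ab);
  return fileRBS.read();
}

// Hypothetical mock of a file readable byte stream containing `data`
// (a Uint8Array). pull() "reads" into a region the size of the
// transferred buffer; read() hands back the resulting view.
function mockFileStream(data) {
  let pending = null;
  return {
    state: "readable",
    async pull(ab) {
      // Stand-in for transferral + read(2): allocate a same-sized
      // region and copy the file's bytes into it.
      pending = new Uint8Array(ab.byteLength);
      pending.set(data.subarray(0, ab.byteLength));
    },
    read() { return pending; }
  };
}

(async () => {
  const rbs = mockFileStream(Uint8Array.of(10, 20, 30));
  const view = await file_exact(rbs, 3);
  console.log(Array.from(view)); // [ 10, 20, 30 ]
})();
```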
Here we are implicitly relying on a specific implementation detail of our specific file streams: we know that, when told to pull with a given buffer, they will be able to read into the whole buffer (unless they reach EOF first). Another implementation might be tuned to only read in chunks up to a maximum size, in which case this code will not work. This pitfall is very similar to what you would encounter with a socket stream, so we will leave the discussion of how to create a more agnostic stream consumption function until then.
Here we give a contrived modification of the above to illustrate what happens if we feed a too-large buffer into a file-based readable byte stream. In reality the mechanisms here would only come up in more subtle situations, some of which we'll discuss below.
```js
async function file_overallocate(fileRBS, knownFileSize) {
  const tooBigAB = new ArrayBuffer(knownFileSize * 2);
  await fileRBS.pull(tooBigAB);
  return fileRBS.read();
}
```
As before, `fileRBS.pull(tooBigAB)` transfers `tooBigAB` to `newAB`, detaching `tooBigAB` itself. (Being detached, `tooBigAB` no longer has a byte length to speak of.) The stream machinery then passes `newAB.byteLength`, i.e. `knownFileSize * 2`, to read(2) as the `count` argument. However, read(2) reports back that only `knownFileSize` bytes were read into `newAB`.
In reaction, the stream machinery will construct a `Uint8Array`, `view`, which is a view onto `newAB` from `0` to `knownFileSize`. Then, `read` will return this view. This essentially uses the `Uint8Array` as a mechanism for packaging up a (buffer, bytesRead) tuple of information. The underlying memory is still retained in `view.buffer`, and it's still taking up `knownFileSize * 2` bytes. It could be re-used for further reads, as we'll show below.
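If the extra memory is not going to be re-used, the view can instead be compacted with a copy. A small self-contained sketch (the sizes 16 and 8 are stand-ins for `knownFileSize * 2` and `knownFileSize`):

```js
// Stand-ins for newAB and the view returned by fileRBS.read():
const tooBig = new ArrayBuffer(16);
const view = new Uint8Array(tooBig, 0, 8);

// Uint8Array.prototype.slice copies only the viewed region, so the
// compact copy's backing buffer is exactly view.byteLength bytes.
const compact = view.slice();
console.log(compact.buffer.byteLength); // 8
console.log(view.buffer.byteLength);    // 16 (original memory unchanged)
```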
Consider the case of reading a file chunkwise, processing chunks as they arrive. This time, let's postulate that we don't know the length of the file. Our goal here is to illustrate how we can do this while only using at most 1 MiB of memory. This is done by feeding the same backing memory into `fileRBS.pull` repeatedly, even as it is transferred between different JavaScript `ArrayBuffer`s.
```js
async function file_chunkwise(fileRBS, processChunk) {
  const startingBuffer = new ArrayBuffer(1024 * 1024); // 1 MiB
  await fileRBS.pull(startingBuffer);
  while (fileRBS.state !== "closed") {
    const view = fileRBS.read();
    await processChunk(view);
    await fileRBS.pull(view.buffer);
  }
}
```
There are several things to note in this example:

- By passing `startingBuffer` to `fileRBS.pull` and then passing `view.buffer` to the next `fileRBS.pull`, we are continually feeding the stream different `ArrayBuffer` instances which nevertheless always share the same backing memory.
- If the stream ever becomes `"errored"`, then `fileRBS.read()` will throw, causing the `file_chunkwise` function to return a rejected promise.
- The last chunk read from the file and subsequently passed to `processChunk` is likely to be a `Uint8Array` of length less than 1 MiB. This is automatically done for us by the mechanism described in the previous section.
- After the last chunk is read and passed to `processChunk`, the subsequent call to `fileRBS.pull` will cause read(2) to return zero. The stream will then change its state to `"closed"`, the returned promise will fulfill immediately, and the loop will exit.
- After reading a given chunk, the readable byte stream does no I/O activity until you later call `pull`. Thus, no I/O happens in parallel with `processChunk`; the program is potentially inefficient in this way. To address this, a pool of several buffers could be used, allowing `processChunk` to operate on one region of memory while another region is fed back to the stream with `fileRBS.pull`.
This example addresses the issue raised in the previous section with a two-buffer pool.
```js
async function file_pingPong(fileRBS, processChunk) {
  const pool = [new ArrayBuffer(1024 * 1024), new ArrayBuffer(1024 * 1024)];
  let i = 0;
  await fileRBS.pull(pool[0]);
  while (fileRBS.state !== "closed") {
    await Promise.all([
      process(i % 2),
      fileRBS.pull(pool[(i + 1) % 2])
    ]);
    ++i;
  }
  async function process(i) {
    const view = fileRBS.read();
    await processChunk(view);
    pool[i] = view.buffer;
  }
}
```
In this way, we ensure that in parallel with `processChunk`, at least 1 MiB worth of I/O is going on. Another way of looking at it is that this provides a somewhat crude form of a 1 MiB high water mark: if chunks are processed slower than they can be read, we'll generally have a 1 MiB chunk in reserve, ready to be read out and processed immediately.
When programming against sockets in an evented way, using e.g. epoll(7), the model is less straightforward than with files and read(2). Essentially, you wait to be notified that the socket's file descriptor is ready to read, and then you read from it. However, the notification tells you only that some data is available, not how much.

Thus, unlike with the read(2)-in-a-threadpool model we use above for files, with evented sockets we don't get to guarantee that all the requested bytes will be read into our buffer. But as noted above, it is probably wise not to rely on that guarantee in general, since even for file streams, the implementation may decide not to translate the user-supplied buffers directly into read(2) calls. Thus, the examples in this section can be seen as more general and robust than the ones above.
Given a readable byte stream, read the entire thing into memory:
```js
async function socket_readAll(rbs) {
  const views = [];
  while (true) {
    const buff = new ArrayBuffer(1024);
    await rbs.pull(buff);
    if (rbs.state === "closed") {
      break;
    }
    views.push(rbs.read());
  }
  return views;
}
```
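Note that `socket_readAll` returns an array of `Uint8Array` views rather than one contiguous allocation. A caller who wants a single buffer can concatenate afterward; the helper name `concatViews` here is our own invention:

```js
// Hypothetical helper: copy an array of Uint8Array views into one
// contiguous Uint8Array, as a post-processing step for socket_readAll.
function concatViews(views) {
  const total = views.reduce((sum, v) => sum + v.byteLength, 0);
  const result = new Uint8Array(total);
  let offset = 0;
  for (const v of views) {
    result.set(v, offset); // copy each view's bytes in order
    offset += v.byteLength;
  }
  return result;
}

const combined = concatViews([Uint8Array.of(1, 2), Uint8Array.of(3)]);
console.log(Array.from(combined)); // [ 1, 2, 3 ]
```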
The processing model here, for a socket stream, is quite different than for read(2)-in-a-threadpool. Here, when `rbs.pull(buff)` is called, `buff` is transferred to `newAB` as before, but `newAB` is simply set aside for later. The real work comes in using the epoll(7) mechanism to wait for notification that the socket has data available. Once this signal fires, the underlying source will do several things. First, read the data into `newAB`, with

```
recv(rbs@[[fd]], newAB, newAB.byteLength, 0);
```

(where recv(2) is essentially equivalent to read(2), apart from working only on sockets and accepting other flags which we do not use here). Second, switch on the result of recv(2):
- If the call fails with `EAGAIN` or `EWOULDBLOCK`, restart the process of waiting for a notification from epoll(7), and do nothing else.
- If the call fails with any other error, set the stream to errored, and release the backing memory of `newAB`.
- If the return value is zero, set the stream to closed, and release the backing memory of `newAB`.
- If the return value of recv(2) was less than `newAB.byteLength`, transfer `newAB` to `newNewAB`, which has the appropriate byte length (thus releasing the extra memory). The next call to `rbs.read()` will return `newNewAB`.
- If the return value of recv(2) was exactly `newAB.byteLength`, the next call to `rbs.read()` will return `newAB`.
Finally, in all the cases except `EAGAIN`/`EWOULDBLOCK`, the source should resolve the promise returned by `rbs.pull`.
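The case analysis above can be sketched as a pure function. This is our own illustration, not part of the stream design; we adopt a libuv-style convention in which a failing recv(2) is reported as a negative errno value, and the numeric value of `EAGAIN` below is an arbitrary stand-in:

```js
// Stand-in errno constants (on Linux these happen to share a value).
const EAGAIN = 11, EWOULDBLOCK = 11;

// Map a recv(2)-style result to the underlying source's next action,
// following the five cases listed above. `requested` is newAB.byteLength.
function classifyRecvResult(result, requested) {
  if (result === -EAGAIN || result === -EWOULDBLOCK) return "wait-again";
  if (result < 0) return "error-and-release";
  if (result === 0) return "close-and-release";
  if (result < requested) return "transfer-shrunk-and-fulfill";
  return "fulfill-as-is";
}

console.log(classifyRecvResult(512, 1024)); // "transfer-shrunk-and-fulfill"
console.log(classifyRecvResult(0, 1024));   // "close-and-release"
```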
There are several interesting things to note here:
- Since we cannot take advantage of implementation-specific knowledge about our stream, like we did for file streams, we cannot read all of the data into the same buffer. Even if we knew the total size (for example from a `Content-Length` header) and preallocated an `ArrayBuffer` of that size, we wouldn't be able to read into that one `ArrayBuffer` sequentially, because of the way `pull` transfers its buffer. Only if we got lucky, and the first read returned exactly the total length, would this be possible; in general we must assume that epoll(7) will notify us about available data before all `Content-Length` of the bytes are available.
- If the chunk size (1024 bytes above) is too small, then extra data will consistently be left in the kernel buffer for at least one more iteration of the loop, and thus what could have been a single chunk gets split across multiple `ArrayBuffer`s.
- If the chunk size is too large, then memory will be wasted, as each view ends up being a view onto a subset of a larger buffer, some of which goes unused. This could be avoided by manually transferring each `view.buffer` into a new backing buffer of the right size, since the author of this code knows that they do not want to reuse the extra memory (e.g. as they would in a pooling strategy).
Combining these two latter points, one might imagine a more intelligent version of `socket_readAll` that will dynamically change the size of the chunks it feeds to `rbs`. If it notices that the result of `rbs.read` is consistently smaller than the buffer passed in to `rbs.pull`, it can start choosing smaller sizes; conversely, if the result is consistently the same size as the buffer fed in, it might be worth increasing the size, in order to leave less in the kernel buffer and reduce the number of `ArrayBuffer`s created.
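Such an adaptive policy could be sketched as a small size controller. The function name, thresholds, and growth factors below are all invented for illustration, not prescribed by anything above:

```js
// Hypothetical controller for the adaptive strategy just described.
// If a read filled the buffer completely, try a bigger one next time;
// if it came back less than half full, try a smaller one. The factor
// of 2 and the half-full threshold are arbitrary choices.
function nextChunkSize(currentSize, bytesRead,
                       { min = 1024, max = 1024 * 1024 } = {}) {
  if (bytesRead === currentSize) {
    return Math.min(currentSize * 2, max);  // buffer was full: grow
  }
  if (bytesRead < currentSize / 2) {
    return Math.max(currentSize / 2, min);  // mostly empty: shrink
  }
  return currentSize;                       // close enough: keep it
}

console.log(nextChunkSize(1024, 1024)); // 2048
console.log(nextChunkSize(4096, 1000)); // 2048
```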
We discussed most of the complexity regarding socket streams in the first example, so our knowledge carries over fairly directly to a chunkwise read scenario.
```js
async function socket_chunkwise(rbs, processChunk) {
  const startingBuffer = new ArrayBuffer(1024 * 1024); // 1 MiB
  await rbs.pull(startingBuffer);
  while (rbs.state !== "closed") {
    const view = rbs.read();
    await processChunk(view);
    await rbs.pull(view.buffer);
  }
}
```
The code here is in fact the same as the code for a readable byte stream representing a file, since unlike in the `file_exact` example, in `file_chunkwise` we did not assume details about the file stream's implementation. The main difference here is that while in `file_chunkwise` all of our chunks except the last one were generally going to be 1 MiB, here that is up to the whims of when epoll(7) notifies us, and so 1 MiB might be an over- or under-estimate.

Note that unlike the previous example, here any overestimation will not really "waste" memory in the same sense. The memory will just be re-used on the next iteration, at which point 1 MiB might be a perfectly fine amount.
One of the primary goals of the byte stream design is that byte streams should be consumable by code that does not know it is dealing with a byte stream---generic stream functions and combinators. We give a few example functions here, written to work against any stream, and examine how they behave when presented with both file and socket readable byte streams.
This helper function will return a promise for the next available chunk from a given readable stream. This introduces an artificial delay if there is already data queued, but can provide a convenient interface for simple chunk-by-chunk consumption, as one might do e.g. when streaming database records. It uses an end-of-stream (EOS) sentinel to signal the end of the stream.
```js
const EOS = Symbol("ReadableStream getNext EOS");

async function generic_getNext(rs) {
  await rs.pull();
  return rs.state === "closed" ? EOS : rs.read();
}
```
(Note how, if the stream is errored, `rs.read()` will throw, causing the function to return a rejected promise.)
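A consumer loop built on this helper might look as follows. The definitions are repeated so the sketch is self-contained, and the mock stream is our own stand-in for the `pull()`/`read()`/`state` surface assumed in this document:

```js
const EOS = Symbol("ReadableStream getNext EOS");

async function generic_getNext(rs) {
  await rs.pull();
  return rs.state === "closed" ? EOS : rs.read();
}

// Hypothetical mock: a readable stream fed from a fixed array of chunks,
// which becomes "closed" once a pull finds nothing left to deliver.
function mockStream(chunks) {
  const queue = chunks.slice();
  return {
    state: "readable",
    async pull() { if (queue.length === 0) this.state = "closed"; },
    read() { return queue.shift(); }
  };
}

(async () => {
  const rs = mockStream(["a", "b", "c"]);
  let chunk;
  while ((chunk = await generic_getNext(rs)) !== EOS) {
    console.log(chunk); // logs "a", then "b", then "c"
  }
})();
```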
With this simple example, we introduce our guiding principle for these kinds of use cases: we assume that, when `pull` is called with the argument `undefined` on a readable byte stream of any sort, the stream will use its best judgment to allocate and use an appropriately-sized `ArrayBuffer`. For example, a file stream might pick a fixed size (likely OS-dependent) that has good performance characteristics, whereas a socket stream might try to do something adaptive based on past experience.
Note that the chunk returned will be a `Uint8Array`. It is probably best if the implementation ensures that `chunk.byteLength === chunk.buffer.byteLength`, by resizing (via transferral) the backing buffer if it is too large.
All in all, this example works pretty well.
```js
async function generic_toArray(rs) {
  const chunks = [];
  while (rs.state !== "closed") {
    await rs.pull();
    do {
      chunks.push(rs.read());
    } while (rs.state === "readable");
  }
  return chunks;
}
```
(The `do`-`while` usage is, again, a trick to ensure `rs.read()` is called even when the stream is `"errored"`, causing the function to return a rejected promise.)
This example will work for basically the same reason as the previous one. The consumer here doesn't care about the size of their chunks, so it's fine for the stream to set them.
It's interesting that for "normal" readable streams, we allow multiple chunks to result from a single `rs.pull()` call: that is, the `do`-`while` loop could have multiple iterations. However, for readable byte streams, it will always be a single iteration.
Let's do a reprise of our earlier byte-specific chunkwise processing functions, but this time agnostic to the type of stream:
```js
async function generic_chunkwise(rs, processChunk) {
  while (rs.state !== "closed") {
    await rs.pull();
    do {
      await processChunk(rs.read());
    } while (rs.state === "readable");
  }
}
```
This function will also work, although the differences in it compared to `file_chunkwise` or `socket_chunkwise` are illuminating.

The big difference is that, when used on a readable byte stream, `generic_chunkwise` will create a lot of garbage. Instead of re-using the same memory region over and over, each call to `rs.pull()` will allocate a new one. Once `processChunk` has finished with the resulting memory region, the `ArrayBuffer` can be garbage-collected, and thus the backing memory freed. But there is some definite inefficiency there.

Also, as before, the `do`-`while` loop is always one iteration with readable byte streams.