Parts of a stream interface

The Basics

Every stream has two basic parts:

  • data-in - A chunk or EOF enters the stream
  • data-out - A chunk or EOF leaves the stream

I'm combining both "data" and "end" events since "end" is just a special data packet signifying the end of the stream.
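In the interface proposed later in this document, this shows up as EOF being just a nil chunk written through the same call as data. A minimal sketch, assuming the newStream API defined below (onFlush is a hypothetical callback):

stream.write("a chunk")(onFlush) -- data-in: a normal chunk
stream.write()(onFlush)          -- data-in: nil signifies EOF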

Flow Control

Flow control deals with situations where the reader and writer of a stream aren't moving at the same speed. If the reader is faster, it's not a problem: the queue gets drained and the reader is forced to wait for more data to appear. But when the writer is faster, data queues up inside the stream and uses tons of RAM. What's needed is some mechanism to tell the writer to slow down when the reader is overwhelmed and to resume once it has caught up.

For proper flow control, some more controls are needed:

  • pause-in - Inform the input that we need a rest; data already in the queue will still emit
  • resume-in - Inform the input to resume pumping data
  • pause-out - We don't want the output to emit any more data; buffer until resumed
  • resume-out - Drain the buffer and emit events again
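With the curried API proposed later, these controls can stay implicit: the thunk returned by write calls its callback immediately while the stream is below its high-water mark and defers it while paused, so a well-behaved writer naturally slows down. A sketch, assuming the newStream API below (produceChunk is a hypothetical data source):

local function pump()
  produceChunk(function (chunk)
    -- Only ask the source for more once the stream says it's ready
    stream.write(chunk)(pump)
  end)
end
pump()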

Mixed Protocols

Sometimes we're consuming a stream and after parsing data we realize that not all the data in the chunk was for us. The last part was for another piece of code that's expecting to have it come out of the stream.

A concrete example of this is an HTTP upgrade to WebSockets. Suppose a stream chunk contains the HTTP headers and some WebSocket data. When we read this chunk and send it to the HTTP parser, it will notify us that not all the data was used. We then need to stop listening and hand off the stream to another part of the code that speaks the WebSocket protocol, but we need some way to return the extra data back to the stream.

  • data-return - We grabbed too much data and want to return some for other code to consume.
  • data-peek-and-flush - Another technique is to peek and then manually flush the parts we consumed
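With the unshift export from the proposed interface, the hand-off could look like this sketch (httpParser.execute and startWebsocket are hypothetical; execute is assumed to return the number of bytes it consumed):

local function onHeaders(err, chunk)
  if err then error(err) end
  local bytesUsed = httpParser.execute(chunk)
  if bytesUsed < #chunk then
    -- Return the unconsumed tail so the WebSocket code sees it first
    socket.unshift(string.sub(chunk, bytesUsed + 1))
    return startWebsocket(socket)
  end
  socket.read()(onHeaders)
end
socket.read()(onHeaders)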

Errors

How do error handling and propagation fit into streams? Should they be part of the stream or go through some other communication channel? There is a difference between a stream crashing (internet disconnect, fs error, power outage...) and a finite stream reaching its end.

Pull vs Push APIs

There are many APIs for dealing with streams. For readable streams, I've seen two main camps.

Pull-style APIs are where the consumer repeatedly calls some read function when it wants data. The stream can infer the consumer's flow-control speed from how fast it calls read. If the consumer stops calling read, maybe we should pause the input.

function read() {
  stream.read(function (chunk) {
    // Do Something
    read();
  });
}
read();

Push-style APIs are often easier to use, but they make flow control more explicit. Since the stream emits data as it arrives, the consumer has no choice but to handle it when it happens. When it's not ready for more data, it needs to explicitly pause the stream and resume it later.

stream.on("data", function (chunk) {
  // Do something
});
stream.pause();
stream.resume();

Proposed Stream Interface

My goal is to design an API that fits my needs and is simple to use. Ideally the interface is so simple that a helper library isn't needed, but I think I'll end up needing one to get all the useful bits like high-water and low-water marks.

All streams are both readable and writable by someone, but the end-user consuming the stream usually only has one end. In the case of something like a duplex TCP stream, there are actually two streams, and the user has the readable end of one and the writable end of the other. This means a stream should export its readable or writable interface in some concise manner so that the halves can be packaged into a single public object.

Creating a Readable File Stream

This stream is for reading a file from the filesystem as a stream. The public half is the readable end. The internal implementation has access to the writable half and writes to it. This way the person implementing the fs stream only has to interface with the writable end instead of implementing the readable end. A helper library is very useful here. The filesystem is pull-based because of the APIs we have available.

local function newFileReadStream(fd, onDone)
  local stream = newStream()
  local offset = 0
  local chunkSize = 40960
  local function read(err)
    if err then error(err) end
    uv.read(fd, offset, chunkSize, function (err, chunk)
      if err then error(err) end
      -- chunk will be nil when we've reached the end of the file
      if chunk then
        offset = offset + #chunk
        -- The stream will call read immediately if it wants more data
        -- It will call it later if it wants us to slow down
        stream.write(chunk)(read)
      else
        onDone()
        -- Still forward the nil EOF chunk, but don't schedule another read
        stream.write(chunk)()
      end
    end)
  end
  read()
  -- Export just the readable half
  return {
    read = stream.read,
    unshift = stream.unshift
  }
end
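Usage might look like this (a sketch; fd comes from some earlier uv file-open call):

local fileStream = newFileReadStream(fd, function ()
  print("file fully read")
end)
local function onRead(err, chunk)
  if err then error(err) end
  if chunk then
    print("got " .. #chunk .. " bytes")
    fileStream.read()(onRead)
  end
end
fileStream.read()(onRead)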

Writable File Stream

local function newFileWriteStream(fd, onDone)
  local stream = newStream()
  local offset = 0
  local function write(err, chunk)
    if err then error(err) end
    -- chunk will be nil when the writer signals EOF
    if not chunk then
      return onDone()
    end
    uv.write(fd, offset, chunk, function (err)
      if err then error(err) end
      offset = offset + #chunk
      -- Ask the stream for the next chunk
      stream.read()(write)
    end)
  end
  stream.read()(write)
  -- Export just the writable half
  return {
    write = stream.write
  }
end
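Because both halves speak the same curried read/write protocol, piping the readable file stream into the writable one is a small loop. A sketch assuming the two constructors above (fd1, fd2 and the done callbacks are hypothetical); note the nil EOF chunk is forwarded so the writer's onDone fires:

local function pipe(input, output)
  local function onRead(err, chunk)
    if err then error(err) end
    output.write(chunk)(function (err)
      if err then error(err) end
      -- Keep pulling until the nil EOF chunk has been forwarded
      if chunk then input.read()(onRead) end
    end)
  end
  input.read()(onRead)
end

pipe(newFileReadStream(fd1, onReadDone), newFileWriteStream(fd2, onWriteDone))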

Duplex TCP Stream

This is a stream for reading from and writing to a TCP socket. The socket is push-style when reading.

-- Handle is a uv_tcp_t instance from uv; it can be either a client or a server,
-- the API is the same
local function newHandleStream(handle)

  -- Connect data coming from the socket to emit on the stream
  local receiveStream = newStream()
  local function write(handle, chunk)
    -- If write doesn't call back synchronously, pause the socket until it drains
    local async
    receiveStream.write(chunk)(function (err)
      if err then error(err) end
      if async == nil then async = false end
      if async then
        handle:readStart()
      end
    end)
    if async == nil then
      async = true
      handle:readStop()
    end
  end
  handle.ondata = write
  handle.onend = write

  -- Connect data being written to the stream and write it to the handle
  local sendStream = newStream()
  local function read(err)
    if err then error(err) end
    sendStream.read()(function (err, chunk)
      if err then error(err) end
      if chunk then
        handle:write(chunk, read)
      else
        handle:shutdown(read)
      end
    end)
  end
  read()

  -- Return the halves of the streams we're not using internally
  return {
    read = receiveStream.read,
    unshift = receiveStream.unshift,
    write = sendStream.write
  }
end
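As a usage sketch, here's an echo loop over the duplex stream (client is a hypothetical uv_tcp_t accepted elsewhere); writing the nil EOF chunk back through write also shuts down our side:

local socket = newHandleStream(client)
local function onRead(err, chunk)
  if err then error(err) end
  socket.write(chunk)(function (err)
    if err then error(err) end
    if chunk then socket.read()(onRead) end
  end)
end
socket.read()(onRead)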

...

local function newQueue()
  local head = {}
  local tail = {}
  local index = 1
  local headLength = 0
  -- Lengths are tracked explicitly so nil (EOF) chunks can be queued safely
  local tailLength = 0
  -- Expose length on the queue table so consumers can check it cheaply
  local queue = { length = 0 }
  function queue.shift()
    if index > headLength then
      -- When the head is empty, swap it with the tail to get fresh items
      head, tail = tail, head
      index = 1
      headLength = tailLength
      tailLength = 0
      -- If it's still empty, return nothing
      if headLength == 0 then
        return
      end
    end
    -- There was an item in the head, let's pull it out
    local value = head[index]
    -- And remove it from the head
    head[index] = nil
    -- And bump the index
    index = index + 1
    queue.length = queue.length - 1
    return value
  end
  function queue.unshift(item)
    -- Insert the item just in front of the current head position
    -- (index may go to zero or below; Lua tables handle that fine)
    index = index - 1
    head[index] = item
    queue.length = queue.length + 1
  end
  function queue.push(item)
    -- Pushes always go to the write-only tail
    tailLength = tailLength + 1
    tail[tailLength] = item
    queue.length = queue.length + 1
  end
  return queue
end
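-- A quick sanity check of the queue's behavior:
--   local q = newQueue()
--   q.push("a"); q.push("b")
--   q.unshift("z")    -- jumps the line
--   q.shift()         --> "z"
--   q.shift()         --> "a"
--   q.length          --> 1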
local function newStream()
  -- If there are more than this many buffered input chunks, readStop the source
  local highWaterMark = 1
  -- If there are less than this many buffered chunks, readStart the source
  local lowWaterMark = 1
  local paused = false
  local processing = false
  local inputQueue = newQueue()
  local readerQueue = newQueue()
  local resumeList = {}
  local function processReaders()
    if processing then return end
    processing = true
    while inputQueue.length > 0 and readerQueue.length > 0 do
      local chunk = inputQueue.shift()
      local reader = readerQueue.shift()
      reader(nil, chunk)
    end
    local watermark = inputQueue.length - readerQueue.length
    if not paused then
      if watermark > highWaterMark then
        paused = true
      end
    else
      if watermark < lowWaterMark then
        paused = false
        if #resumeList > 0 then
          local callbacks = resumeList
          resumeList = {}
          for i = 1, #callbacks do
            callbacks[i]()
          end
        end
      end
    end
    processing = false
  end
  local function read()
    return function (callback)
      readerQueue.push(callback)
      processReaders()
    end
  end
  local function write(chunk)
    return function (callback)
      inputQueue.push(chunk)
      processReaders()
      if callback then
        if paused then
          table.insert(resumeList, callback)
        else
          callback()
        end
      end
    end
  end
  local function unshift(chunk)
    inputQueue.unshift(chunk)
    processReaders()
  end
  return {
    read = read,
    write = write,
    unshift = unshift
  }
end

return {
  newStream = newStream,
  newQueue = newQueue
}
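A quick demonstration of the water marks in action, assuming this file is loaded as a module (the require path is hypothetical). With both marks at 1, the second write's callback is deferred until readers drain the queue:

local newStream = require('stream-helper').newStream
local s = newStream()
s.write("one")(function () print("'one' accepted") end)  -- fires immediately
s.write("two")(function () print("'two' accepted") end)  -- deferred: queue is over the high-water mark
s.read()(function (err, chunk) print("read", chunk) end) -- prints "read one"
s.read()(function (err, chunk) print("read", chunk) end) -- prints "read two", then "'two' accepted" fires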