
@pchiusano
Last active December 30, 2015 12:19
Some early design notes on adding a streaming layer to scodec

Much of this is out of date, but leaving it here for posterity. See the scodec-stream and scodec-bits projects, where all the details got worked out.


The current signature of Codec in scodec is:

trait Codec[A] {
  /** Attempts to encode the specified value in to a bit vector. */
  def encode(a: A): String \/ BitVector

  /** Attempts to decode a value of type `A` from the specified bit vector. */
  def decode(bits: BitVector): String \/ (BitVector, A)
}
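For concreteness, a hypothetical instance under this signature might look like the following (a sketch only — `fromInt` and `toInt` are assumed helpers for illustration, not necessarily scodec's actual BitVector API):

```scala
import scalaz.{\/, -\/, \/-}

// Hypothetical Codec[Int] for a fixed-width 32-bit integer.
val int32: Codec[Int] = new Codec[Int] {
  def encode(a: Int): String \/ BitVector =
    \/-(BitVector.fromInt(a)) // assumed helper

  def decode(bits: BitVector): String \/ (BitVector, Int) =
    if (bits.size < 32) -\/("int32: insufficient input")
    else \/-((bits.drop(32), bits.take(32).toInt)) // toInt assumed
}
```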

Looking at decode first. (Minor note: consider having decode just return a String \/ (Int, A), where the Int is the number of bytes consumed on success. It's unlikely that the decoder needs to insert bytes into the stream it is decoding, since that sort of thing could be expressed in other ways. This also makes it easier to implement streaming, since the caller of decode can easily detect how many bytes were read; otherwise we need to somehow infer this from the returned BitVector.)
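Under that alternative, the trait would read roughly as follows (a sketch; the caller recovers the remainder itself):

```scala
trait Codec[A] {
  def encode(a: A): String \/ BitVector

  /** On success, returns the amount of input consumed along with the value. */
  def decode(bits: BitVector): String \/ (Int, A)
}

// A caller can reconstruct the remaining input explicitly:
// decode(bits).map { case (consumed, a) => (bits.drop(consumed), a) }
```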

For streaming decoding, there are a few general approaches:

  • The Codec can indicate up front how many bits it will need to read. The driver can then load up a BitVector of exactly this size. This is not realistic in general (any kind of context sensitivity will kill this).
  • The decode function can return a third state, 'not finished', due to insufficient input. So something like: BitVector => String \/ ((Int, A) \/ Codec[A]). It's kind of weird that the continuation is a Codec, so with this approach you'd want to split out a separate Decoder[A] type. This can be a pervasive change, though, and it can make writing decoders more complicated because you have to keep in mind the possibility that you might not have all required input.
  • The decode function can accept an Int => IO[BitVector] or something similar, giving the decoder the ability to pull. The return type of decode is then something like IO[String \/ (Int, A)]. This is also a fairly big change.
  • Or give the decoder access to all the bits (lazily). This is lazy I/O, but if wrapped up in nice safe combinators, it will just be an implementation detail, so it might be fine.
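The second option's 'not finished' state could be sketched as a separate Decoder type (a rough sketch, not a worked-out design):

```scala
// Sketch of option 2: decoding either fails, succeeds with the amount
// of input consumed, or suspends, returning a continuation that awaits
// more input.
trait Decoder[A] {
  def decode(bits: BitVector): String \/ ((Int, A) \/ Decoder[A])
}
```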

I think this fourth option is probably the right one, because it requires minimal changes to the existing API and means that streaming could be done as a totally separate layer. The way this would work is we'd have a couple of new implementations of BitVector that lazily allow access to the entire stream:

  • NIOBitVector is backed by a java.nio.ByteBuffer (which in turn could be created by memory-mapping a large file via FileChannel.map) and a range of indices into this ByteBuffer. The drop and take functions of BitVector do the obvious thing of adjusting the range of indices rather than actually copying the buffer.
  • InputStreamBitVector is backed by a java.io.InputStream whose results are 'locally' memoized to ensure referential transparency. Unless the decoder does serious backtracking or peeking (which are probably very limited in practice), I think this will 'just work'. I'll need to try it out, though.
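The index-adjusting trick for NIOBitVector can be sketched roughly as follows (assuming a bit-addressed range; the real scodec-bits implementation differs):

```scala
// Rough sketch: drop/take narrow the [from, until) bit range over a
// shared ByteBuffer instead of copying any data.
case class NIOBitVector(buf: java.nio.ByteBuffer, from: Long, until: Long) {
  def size: Long = until - from
  def drop(n: Long): NIOBitVector = copy(from = (from + n) min until)
  def take(n: Long): NIOBitVector = copy(until = (from + n) min until)
  // reading bit i translates (from + i) into a byte index and mask on buf
}
```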

When using these types, there is the problem that decoders could in theory retain the BitVector after decoding completes (returning it as part of the output value). We could prevent this using ST, but the overhead (syntactic, computational, and having to worry about stack overflow errors) isn't worth it IMO. I just don't think this bug is very common; people would generally have to be doing something very deliberately wrong to retain and reuse a BitVector after decoding completes, especially if they are assembling their decoders via combinators.
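The retention bug in question would look something like this (a deliberately contrived example):

```scala
// Contrived example of the bug: the decoder returns its input BitVector
// as the decoded value, so it escapes the scope of the backing resource.
val leaky: Codec[BitVector] = new Codec[BitVector] {
  def encode(b: BitVector): String \/ BitVector = \/-(b)
  def decode(bits: BitVector): String \/ (BitVector, BitVector) =
    \/-((BitVector.empty, bits)) // `bits` may outlive its closed stream/buffer
}
```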

Once we have these new BitVector implementations, we can now write combinators like:

/** Parses multiple `A` values, one after another, from the `InputStream`. */
def many[A:Codec](in: InputStream): Process[Task,A]

/** Parses multiple `A` values, separated by `D` ('delimiter') values, which are ignored. */
def sep[A:Codec,D:Codec](in: InputStream): Process[Task,A]

And whatever other parsing combinators are useful. The returned Process[Task,A] can then be manipulated using all the usual scalaz-stream combinators. Internally, combinators like many and sep will use an NIOBitVector or InputStreamBitVector that is allocated locally and then closed by the returned Process[Task,A] on completion. So long as the Codec for A doesn't hold on to the BitVector it gets and return it as part of the A, this is safe.
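Hypothetical usage might look like this (Packet and its implicit Codec are assumed for illustration):

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Process, process1}

// Stream-decode packets from a file and count them in constant memory.
val in = new java.io.FileInputStream("packets.bin")
val packets: Process[Task, Packet] = many[Packet](in) // needs Codec[Packet]
val count: Task[Int] = packets.map(_ => 1).pipe(process1.sum).runLastOr(0)
```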

Encoding

The signature of encode is problematic for streaming:

def encode(a: A): String \/ BitVector

Because the error is on the outside, we need to know up front whether an error occurred while encoding, which implies that the BitVector must be produced strictly. I think this is actually okay, though: we should just introduce streaming encoding as a separate layer. So the individual 'elements' will be written strictly, but the top-level stream container will be lazy. We can also assemble streaming encoders via combinators:

package scodec.streaming

object Encode {
  type Bitstream = Process[Task,BitVector]
  
  /** Output each `A` one after another to the `Bitstream`. */
  def many[A:Codec](in: Process[Task,A]): Bitstream
  
  /** Output each `A` one after another, with `D` as the delimiter. */
  def sep[A:Codec,D:Codec](delim: D)(in: Process[Task,A]): Bitstream
}

Etc. We can then manipulate the Bitstream using the usual scalaz-stream combinators, send it to sinks, and so on. Also, the output of a streaming decode operation can be fed to these combinators, so we can express use cases like: do a streaming decode of packets from a 1GB memory-mapped file, manipulate these packets in some way, and dump them to some output file, working incrementally and in constant memory. Nice!
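That end-to-end use case might be sketched as follows (Packet, its Codec, and a per-packet rewriteHeader transformation are assumed for illustration):

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

// Hypothetical pipeline: decode packets from a large file, transform each,
// and re-encode to a lazy Bitstream, all incrementally.
val decoded: Process[Task, Packet] =
  many[Packet](new java.io.FileInputStream("in.bin"))
val out: Encode.Bitstream =
  Encode.many(decoded.map(rewriteHeader))
// `out` can then be drained to a file sink via the usual
// scalaz-stream machinery.
```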
