@arschles
Created May 31, 2012 00:16
Iteratee recollection from memory
import scalaz._
import Scalaz._
import scalaz.concurrent._
import java.util.concurrent.atomic._
//the representation of a "chunk" of data (including errors or EOF)
//this was the best name I could ever come up with, but it seems like there's a better one
sealed trait Chunk {
  def fold[T](valid: Array[Byte] => T, error: Throwable => T, eof: => T): T = this match {
    case ValidBytes(data) => valid(data)
    case Error(t) => error(t)
    case EOF => eof
  }
}
case class ValidBytes(data: Array[Byte]) extends Chunk
case class Error(t: Throwable) extends Chunk
case object EOF extends Chunk
//representation of a stream of data - this could just as easily be just a function that gets returned
//to caller that returns Promise[Chunk], but it's implemented as a trait for 2 reasons:
//(1) to illustrate the interface w/o having to provide an example downstream
//(2) to enable an easy way to provide a "constant" function that produces a stream of a constant amount of data
//
//also, it's important IMO for nextChunk to return a Promise[Chunk], so that implementations have to deal with asynchrony up front.
//obviously the caller of nextChunk could take care of this, but it's my personal preference to put it here.
//also, the caller of nextChunk is responsible for waiting on each Promise and sending results to the client in order.
//it is also responsible for being tail recursive, not calling nextChunk after it returns EOF, and generally being well-behaved
sealed trait Stream {
  def nextChunk: Promise[Chunk]
}
object Stream {
  def constant(d: Array[Byte]): Stream = new Stream {
    //the data followed by EOF; annotated as List[Chunk] so the lookup below yields an Option[Chunk]
    private val chunks: List[Chunk] = ValidBytes(d) :: EOF :: Nil
    //index of the next chunk to hand out, starting at the head of the list
    private val i = new AtomicInteger(0)
    def nextChunk = promise {
      //either get the next elt in the list, or EOF once the index runs past the end.
      //the counter can still overflow after enough calls, even though the downstream
      //isn't supposed to call nextChunk anymore after it returns EOF, but you get the idea.
      chunks.lift(i.getAndIncrement) | EOF
    }
  }
}
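//a minimal sketch of a caller that follows the rules described above: it waits on each Promise
//in order, recurses in tail position, and never calls nextChunk again after EOF or an Error.
//Consumer and drain are hypothetical names that are not part of the original recollection, and
//the sketch assumes Promise#get blocks until the chunk is available.
object Consumer {
  @annotation.tailrec
  def drain(s: Stream, acc: Vector[Array[Byte]] = Vector.empty): Either[Throwable, Vector[Array[Byte]]] =
    s.nextChunk.get match {
      case ValidBytes(data) => drain(s, acc :+ data) //keep the chunk and ask for the next one
      case Error(t) => Left(t) //stop at the first error
      case EOF => Right(acc) //stream finished; don't touch nextChunk again
    }
}
//usage, under the same assumptions: Consumer.drain(Stream.constant("hello".getBytes))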