Created
May 31, 2012 00:16
-
-
Save arschles/2839762 to your computer and use it in GitHub Desktop.
Iteratee recollection from memory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic._

import scalaz._
import Scalaz._
import scalaz.concurrent._
// The representation of a "chunk" of data (including errors or EOF).
// "Chunk" was the best name I could come up with, though there is probably a better one.
/**
 * One element read from a stream: a block of bytes, a failure, or end-of-stream.
 *
 * The hierarchy is sealed, so the `match` inside [[fold]] is checked for
 * exhaustiveness by the compiler.
 */
sealed trait Chunk {
  /**
   * Collapses this chunk into a single value by handling each of the three cases.
   *
   * @param valid applied to the payload when this is a [[ValidBytes]]
   * @param error applied to the cause when this is an [[Error]]
   * @param eof   by-name result for [[EOF]]; evaluated only when this chunk is `EOF`
   * @tparam T the common result type of all three handlers
   * @return the result of whichever handler matches this chunk
   */
  def fold[T](valid: Array[Byte] => T, error: Throwable => T, eof: => T): T = this match {
    case ValidBytes(data) => valid(data)
    case Error(t)         => error(t)
    case EOF              => eof
  }
}

/** A chunk carrying actual data. */
case class ValidBytes(data: Array[Byte]) extends Chunk

/** A chunk signalling that producing data failed with `t`. */
case class Error(t: Throwable) extends Chunk

/** The chunk signalling that no more data will follow. */
case object EOF extends Chunk
// Representation of a stream of data. This could just as easily be a plain function, handed back
// to the caller, that returns Promise[Chunk], but it is implemented as a trait for two reasons:
// (1) to illustrate the interface without having to provide an example downstream
// (2) to enable an easy way to provide a "constant" function that produces a stream of a constant amount of data
//
// Also, IMO it is important for nextChunk to return a Promise[Chunk], to force the implementation to think about asynchrony.
// Obviously the caller of nextChunk could take care of this instead, but it is my personal preference to put it here.
// Also, the caller of nextChunk is responsible for synchronizing on each Promise and sending results to the client in order.
// It is also responsible for being tail recursive, for not calling nextChunk again after it returns EOF, and for being generally well-behaved.
/**
 * A pull-based source of [[Chunk]]s.
 *
 * The trait is sealed, so implementations can only be defined in this file.
 */
sealed trait Stream {
  /**
   * Requests the next chunk of the stream.
   *
   * @return a promise that yields the next [[Chunk]]
   */
  def nextChunk: Promise[Chunk]
}
object Stream {
  /**
   * Builds a [[Stream]] that hands out `d` once as a [[ValidBytes]] chunk and then
   * answers [[EOF]] on every later call.
   *
   * @param d the bytes to emit in the single data chunk
   * @return a stream producing `ValidBytes(d)` first and `EOF` from then on
   */
  def constant(d: Array[Byte]): Stream = new Stream {
    // The fixed script of chunks to hand out, in order.
    private val chunks: List[Chunk] = ValidBytes(d) :: EOF :: Nil

    // Index of the next chunk to emit. It starts at 0 so the data chunk is delivered first;
    // starting at chunks.length would skip straight past every entry.
    private val cursor = new AtomicInteger(0)

    def nextChunk: Promise[Chunk] = promise {
      // `lift` yields None for any index outside the list -- past the end, or negative once
      // the counter wraps around -- so a caller that keeps polling simply keeps seeing EOF
      // (even though it is not supposed to call again after the first EOF).
      chunks.lift(cursor.getAndIncrement).getOrElse(EOF)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment