Skip to content

Instantly share code, notes, and snippets.

@jsuereth
Created September 13, 2011 18:57
Show Gist options
  • Save jsuereth/1214709 to your computer and use it in GitHub Desktop.
Save jsuereth/1214709 to your computer and use it in GitHub Desktop.
A prototype iteratee library that uses Scala traits as an abstract module system
package scalax.io
/**
 * A generic set of iteratees for general purpose usage. These iteratees
 * make no assumption on error types and do not do any specialized communication
 * between producers and consumers.
 */
trait GenericIteratees extends Iteratees {
  /**
   * Looks at the head element of the stream without consuming it: the received
   * chunk is handed back as the remaining input of the finished consumer.
   * Returns `None` when the stream is at EOF.
   *
   * NOTE(review): there is no special case for empty chunks, so an empty chunk
   * yields `Some(<empty value>)`. Skipping it would need an `Empty[I]` bound,
   * which would change this method's signature.
   */
  def peek[I]: Consumer[I, Option[I]] = Consumer cont {
    case c @ Chunk(in) => Consumer.done(Some(in), c)
    case e @ EOF       => Consumer.done(None, e)
  }

  /**
   * Consumes the head element off the incoming stream. Empty chunks are skipped.
   * Returns `None` (leaving EOF as remaining input) when the stream ends first.
   */
  def head[I: Empty]: Consumer[I, Option[I]] = Consumer cont {
    case EmptyChunk()  => head[I]
    case c @ Chunk(in) => Consumer.done(Some(in), Chunk.empty)
    case e @ EOF       => Consumer.done(None, e)
  }

  /** Counts the number of non-empty Chunk inputs received before EOF. */
  def counter[I: Empty]: Consumer[I, Long] = {
    def step(count: Long): StreamValue[I] => Consumer[I, Long] = {
      case EmptyChunk()  => Consumer cont step(count)
      case e @ EOF       => Consumer.done(count, e)
      case c @ Chunk(_)  => Consumer cont step(count + 1)
    }
    Consumer cont step(0)
  }

  /**
   * Repeats a consumer indefinitely until end of file, storing results in a Vector.
   *
   * Each non-empty chunk (re)starts `in`. Once a run of `in` finishes, its result
   * is appended to the accumulator and its leftover input is fed to the next run.
   * An error from `in` aborts the whole repetition.
   *
   * NOTE(review): if `in` is already finished when a chunk arrives, repetition
   * stops with that single result. The arriving chunk is not passed on as
   * leftover; `in`'s own leftover is used instead. Confirm this is intended.
   *
   * @param in the consumer to run repeatedly
   * @return a consumer producing every result of `in`, in order
   */
  def repeat[I: Empty, O](in: Consumer[I, O]): Consumer[I, Vector[O]] = {
    def step(current: Vector[O]): StreamValue[I] => Consumer[I, Vector[O]] = {
      case EmptyChunk() => Consumer cont step(current)
      case e @ EOF      => Consumer.done(current, e)
      case c @ Chunk(_) =>
        Consumer flatten in.fold[Consumer[I, Vector[O]]](
          done = (value, el) => contexted(Consumer.done(current :+ value, el)),
          error = (err, el) => contexted(Consumer.error(err, el)),
          // Run `in` on this chunk. When that run finishes, record its result in
          // the accumulator and continue. flatMap feeds the leftover input of the
          // finished run into the next step.
          cont = k =>
            contexted(k(c) flatMap { value => Consumer cont step(current :+ value) })
        )
    }
    Consumer cont step(Vector())
  }
}
package scalax.io
/** A module of sorts that contains a generic implementation of iteratees.
 *
 * Concrete modules fix the abstract `Error` and `Context` types and provide the
 * `Monad`, `Functor` and `Pointed` instances for `Context`.
 */
trait Iteratees {
  /** The error type carried by failed consumers. */
  type Error
  /** The context that wraps the result of folding a consumer. */
  type Context[X]
  implicit def context_monad: Monad[Context]
  implicit def context_functor: Functor[Context]
  implicit def context_pointed: Pointed[Context]

  /** Converts a raw value into a Context wrapped value. */
  def contexted[A](a: A): Context[A] = context_pointed(a)

  /** The lowest trait that represents values that can be passed to consumers. This hierarchy is
   * expected to be extended for 'smart' communication in a domain. For example: Network
   * Iteratees may be able to pass new types of stream values whereas File Iteratees might not.
   */
  trait StreamValue[+T]
  /** Signals that the stream has no more input. */
  case object EOF extends StreamValue[Nothing]
  /** A single piece of input. */
  case class Chunk[Input](input: Input) extends StreamValue[Input]
  object Chunk {
    /** A chunk wrapping the `Empty` value of `T`. */
    def empty[T](implicit ev0: Empty[T]) = Chunk(ev0.empty)
  }
  /** This object can be used to pattern match against empty chunked inputs. */
  object EmptyChunk {
    def unapply[T: Empty](t: StreamValue[T]) = t match {
      case Chunk(Empty()) => true
      case _ => false
    }
  }

  /** This represents an immutable stream consumer that will eventually produce a value.
   * A stream Consumer can be in one of three states:
   *
   * 1. Finished with a value (plus any input it did not consume)
   * 2. Waiting for more input
   * 3. Failed with an `Error`
   */
  trait Consumer[I, O] {
    /** Fold accepts one continuation per state of this consumer and invokes
     * whichever one matches the consumer's current state.
     *
     * @param done  called with the result and the unconsumed input
     * @param cont  called with the function that accepts the next input
     * @param error called with the error and its associated stream value
     */
    def fold[R](done: (O, StreamValue[I]) => Context[R],
                cont: (StreamValue[I] => Consumer[I, O]) => Context[R],
                error: (Error, StreamValue[I]) => Context[R]
    ): Context[R]

    /** Attempts to pull a 'finished' value out of the Consumer. If the consumer is
     * still waiting, EOF is sent to it first.
     *
     * Note: this throws (via `sys.error`) if the Consumer has failed, or if it
     * is still waiting for input after receiving EOF.
     */
    def result: Context[O] = fold[O](
      done = (o, _) => contexted(o),
      cont = f =>
        f(EOF: StreamValue[I]).fold[O](
          done = (o, _) => contexted(o),
          cont = _ => sys.error("Divergent Consumer"),
          error = (e, _) => sys.error("Processing Error: " + e)
        ),
      error = (e, _) => sys.error("Processing Error: " + e))

    /**
     * Flattens a Consumer whose result is itself a consumer, such as the result of
     * applying a `Conversion`. After this consumer finishes, the inner consumer is
     * sent EOF if it is still waiting, and its result becomes the result.
     * The leftover input is always an empty chunk.
     * Throws (via `sys.error`) if the inner consumer is still waiting after EOF.
     */
    def join[I2, O2](implicit ev0: O <:< Consumer[I2, O2],
                     ev1: Empty[I]): Consumer[I, O2] =
      this flatMap { iter2 =>
        // Apply the evidence explicitly rather than relying on it acting as an
        // implicit conversion.
        Consumer flatten ev0(iter2).fold[Consumer[I, O2]](
          done = (a, _) => contexted(Consumer.done(a, Chunk.empty)),
          error = (e, _) => contexted(Consumer.error(e, Chunk.empty)),
          cont = k => k(EOF).fold[Consumer[I, O2]](
            done = (a, _) => contexted(Consumer.done(a, Chunk.empty)),
            error = (e, _) => contexted(Consumer.error(e, Chunk.empty)),
            cont = _ => sys.error("join: Divergent iteratee!")
          )
        )
      }

    // Note: These are here to defeat Scala's implicit system...
    def map[B](f: O => B): Consumer[I, B] =
      Consumer.consumerFunctor.map(this)(f)
    def flatten[B](implicit ev0: Consumer[I, O] <:< Consumer[I, Consumer[I, B]]): Consumer[I, B] =
      Consumer.consumerMonad.flatten(this)
    def flatMap[B](f: O => Consumer[I, B]): Consumer[I, B] =
      Consumer.consumerMonad.flatMap(this)(f)(Consumer.consumerFunctor)
    // TODO - Zip
  }

  object Consumer {
    /** Constructs a consumer in the finished state. */
    def done[I, O](value: O, remaining: StreamValue[I]) =
      new Consumer[I, O] {
        def fold[R](
          d: (O, StreamValue[I]) => Context[R],
          cont: (StreamValue[I] => Consumer[I, O]) => Context[R],
          error: (Error, StreamValue[I]) => Context[R]
        ): Context[R] = d(value, remaining)
        // Print the raw value directly instead of running the `result` fold.
        override def toString = "Consumer.done(" + value + "," + remaining + ")"
      }

    /** Constructs a consumer that accepts the next stream input and
     * returns the next consumer.
     */
    def cont[I, O](f: StreamValue[I] => Consumer[I, O]) =
      new Consumer[I, O] {
        def fold[R](
          done: (O, StreamValue[I]) => Context[R],
          cont: (StreamValue[I] => Consumer[I, O]) => Context[R],
          error: (Error, StreamValue[I]) => Context[R]
        ): Context[R] = cont(f)
        override def toString = "Consumer.cont(" + f + ")"
      }

    /** Constructs a consumer that represents an error condition. */
    def error[I, O](err: Error, last: StreamValue[I]) =
      new Consumer[I, O] {
        def fold[R](
          done: (O, StreamValue[I]) => Context[R],
          cont: (StreamValue[I] => Consumer[I, O]) => Context[R],
          error: (Error, StreamValue[I]) => Context[R]
        ): Context[R] = error(err, last)
        override def toString = "Error(" + err + "," + last + ")"
      }

    /** Removes the Context from around a consumer. Folding the result flatMaps
     * the context and folds the consumer inside it.
     * TODO - Look into making this implicit...
     */
    def flatten[I, O](i: Context[Consumer[I, O]]) =
      new Consumer[I, O] {
        def fold[R](
          done: (O, StreamValue[I]) => Context[R],
          cont: (StreamValue[I] => Consumer[I, O]) => Context[R],
          error: (Error, StreamValue[I]) => Context[R]
        ): Context[R] = context_monad.flatMap(i)(_.fold(done = done, cont = cont, error = error))
        override def toString = "Flattened(" + i + ")"
      }

    /** Lifts a value into a finished consumer with an empty chunk as leftover. */
    implicit def consumerPointed[I: Empty] = new Pointed[({type C[A] = Consumer[I, A]})#C] {
      def apply[A](a: => A): Consumer[I, A] =
        Consumer.done(a, Chunk.empty[I])
    }

    /** Maps a consumer's eventual result. Waiting and failed states are carried through. */
    implicit def consumerFunctor[I] = new Functor[({type C[A] = Consumer[I, A]})#C] { functor =>
      override def map[A, B](ma: Consumer[I, A])(f: A => B): Consumer[I, B] = new Consumer[I, B] {
        override def fold[R](
          done: (B, StreamValue[I]) => Context[R],
          cont: (StreamValue[I] => Consumer[I, B]) => Context[R],
          error: (Error, StreamValue[I]) => Context[R]
        ): Context[R] =
          ma.fold[R](
            done = (a, i) => done(f(a), i),
            // Map lazily over whatever consumer the next input produces.
            cont = f2 => cont(f2.andThen(functor.map(_)(f))),
            error = (e, i) => error(e, i)
          )
      }
    }

    /** Sequences consumers. When the first finishes, its leftover input is fed
     * to the consumer produced by `f`.
     */
    implicit def consumerMonad[I] = new Monad[({type C[A] = Consumer[I, A]})#C] {
      override def flatMap[A, B](ma: Consumer[I, A])(f: A => Consumer[I, B])(implicit functor: Functor[({type C[A] = Consumer[I, A]})#C]) =
        Consumer flatten ma.fold[Consumer[I, B]](
          done = (a, next) => contexted {
            Producer once next into f(a)
          },
          cont = k => contexted {
            Consumer.cont(in => flatMap(k(in))(f))
          },
          error = (e, next) => contexted(Consumer.error(e, next))
        )
      // TODO - More efficient flatMap...
      override def flatten[A](mma: Consumer[I, Consumer[I, A]]) =
        flatMap(mma)(identity)
    }
  }

  /**
   * This class produces values and feeds them through a Consumer. It returns the
   * immutable consumer resulting from feeding the value.
   */
  trait Producer[A] { self =>
    /** Feeds values from this producer into a consumer. */
    def into[O](c: Consumer[A, O]): Consumer[A, O]
    /** Append a second producer's input after this one. */
    def andThen(p: Producer[A]) = new Producer[A] {
      def into[O](c: Consumer[A, O]): Consumer[A, O] =
        p into (self into c)
      override def toString = "Producer(" + self + " andThen " + p + ")"
    }
  }

  object Producer {
    /** Constructs a producer that feeds one value into a consumer and then stops.
     * Finished or failed consumers are returned unchanged (wrapped by `Consumer.flatten`).
     */
    def once[T](in: StreamValue[T]): Producer[T] = new Producer[T] {
      override def into[O](c: Consumer[T, O]): Consumer[T, O] =
        Consumer flatten (c.fold[Consumer[T, O]](
          done = (_, _) => contexted(c),
          cont = f => contexted(f(in)),
          error = (_, _) => contexted(c)
        ))
      override def toString = "Producer.once(" + in + ")"
    }
    /** Sends the EOF input to a Consumer. */
    def eof[T] = once[T](EOF: StreamValue[T])
  }

  /**
   * This class converts a consumer of one input type to a consumer of another input type. These
   * usually represent things like decrypted input streams, conversion from Byte to Char or
   * grouping input for efficient consumption.
   */
  trait Conversion[I, I2] {
    def apply[O](i: Consumer[I2, O]): Consumer[I, Consumer[I2, O]]
    /** Applies this conversion and joins the nested consumer into a plain one. */
    def convert[O](i: Consumer[I2, O])(implicit ev0: Empty[I]): Consumer[I, O] =
      apply(i).join
  }
}
package scalax.io
object SeqIteratees extends Iteratees with StrictIteratees with GenericIteratees {
  /** Builds a producer that pushes each element of `seq`, in iteration order,
   * into a consumer as a `Chunk`. Feeding stops early as soon as the consumer is
   * finished or has failed. EOF is never sent by this producer.
   */
  def enumerate[A](seq: Iterable[A]): Producer[A] = new Producer[A] {
    override def into[O](c: Consumer[A, O]): Consumer[A, O] = {
      val elements = seq.iterator

      // Each step pulls the next element first and then asks the consumer what to
      // do with it. A waiting consumer receives the element. A finished or failed
      // consumer is kept as-is, and the loop ends after this step.
      @scala.annotation.tailrec
      def drive(current: Consumer[A, O]): Consumer[A, O] =
        if (!elements.hasNext) current
        else {
          val element = elements.next()
          val (successor, keepFeeding) = current.fold[(Consumer[A, O], Boolean)](
            done = (_, _) => contexted((current, false)),
            cont = k => contexted((k(Chunk(element)), true)),
            error = (_, _) => contexted((current, false))
          )
          val wrapped = Consumer flatten contexted(successor)
          if (keepFeeding) drive(wrapped) else wrapped
        }

      drive(c)
    }
  }
}
package scalax.io
/** A specialization of iteratees where `Context` is the identity type
 * constructor, so folding a consumer evaluates immediately (strictly).
 *
 * NOTE(review): the original description mentions a trampoline, but no
 * trampolining is visible in this trait. Confirm the intended evaluation strategy.
 */
trait StrictIteratees extends Iteratees {
  /** Errors are plain strings. */
  type Error = String
  /** Identity context: a `Context[X]` is just an `X`. */
  type Context[X] = X

  // Explicit result types on public implicits: Scala 3 requires them and 2.13 lint warns without.
  implicit override lazy val context_monad: Monad[Context] = new Monad[Context] {
    // Flattening a nested identity context is a no-op.
    override def flatten[A](x: A): A = x
  }
  implicit override lazy val context_functor: Functor[Context] = new Functor[Context] {
    override def map[A, B](ma: A)(f: A => B): B = f(ma)
  }
  implicit override lazy val context_pointed: Pointed[Context] = new Pointed[Context] {
    override def apply[A](a: => A): A = a
  }
}
@jsuereth
Copy link
Author

Note: SeqIteratees is horribly implemented right now; I just wanted to get the idea down as a proof of concept.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment