// Gist jsuereth/1214709, created September 13, 2011 18:57.
// A prototype iteratee library that uses Scala traits as an abstract module system.
// GitHub warning: this file may contain bidirectional Unicode text that could be interpreted or
// compiled differently than it appears; review it in an editor that reveals hidden Unicode characters.
package scalax.io

/**
 * General-purpose iteratees. They assume nothing about the module's `Error`
 * type and speak only the basic `Chunk` / `EOF` protocol with producers.
 */
trait GenericIteratees extends Iteratees {

  /**
   * Looks at the next chunk without consuming it: the chunk that was read is
   * handed back as the consumer's remaining input.
   *
   * Yields `Some(input)` for a chunk and `None` at end of file.
   */
  def peek[I]: Consumer[I, Option[I]] = {
    val look: StreamValue[I] => Consumer[I, Option[I]] = {
      case chunk @ Chunk(item) => Consumer.done(Some(item), chunk)
      case eof @ EOF           => Consumer.done(None, eof)
    }
    Consumer.cont(look)
  }

  /**
   * Takes the first non-empty chunk off the stream.
   *
   * Empty chunks are skipped. Yields `Some(input)` (leaving an empty chunk as
   * the remainder) or `None` if EOF arrives first.
   */
  def head[I: Empty]: Consumer[I, Option[I]] = {
    val take: StreamValue[I] => Consumer[I, Option[I]] = {
      // An empty chunk carries nothing useful: keep waiting.
      case EmptyChunk()  => head[I]
      case Chunk(item)   => Consumer.done(Some(item), Chunk.empty)
      case eof @ EOF     => Consumer.done(None, eof)
    }
    Consumer.cont(take)
  }

  /** Counts how many non-empty `Chunk` inputs arrive before EOF. */
  def counter[I: Empty]: Consumer[I, Long] = {
    def countFrom(seen: Long): StreamValue[I] => Consumer[I, Long] = {
      case EmptyChunk() => Consumer.cont(countFrom(seen))
      case eof @ EOF    => Consumer.done(seen, eof)
      case Chunk(_)     => Consumer.cont(countFrom(seen + 1))
    }
    Consumer.cont(countFrom(0L))
  }

  /**
   * Runs `in` over and over until end of file, collecting each result in order.
   *
   * Each non-empty chunk restarts a fresh copy of `in`. When that copy finishes,
   * the results of the following repetitions are appended after its value. An
   * error from `in` aborts the whole repetition.
   */
  def repeat[I: Empty, O](in: Consumer[I, O]): Consumer[I, Vector[O]] = {
    def awaiting(acc: Vector[O]): StreamValue[I] => Consumer[I, Vector[O]] = {
      case EmptyChunk() => Consumer.cont(awaiting(acc))
      case eof @ EOF    => Consumer.done(acc, eof)
      case chunk @ Chunk(_) =>
        val stepped = in.fold[Consumer[I, Vector[O]]](
          done  = (value, rest) => contexted(Consumer.done(acc :+ value, rest)),
          error = (err, rest)   => contexted(Consumer.error(err, rest)),
          // Feed the chunk to `in`, then prepend its value to the remaining repetitions.
          cont  = k => contexted(k(chunk).flatMap(first => repeat(in).map(others => first +: others)))
        )
        Consumer.flatten(stepped)
    }
    Consumer.cont(awaiting(Vector.empty[O]))
  }
}
// GitHub warning: this file may contain bidirectional Unicode text that could be interpreted or
// compiled differently than it appears; review it in an editor that reveals hidden Unicode characters.
package scalax.io

/** A module of sorts containing the generic implementation of iteratees.
 *
 * Concrete modules fix the abstract `Error` type and the `Context` type
 * constructor (together with its monad, functor and pointed instances) that
 * every consumer step is wrapped in.
 */
trait Iteratees {
  /** The type used to describe consumer failures. */
  type Error
  /** The context (effect) that every consumer step is wrapped in. */
  type Context[X]
  implicit def context_monad: Monad[Context]
  implicit def context_functor: Functor[Context]
  implicit def context_pointed: Pointed[Context]

  /** Converts a raw value into a Context-wrapped value. */
  def contexted[A](a: A): Context[A] = context_pointed(a)

  /** The lowest trait that represents values that can be passed to consumers. This hierarchy is
   * expected to be extended for 'smart' communication in a domain. For example: Network
   * Iteratees may be able to pass new types of stream values whereas File Iteratees might not.
   */
  trait StreamValue[+T]
  /** Signals that the stream has no more input. */
  case object EOF extends StreamValue[Nothing]
  /** Carries one piece of input to a consumer. */
  case class Chunk[Input](input: Input) extends StreamValue[Input]
  object Chunk {
    /** A chunk wrapping the `Empty` value of `T`. */
    def empty[T](implicit ev0: Empty[T]) = Chunk(ev0.empty)
  }
  /** This object can be used to pattern match against empty chunked inputs. */
  object EmptyChunk {
    def unapply[T: Empty](t: StreamValue[T]): Boolean = t match {
      case Chunk(Empty()) => true
      case _ => false
    }
  }

  /** An immutable stream consumer that will eventually produce a value.
   * A stream Consumer can be in one of three states:
   *
   * 1. Finished with a value (plus any input it did not use)
   * 2. Waiting for more input
   * 3. Failed with an `Error`
   */
  trait Consumer[I,O] {
    /** Fold accepts three continuations for the three states of this consumer.
     * The consumer will take whichever action is appropriate given its state.
     */
    def fold[R](done: (O, StreamValue[I]) => Context[R],
                cont: (StreamValue[I] => Consumer[I,O]) => Context[R],
                error: (Error, StreamValue[I]) => Context[R]
               ): Context[R]

    /** Attempts to pull a 'finished' value out of the Consumer. If it is still
     * waiting for input, it is first sent an EOF token.
     *
     * Note: this fails via `sys.error` (a RuntimeException) if the consumer is in
     * the error state, or if it still wants input after receiving EOF.
     */
    def result: Context[O] = fold[O](
      done = (o, _) => contexted(o),
      cont = f =>
        f(EOF: StreamValue[I]).fold[O](
          done = (o, _) => contexted(o),
          cont = _ => sys.error("Divergent Consumer"),
          error = (e, _) => sys.error("Processing Error: " + e)
        ),
      error = (e, _) => sys.error("Processing Error: " + e))

    /**
     * Flattens this Consumer when its result is itself a consumer (for example,
     * the result of a Conversion application).
     *
     * The inner consumer is sent EOF if it is still waiting for input. Its leftover
     * input is replaced with an empty chunk. Fails via `sys.error` if the inner
     * consumer still wants input after EOF.
     */
    def join[I2, O2](implicit ev0: O <:< Consumer[I2, O2],
                     ev1: Empty[I]): Consumer[I,O2] =
      this flatMap { iter2 =>
        Consumer flatten ev0(iter2).fold[Consumer[I,O2]](
          done = (a, _) => contexted(Consumer.done(a, Chunk.empty)),
          error = (e, _) => contexted(Consumer.error(e, Chunk.empty)),
          cont = k => k(EOF).fold(
            done = (a, _) => contexted(Consumer.done(a, Chunk.empty)),
            error = (e, _) => contexted(Consumer.error(e, Chunk.empty)),
            cont = _ => sys.error("join: Divergent iteratee!")
          )
        )
      }

    // Note: these delegate explicitly to the companion's instances instead of relying
    // on implicit resolution. `[I]` is given explicitly; otherwise it would be inferred
    // as `Nothing` before the member selection.
    def map[B](f: O => B): Consumer[I,B] =
      Consumer.consumerFunctor[I].map(this)(f)
    def flatten[B](implicit ev0: Consumer[I,O] <:< Consumer[I, Consumer[I,B]]): Consumer[I,B] =
      Consumer.consumerMonad[I].flatten[B](ev0(this))
    def flatMap[B](f: O => Consumer[I,B]): Consumer[I,B] =
      Consumer.consumerMonad[I].flatMap(this)(f)(Consumer.consumerFunctor[I])
    // TODO - Zip
  }

  object Consumer {
    /** Constructs a consumer that is finished with `value`, leaving `remaining` unconsumed. */
    def done[I,O](value: O, remaining: StreamValue[I]): Consumer[I,O] =
      new Consumer[I,O] {
        def fold[R](
          done: (O, StreamValue[I]) => Context[R],
          cont: (StreamValue[I] => Consumer[I,O]) => Context[R],
          error: (Error, StreamValue[I]) => Context[R]
        ): Context[R] = done(value, remaining)
        // Print `value` directly rather than re-folding through `result`, which
        // would show the Context wrapper whenever Context is not the identity.
        override def toString = "Consumer.done("+value+","+remaining+")"
      }

    /** Constructs a consumer that accepts the next stream input and
     * returns the next consumer.
     */
    def cont[I,O](f: StreamValue[I] => Consumer[I,O]): Consumer[I,O] =
      new Consumer[I,O] {
        def fold[R](
          done: (O, StreamValue[I]) => Context[R],
          cont: (StreamValue[I] => Consumer[I,O]) => Context[R],
          error: (Error, StreamValue[I]) => Context[R]
        ): Context[R] = cont(f)
        override def toString = "Consumer.cont("+f+")"
      }

    /** Constructs a consumer that represents an error condition. */
    def error[I,O](err: Error, last: StreamValue[I]): Consumer[I,O] =
      new Consumer[I,O] {
        def fold[R](
          done: (O, StreamValue[I]) => Context[R],
          cont: (StreamValue[I] => Consumer[I,O]) => Context[R],
          error: (Error, StreamValue[I]) => Context[R]
        ): Context[R] = error(err, last)
        override def toString = "Error("+err+","+last+")"
      }

    /** Removes the Context from around a consumer. Folding the result sequences the
     * wrapped context (via `context_monad.flatMap`) and then folds the inner consumer.
     * TODO - Look into making this implicit...
     */
    def flatten[I,O](i: Context[Consumer[I,O]]): Consumer[I,O] =
      new Consumer[I,O] {
        def fold[R](
          done: (O, StreamValue[I]) => Context[R],
          cont: (StreamValue[I] => Consumer[I,O]) => Context[R],
          error: (Error, StreamValue[I]) => Context[R]
        ): Context[R] = context_monad.flatMap(i)(_.fold(done = done, cont = cont, error = error))
        override def toString = "Flattened("+i+")"
      }

    /** Lifts a value into an already-finished consumer that leaves an empty chunk. */
    implicit def consumerPointed[I: Empty]: Pointed[({type C[A] = Consumer[I,A]})#C] =
      new Pointed[({type C[A] = Consumer[I,A]})#C] {
        def apply[A](a: => A): Consumer[I,A] =
          Consumer.done(a, Chunk.empty[I])
      }

    /** Maps over a consumer's eventual value, in every state it passes through. */
    implicit def consumerFunctor[I]: Functor[({type C[A] = Consumer[I,A]})#C] =
      new Functor[({type C[A] = Consumer[I,A]})#C] { functor =>
        override def map[A,B](ma: Consumer[I, A])(f: A=>B): Consumer[I, B] = new Consumer[I, B] {
          override def fold[R](
            done: (B, StreamValue[I]) => Context[R],
            cont: (StreamValue[I] => Consumer[I,B]) => Context[R],
            error: (Error, StreamValue[I]) => Context[R]
          ): Context[R] =
            ma.fold[R](
              done = (a, i) => done(f(a), i),
              // Keep mapping every consumer produced after more input arrives.
              cont = f2 => cont(f2.andThen(functor.map(_)(f))),
              error = (e,i) => error(e,i)
            )
        }
      }

    /** Sequences consumers: the leftover input of the first is fed to the second. */
    implicit def consumerMonad[I]: Monad[({type C[A] = Consumer[I,A]})#C] =
      new Monad[({type C[A] = Consumer[I,A]})#C] {
        // Explicit result type: this method is recursive (see the `cont` case).
        override def flatMap[A,B](ma: Consumer[I,A])(f: A => Consumer[I,B])(implicit functor: Functor[({type C[A] = Consumer[I,A]})#C]): Consumer[I,B] =
          Consumer flatten ma.fold[Consumer[I,B]](
            done = (a, next) => contexted {
              Producer once next into f(a)
            },
            cont = k => contexted {
              cont(in => flatMap(k(in))(f))
            },
            error = (e, next) => contexted(error(e,next))
          )
        // TODO - More efficient flatMap...
        override def flatten[A](mma: Consumer[I, Consumer[I,A]]): Consumer[I,A] =
          flatMap(mma)(identity)
      }
  }

  /**
   * Produces values and feeds them through a Consumer. It returns the
   * immutable consumer resulting from feeding the values.
   */
  trait Producer[A] { self =>
    /** Feeds values from this producer into a consumer. */
    def into[O](c: Consumer[A, O]): Consumer[A, O]
    /** Appends a second producer's input after this one. */
    def andThen(p: Producer[A]): Producer[A] = new Producer[A] {
      def into[O](c: Consumer[A,O]): Consumer[A,O] =
        p into (self into c)
      override def toString = "Producer("+self+" andThen "+p+")"
    }
  }

  object Producer {
    /** Constructs a producer that feeds one value into a consumer and then stops.
     * A consumer that is already done or in error is passed through untouched,
     * so the value is dropped in that case.
     */
    def once[T](in: StreamValue[T]): Producer[T] = new Producer[T] {
      override def into[O](c: Consumer[T, O]): Consumer[T, O] =
        Consumer flatten (c.fold[Consumer[T,O]](
          done = (_,_) => contexted(c),
          cont = f => contexted(f(in)),
          error = (_, _) => contexted(c)
        ))
      override def toString = "Producer.once("+in+")"
    }
    /** Sends the EOF input to a Consumer. */
    def eof[T]: Producer[T] = once[T](EOF : StreamValue[T])
  }

  /**
   * Converts a consumer of one input type to a consumer of another input type. These
   * usually represent things like decrypted input streams, conversion from Byte to Char or
   * grouping input for efficient consumption.
   */
  trait Conversion[I,I2] {
    /** Wraps `i` so it can be driven with `I` input; the outer consumer yields the inner one. */
    def apply[O](i : Consumer[I2,O]): Consumer[I, Consumer[I2, O]]
    /** Applies this conversion and joins the result, yielding `i`'s value directly. */
    def convert[O](i: Consumer[I2, O])(implicit ev0: Empty[I]): Consumer[I, O] =
      apply(i).join
  }
}
// GitHub warning: this file may contain bidirectional Unicode text that could be interpreted or
// compiled differently than it appears; review it in an editor that reveals hidden Unicode characters.
package scalax.io

/** A strict iteratee module (see StrictIteratees) with the generic iteratees
 * mixed in, plus a producer that enumerates Scala collections.
 */
object SeqIteratees extends Iteratees with StrictIteratees with GenericIteratees {

  /** Produces each element of `seq`, in iteration order, as a `Chunk`.
   *
   * Elements are fed only while the consumer is waiting for input. As soon as it
   * finishes or fails, the remaining elements are left unread and that consumer
   * is returned. EOF is never sent; use `enumerate(seq) andThen Producer.eof`
   * for that.
   */
  def enumerate[A](seq: Iterable[A]): Producer[A] = new Producer[A] {
    override def into[O](c: Consumer[A, O]): Consumer[A, O] = {
      val itr = seq.iterator

      // The continuation of a consumer that is waiting for input, if it is waiting.
      // Context is the identity in this module, so the fold yields the Option directly.
      def waiting(consumer: Consumer[A, O]): Option[StreamValue[A] => Consumer[A, O]] =
        consumer.fold[Option[StreamValue[A] => Consumer[A, O]]](
          done = (_, _) => None,
          cont = k => Some(k),
          error = (_, _) => None
        )

      // Tail-recursive driver. The next element is pulled only once the consumer
      // is known to want it, and no mutable state is shared with the fold callbacks.
      @scala.annotation.tailrec
      def drive(consumer: Consumer[A, O]): Consumer[A, O] =
        waiting(consumer) match {
          case Some(k) if itr.hasNext => drive(k(Chunk(itr.next())))
          case _                      => consumer
        }

      drive(c)
    }
  }
}
// GitHub warning: this file may contain bidirectional Unicode text that could be interpreted or
// compiled differently than it appears; review it in an editor that reveals hidden Unicode characters.
package scalax.io

/** A specialization of iteratees where every step is evaluated strictly.
 *
 * `Context` is the identity type, so each consumer step is computed as soon as
 * it is folded, and errors are plain `String`s.
 *
 * NOTE(review): an earlier version of this comment mentioned a trampoline, but
 * none is implemented here; deeply nested consumers run on the call stack.
 */
trait StrictIteratees extends Iteratees {
  type Error = String
  /** The identity context: a `Context[X]` is simply an `X`. */
  type Context[X] = X

  // Public implicit members carry explicit types so implicit search does not
  // depend on inferring the anonymous-class type.
  implicit override lazy val context_monad: Monad[Context] = new Monad[Context] {
    // Flattening the identity context is a no-op.
    override def flatten[A](x: A): A = x
  }
  implicit override lazy val context_functor: Functor[Context] = new Functor[Context] {
    override def map[A,B](ma: A)(f: A => B): B = f(ma)
  }
  implicit override lazy val context_pointed: Pointed[Context] = new Pointed[Context] {
    override def apply[A](a: => A): A = a
  }
}
// Author's note: SeqIteratees is very roughly implemented right now; it was only meant to get
// the idea down as a proof of concept.