Skip to content

Instantly share code, notes, and snippets.

@gauthamnair
Created July 8, 2015 12:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gauthamnair/d98a1906f8219460ce3b to your computer and use it in GitHub Desktop.
Save gauthamnair/d98a1906f8219460ce3b to your computer and use it in GitHub Desktop.
fromFPinscala + transducers.
package code
object Process0 {
/** A stream transducer that consumes values of type `I` and emits values of type `O`.
  *
  * The operators below are thin wrappers over the functions of the same purpose
  * on the companion `Process` object.
  */
sealed trait Process[I, O] {

  /** Pipes the outputs of this process into `p2` (see `Process.compose`). */
  def |>[O2](p2: Process[O, O2]): Process[I, O2] =
    Process.compose(this, p2)

  /** Runs this process and, once it halts, continues as `p2` (see `Process.concat`). */
  def ++(p2: Process[I, O]): Process[I, O] =
    Process.concat(this, p2)

  /** Replaces every emitted value by the process `f` builds from it (see `Process.nest`). */
  def flatMap[O2](f: O => Process[I, O2]): Process[I, O2] =
    Process.nest(this)(f)
}
/** Process state that outputs `head` and then continues as `tail`.
  *
  * `tail` defaults to `Halt`, so `Emit(x)` emits exactly one value.
  */
case class Emit[I,O](
head: O,
tail: Process[I,O] = Halt[I,O]())
extends Process[I,O]
/** Process state that requests the next input.
  *
  * Drivers in this file call `recv` with `Some(input)` when an input is
  * available and with `None` once the input is exhausted.
  */
case class Await[I,O](
recv: Option[I] => Process[I, O])
extends Process[I,O]
/** Terminal process state: emits nothing further and reads no more input. */
case class Halt[I,O]() extends Process[I,O]
object Process {
/** Drives `proc` over the input stream `xs`, producing its outputs lazily.
  *
  * An awaiting process receives `Some(head)` while input remains and `None`
  * once `xs` is empty; a halted process ends the output stream.
  */
def stream[I,O](proc: Process[I,O])(xs: Stream[I]): Stream[O] =
  proc match {
    case Halt() =>
      Stream.empty
    case Emit(out, rest) =>
      // Stream.cons takes its tail by name, so the remainder runs only on demand.
      Stream.cons(out, stream(rest)(xs))
    case Await(recv) =>
      if (xs.isEmpty) stream(recv(None))(xs)
      else {
        val first = xs.head
        val remaining = xs.tail
        stream(recv(Some(first)))(remaining)
      }
  }
/** Lifts `f` to an `Await` receiver: applies `f` to a present input and
  * halts when the input is `None`.
  */
def withHaltDefault[I,O](f: I => Process[I,O]): Option[I] => Process[I,O] = {
  case Some(input) => f(input)
  case None => Halt[I,O]()
}
/** Builds an `Await` that continues with `f(input)` when an input arrives
  * and halts when there is none.
  */
def awaitWithHaltOnEmpty[I,O](f: I => Process[I,O]): Process[I,O] = {
  val receiver: Option[I] => Process[I,O] = withHaltDefault(f)
  Await[I,O](receiver)
}
/** Process that reads a single input, emits `f(input)` once, and then halts. */
def oneInputWithFn1[I,O](f: I => O): Process[I,O] =
  awaitWithHaltOnEmpty[I,O] { input =>
    val out = f(input)
    Emit[I,O](out, Halt[I,O]())
  }
/** Process that emits `f(input)` for every input until the input runs out. */
def fromFn1[I,O](f: I => O): Process[I,O] = {
  // One step: emit the mapped value, then wait for the next input again.
  def step(input: I): Process[I,O] =
    Emit[I,O](f(input), fromFn1(f))
  awaitWithHaltOnEmpty[I,O](step)
}
/** Sequences two processes: behaves like `p1` until it halts, then like `p2`. */
def concat[I,O](p1: Process[I,O], p2: Process[I,O]): Process[I,O] =
  p1 match {
    case Halt() =>
      p2
    case Emit(out, remaining) =>
      Emit[I,O](out, concat(remaining, p2))
    case Await(receive) =>
      Await[I,O] { maybeInput => concat(receive(maybeInput), p2) }
  }
/** Runs `p` and decides what happens when it stops.
  *
  * @param p          the process to run
  * @param onComplete evaluated (by name) when `p` reaches `Halt`
  * @param onNoInput  evaluated (by name) when `p` awaits and no input is left;
  *                   `p`'s own receiver is not called with `None` in that case
  */
def continue[I,O](
    p: Process[I,O],
    onComplete: => Process[I,O],
    onNoInput: => Process[I,O]): Process[I,O] =
  p match {
    case Halt() =>
      onComplete
    case Emit(out, remaining) =>
      Emit[I,O](out, continue(remaining, onComplete, onNoInput))
    case Await(receive) =>
      Await[I,O] { maybeInput =>
        if (maybeInput.isEmpty) onNoInput
        else continue(receive(maybeInput), onComplete, onNoInput)
      }
  }
/** Restarts `proc` each time it halts, and halts for good once the input is exhausted. */
def repeat[I,O](proc: Process[I,O]): Process[I,O] = {
  // A def, so the restart is only built when `continue` evaluates its by-name argument.
  def restart: Process[I,O] = repeat(proc)
  continue(proc, restart, Halt[I,O]())
}
/** Process that emits only the inputs for which `satisfies` returns true.
  *
  * Inputs that fail the predicate are skipped and filtering continues with
  * the next input. (Halting on the first failure would make this identical
  * to `takeWhile`.) The process halts when the input is exhausted.
  *
  * @param satisfies predicate selecting the inputs to pass through
  */
def filter[I](satisfies: I => Boolean): Process[I,I] =
  awaitWithHaltOnEmpty[I,I] { input =>
    if (satisfies(input)) Emit[I,I](input, filter(satisfies))
    else filter(satisfies) // skip this element, keep consuming
  }
/** Process that re-emits every input unchanged. */
def identityProcess[I]: Process[I,I] =
  fromFn1[I,I](input => input)
/** Process that passes through at most the first `n` inputs and then halts.
  * A non-positive `n` halts immediately.
  */
def take[I](n: Int): Process[I,I] =
  if (n > 0) awaitWithHaltOnEmpty[I,I] { item => Emit[I,I](item, take[I](n - 1)) }
  else Halt[I,I]()
/** Process that discards the first `n` inputs and passes the rest through unchanged.
  * A non-positive `n` discards nothing.
  */
def drop[I](n: Int): Process[I,I] =
  n match {
    case remaining if remaining > 0 =>
      awaitWithHaltOnEmpty[I,I] { _ => drop[I](remaining - 1) }
    case _ =>
      identityProcess[I]
  }
/** Process that passes inputs through while `f` holds and halts at the
  * first input for which it does not.
  */
def takeWhile[I](f: I => Boolean): Process[I,I] =
  awaitWithHaltOnEmpty[I,I] {
    case input if f(input) => Emit[I,I](input, takeWhile(f))
    case _ => Halt[I,I]()
  }
/** Process that discards the leading inputs for which `f` holds and then
  * passes everything else through unchanged.
  *
  * The first input that fails `f` is itself emitted; it has already been
  * consumed when the test is made, so dropping it would lose one element.
  *
  * @param f predicate identifying the leading inputs to discard
  */
def dropWhile[I](f: I => Boolean): Process[I,I] =
  awaitWithHaltOnEmpty[I,I] { input =>
    if (f(input)) dropWhile(f)
    else Emit[I,I](input, identityProcess[I]) // keep the element that ended the prefix
  }
/** Stateful process: for each input, `f(input, state)` yields the value to
  * emit and the state to use for the next input. `z` is the initial state.
  */
def loop[I,S,O](z: S)(f: (I,S) => (O,S)): Process[I,O] =
  awaitWithHaltOnEmpty[I,O] { input =>
    f(input, z) match {
      case (output, nextState) => Emit[I,O](output, loop(nextState)(f))
    }
  }
/** Running fold: after each input, emits the accumulator updated with `f`.
  * The seed `z` itself is never emitted.
  */
def foldLeft[A,B](z: B)(f: (B,A) => B): Process[A,B] = {
  def step(a: A): Process[A,B] = {
    val updated = f(z, a)
    Emit[A,B](updated, foldLeft(updated)(f))
  }
  awaitWithHaltOnEmpty[A,B](step)
}
/** Emits the running number of inputs seen so far (1, 2, 3, ...). */
def count[I]: Process[I,Long] =
  foldLeft[I,Long](0L) { (seen, _) => seen + 1L }
/** Emits the running total of the inputs seen so far. */
def sum: Process[Double, Double] =
  foldLeft[Double,Double](0.0) { (total, x) => total + x }
/** Emits the running arithmetic mean of the inputs seen so far.
  * The loop state is the pair (total so far, number of inputs so far).
  */
def mean: Process[Double, Double] =
  loop[Double, (Double, Long), Double]((0.0, 0L)) {
    case (input, (prevTotal, prevCount)) =>
      val newTotal = prevTotal + input
      val newCount = prevCount + 1L
      val average = newTotal / newCount
      (average, (newTotal, newCount))
  }
/** Pipes the outputs of `p1` into `p2`.
  *
  * `p2` drives the composition: its emissions are passed straight through and
  * its halt ends the whole pipeline. Only when `p2` awaits is `p1` consulted,
  * feeding `p2` an emitted value, `None` if `p1` has halted, or waiting on
  * external input if `p1` itself awaits.
  */
def compose[A,B,C](p1: Process[A,B], p2: Process[B,C]): Process[A,C] =
  (p1, p2) match {
    case (_, Halt()) =>
      Halt[A,C]()
    case (_, Emit(c, rest)) =>
      Emit[A,C](c, compose(p1, rest))
    case (Emit(b, rest), Await(receiveB)) =>
      compose(rest, receiveB(Some(b)))
    case (Halt(), Await(receiveB)) =>
      compose(Halt[A,B](), receiveB(None))
    case (Await(receiveA), Await(_)) =>
      Await[A,C] { a => compose(receiveA(a), p2) }
  }
/** Replaces each value `b` emitted by `p` with the whole process `f(b)`,
  * concatenated in emission order. Backs `Process#flatMap`.
  */
def nest[A,B,C](p: Process[A,B])(f: B => Process[A,C]): Process[A,C] =
  p match {
    case Await(receive) =>
      Await[A,C] { in => nest(receive(in))(f) }
    case Emit(b, rest) =>
      val expanded = f(b)
      val remainder = nest(rest)(f)
      concat(expanded, remainder)
    case Halt() =>
      Halt[A,C]()
  }
/** Transforms every value emitted by `p` with `f`, leaving its input behaviour unchanged. */
def map[A,B,C](p: Process[A,B])(f: B => C): Process[A,C] = {
  def go(current: Process[A,B]): Process[A,C] =
    current match {
      case Await(receive) => Await[A,C] { in => go(receive(in)) }
      case Emit(b, rest) => Emit[A,C](f(b), go(rest))
      case Halt() => Halt[A,C]()
    }
  go(p)
}
/** Pairs each input with the number of inputs seen before it (a zero-based index). */
def addCount[A]: Process[A, (A,Long)] =
  loop[A,Long,(A,Long)](0L) { (item, seen) =>
    val tagged = (item, seen)
    (tagged, seen + 1)
  }
/** Pairs every output of `p` with its zero-based position among `p`'s outputs. */
def zipWithIndex[A,B](p: Process[A,B]): Process[A,(B,Long)] = {
  val indexer: Process[B,(B,Long)] = addCount[B]
  compose(p, indexer)
}
/** Hands the single input `oa` to `p`.
  *
  * Pending emissions are kept in place and the input is delivered to the
  * first `Await` behind them; a halted process stays halted.
  */
def feed[A,B](oa: Option[A])(p: Process[A,B]): Process[A,B] =
  p match {
    case Await(receive) => receive(oa)
    case Emit(out, remaining) => Emit[A,B](out, feed(oa)(remaining))
    case Halt() => Halt[A,B]()
  }
/** Runs `pb` and `pc` over the same input and emits their outputs pairwise.
  *
  * The result halts as soon as either side halts. When a side awaits, the
  * input it receives is also fed to the other side so both see every input.
  */
def zip[A,B,C](pb: Process[A,B], pc: Process[A,C]): Process[A,(B,C)] =
  pb match {
    case Halt() =>
      Halt[A,(B,C)]()
    case Emit(b, restB) =>
      pc match {
        case Halt() =>
          Halt[A,(B,C)]()
        case Emit(c, restC) =>
          Emit[A,(B,C)]((b, c), zip(restB, restC))
        case Await(receiveC) =>
          Await[A,(B,C)] { a => zip(feed(a)(pb), receiveC(a)) }
      }
    case Await(receiveB) =>
      pc match {
        // A halted right side wins over an awaiting left side.
        case Halt() =>
          Halt[A,(B,C)]()
        case _ =>
          Await[A,(B,C)] { a => zip(receiveB(a), feed(a)(pc)) }
      }
  }
/** After each input, emits whether any input so far has satisfied `f`. */
def exists[A](f: A => Boolean): Process[A,Boolean] = {
  val tested: Process[A,Boolean] = fromFn1(f)
  val anySoFar: Process[Boolean,Boolean] =
    foldLeft[Boolean,Boolean](false) { (seen, hit) => seen || hit }
  compose(tested, anySoFar)
}
/** Iterator over the outputs `p` produces when driven by the inputs in `as`.
  *
  * `p` always holds the not-yet-run remainder of the process, and `_next`
  * buffers at most one emitted value that has not been handed out yet.
  */
private class ProcessedIterator[A,B](val as: Iterator[A], var p: Process[A,B])
    extends Iterator[B] {

  var _next: Option[B] = None

  /** Steps the process until a value is buffered (true) or it halts (false). */
  @annotation.tailrec
  final def hasNext: Boolean =
    if (_next.isDefined) true
    else p match {
      case Halt() =>
        false
      case Emit(b, rest) =>
        _next = Some(b)
        p = rest
        true
      case Await(receive) =>
        // Pull one input if there is any; None tells the process input is exhausted.
        val maybeInput = if (as.hasNext) Some(as.next) else None
        p = receive(maybeInput)
        hasNext
    }

  /** Returns the buffered value, or throws NoSuchElementException when the process has halted. */
  final def next: B = {
    // Called for its effect: fills `_next` unless the process has halted.
    hasNext
    val pending = _next
    _next = None
    pending.getOrElse(throw new java.util.NoSuchElementException("next on empty iterator"))
  }
}
/** Wraps `xs` in an iterator that yields the outputs of `p` run over it. */
def iterator[A,B](p: Process[A,B])(xs: Iterator[A]): Iterator[B] = {
  val driven: Iterator[B] = new ProcessedIterator[A,B](as = xs, p = p)
  driven
}
/** Builds a function that runs `p` over the lines of a file and summarises
  * the outputs with `f`.
  *
  * The file is closed as soon as `f` returns, so `f` must fully consume the
  * outputs it needs before returning.
  */
def fileProcessor[A,B]
    (p: Process[String,A])
    (f: TraversableOnce[A] => B): java.io.File => B = {
  (file: java.io.File) => {
    val source = io.Source.fromFile(file)
    try {
      val lines: Iterator[String] = source.getLines
      f(iterator(p)(lines))
    } finally {
      source.close
    }
  }
}
import collection.generic.CanBuildFrom
/** Runs `p` over the collection `xs` and gathers the outputs into the
  * collection type selected by the implicit `CanBuildFrom`.
  */
def scColl[A,B, F[X] <: Iterable[X], That](
    xs: F[A])(p: Process[A,B])(implicit cbf: CanBuildFrom[F[A],B,That]): That = {
  val outputs: Iterator[B] = iterator(p)(xs.iterator)
  val builder = cbf(xs)
  for (b <- outputs) builder += b
  builder.result
}
}
}
/** Minimal transducers: transformations of reducing ("add") functions. */
object Transducers {

  /** Turns a reducer over `B` values into a reducer over `A` values,
    * for any accumulator type `R`.
    */
  trait Transducer[A,B] {
    def apply[R](add: (R,B) => R): (R,A) => R
  }

  /** Transducer that applies `f` to each element before it reaches the reducer. */
  def map[A,B](f: A => B): Transducer[A,B] =
    new Transducer[A,B] {
      def apply[R](add: (R,B) => R): (R,A) => R = { (acc, a) =>
        val mapped = f(a)
        add(acc, mapped)
      }
    }

  /** Transducer that forwards only the elements satisfying `f`. */
  def filter[A](f: A => Boolean): Transducer[A,A] =
    new Transducer[A,A] {
      def apply[R](add: (R,A) => R): (R,A) => R = {
        case (acc, a) if f(a) => add(acc, a)
        case (acc, _) => acc
      }
    }

  /** Transducer that forwards every element of each incoming collection, in order. */
  def flatten[A]: Transducer[TraversableOnce[A],A] =
    new Transducer[TraversableOnce[A],A] {
      def apply[R](add: (R,A) => R): (R,TraversableOnce[A]) => R = { (acc, items) =>
        items.foldLeft(acc) { (soFar, item) => add(soFar, item) }
      }
    }

  object Transducer {

    /** Chains two transducers: elements pass through `t1` first, then `t2`. */
    def concat[A,B,C](
        t1: Transducer[A,B],
        t2: Transducer[B,C]): Transducer[A,C] =
      new Transducer[A,C] {
        def apply[R](add: (R,C) => R): (R,A) => R = {
          val addB: (R,B) => R = t2(add)
          t1(addB)
        }
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment