Created
July 8, 2015 12:12
-
-
Save gauthamnair/d98a1906f8219460ce3b to your computer and use it in GitHub Desktop.
fromFPinscala + transducers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package code | |
object Process0 { | |
sealed trait Process[I, O] { | |
import Process._ | |
def |>[O2](p2: Process[O,O2]): Process[I,O2] = compose(this, p2) | |
def ++(p2: Process[I,O]): Process[I,O] = concat(this, p2) | |
def flatMap[O2](f: O => Process[I,O2]): Process[I,O2] = nest(this)(f) | |
} | |
case class Emit[I,O]( | |
head: O, | |
tail: Process[I,O] = Halt[I,O]()) | |
extends Process[I,O] | |
case class Await[I,O]( | |
recv: Option[I] => Process[I, O]) | |
extends Process[I,O] | |
case class Halt[I,O]() extends Process[I,O] | |
object Process { | |
def stream[I,O](proc: Process[I,O])(xs: Stream[I]): Stream[O] = | |
proc match { | |
case Emit(out, nextSteps) => out #:: stream(nextSteps)(xs) | |
case Await(receiver) => xs match { | |
case in #:: nextItems => stream(receiver(Some(in)))(nextItems) | |
case emptyInput => stream(receiver(None))(emptyInput) | |
} | |
case Halt() => Stream.empty | |
} | |
def withHaltDefault[I,O](f: I => Process[I,O]): Option[I] => Process[I,O] = | |
_.map(f).getOrElse(Halt[I,O]()) | |
def awaitWithHaltOnEmpty[I,O](f: I => Process[I,O]): Process[I,O] = | |
Await[I,O](withHaltDefault(f)) | |
def oneInputWithFn1[I,O](f: I => O): Process[I,O] = | |
awaitWithHaltOnEmpty { input => Emit[I,O](f(input), Halt()) } | |
def fromFn1[I,O](f: I => O): Process[I,O] = | |
awaitWithHaltOnEmpty { input => Emit[I,O](f(input), fromFn1(f)) } | |
def concat[I,O](p1: Process[I,O], p2: Process[I,O]): Process[I,O] = | |
p1 match { | |
case Emit(h, rest) => Emit(h, concat(rest, p2)) | |
case Await(recv) => Await { input => concat(recv(input), p2) } | |
case Halt() => p2 | |
} | |
def continue[I,O]( | |
p: Process[I,O], | |
onComplete: => Process[I,O], | |
onNoInput: => Process[I,O]): Process[I,O] = | |
p match { | |
case Emit(h, rest) => Emit(h, continue(rest, onComplete, onNoInput)) | |
case Await(recv) => Await { | |
case Some(input) => continue(recv(Some(input)), onComplete, onNoInput) | |
case None => onNoInput | |
} | |
case Halt() => onComplete | |
} | |
def repeat[I,O](proc: Process[I,O]): Process[I,O] = | |
continue(proc, onComplete=repeat(proc), onNoInput=Halt()) | |
def filter[I](satisfies: I => Boolean): Process[I,I] = | |
awaitWithHaltOnEmpty { input => | |
if (satisfies(input)) Emit(input, filter(satisfies)) | |
else Halt() | |
} | |
def identityProcess[I]: Process[I,I] = fromFn1(identity) | |
def take[I](n: Int): Process[I,I] = | |
if (n <= 0) Halt() | |
else awaitWithHaltOnEmpty { i => Emit(i, take(n-1)) } | |
def drop[I](n: Int): Process[I,I] = | |
if (n <= 0) identityProcess | |
else awaitWithHaltOnEmpty { _ => drop(n - 1) } | |
def takeWhile[I](f: I => Boolean): Process[I,I] = | |
awaitWithHaltOnEmpty { input => | |
if (f(input)) Emit(input, takeWhile(f)) | |
else Halt() | |
} | |
def dropWhile[I](f: I => Boolean): Process[I,I] = | |
awaitWithHaltOnEmpty { input => | |
if (f(input)) dropWhile(f) | |
else identityProcess | |
} | |
def loop[I,S,O](z: S)(f: (I,S) => (O,S)): Process[I,O] = | |
awaitWithHaltOnEmpty { input => | |
val (output, nextState) = f(input, z) | |
Emit(output, loop(nextState)(f)) | |
} | |
def foldLeft[A,B](z: B)(f: (B,A) => B): Process[A,B] = | |
awaitWithHaltOnEmpty { a => | |
val accumulated = f(z,a) | |
Emit(accumulated, tail=foldLeft(accumulated)(f)) | |
} | |
def count[I]: Process[I,Long] = foldLeft(0L)((c,_) => c + 1L) | |
def sum: Process[Double, Double] = foldLeft(0.0)(_ + _) | |
def mean: Process[Double, Double] = | |
loop((0.0, 0L)) { | |
(input: Double, state: (Double, Long)) => | |
val (prevTotal, prevCount) = state | |
val newTotal = prevTotal + input | |
val newCount = prevCount + 1L | |
(newTotal/newCount, (newTotal, newCount)) | |
} | |
def compose[A,B,C](p1: Process[A,B], p2: Process[B,C]): Process[A,C] = { | |
p2 match { | |
case Await(bReceiver) => p1 match { | |
case Emit(b, rest) => compose(rest, bReceiver(Some(b))) | |
case Halt() => compose(Halt[A,B](), bReceiver(None)) | |
case Await(aReceiver) => | |
Await[A,C] { a => compose(aReceiver(a), p2) } | |
} | |
case Emit(c, rest) => Emit(c, compose(p1, rest)) | |
case Halt() => Halt() | |
} | |
} | |
def nest[A,B,C](p: Process[A,B])(f: B => Process[A,C]): Process[A,C] = | |
p match { | |
case Halt() => Halt() | |
case Emit(b, rest) => concat(f(b), nest(rest)(f)) | |
case Await(aReceiver) => Await[A,C] { a => nest(aReceiver(a))(f) } | |
} | |
def map[A,B,C](p: Process[A,B])(f: B => C): Process[A,C] = | |
p match { | |
case Halt() => Halt() | |
case Emit(b, rest) => Emit(f(b), map(rest)(f)) | |
case Await(aReceiver) => | |
Await{ a => map(aReceiver(a))(f) } | |
} | |
def addCount[A]: Process[A, (A,Long)] = | |
loop[A,Long,(A,Long)](0L) { (a,count) => ((a,count), count+1) } | |
def zipWithIndex[A,B](p: Process[A,B]): Process[A,(B,Long)] = | |
compose(p, addCount) | |
def feed[A,B](oa: Option[A])(p: Process[A,B]): Process[A,B] = | |
p match { | |
case Halt() => Halt() | |
case Emit(b, rest) => Emit(b, feed(oa)(rest)) | |
case Await(aReceiver) => aReceiver(oa) | |
} | |
def zip[A,B,C](pb: Process[A,B], pc: Process[A,C]): Process[A,(B,C)] = | |
(pb, pc) match { | |
case (Halt(), _) => Halt() | |
case (_, Halt()) => Halt() | |
case (Emit(b,npB), Emit(c, npC)) => Emit((b,c), zip(npB, npC)) | |
case (Await(aToPB), _) => Await[A,(B,C)] { a => zip(aToPB(a), feed(a)(pc)) } | |
case (_, Await(aToPC)) => Await[A,(B,C)] { a => zip(feed(a)(pb), aToPC(a)) } | |
} | |
def exists[A](f: A => Boolean): Process[A,Boolean] = | |
fromFn1(f) |> foldLeft(false)(_ || _) | |
private class ProcessedIterator[A,B](val as: Iterator[A], var p: Process[A,B]) | |
extends Iterator[B] { | |
var _next: Option[B] = None | |
@annotation.tailrec | |
final def hasNext: Boolean = { | |
if (_next != None) true | |
else p match { | |
case Halt() => false | |
case Emit(b, nextSteps) => | |
_next = Some(b) | |
p = nextSteps | |
true | |
case Await(aReceiver) => | |
val optNextA = if (as.hasNext) Some(as.next) else None | |
p = aReceiver(optNextA) | |
hasNext | |
} | |
} | |
final def next: B = | |
if (hasNext) { | |
val b = _next.get | |
_next = None | |
b | |
} else throw new java.util.NoSuchElementException("next on empty iterator") | |
} | |
def iterator[A,B](p: Process[A,B])(xs: Iterator[A]): Iterator[B] = | |
new ProcessedIterator[A,B](xs, p) | |
def fileProcessor[A,B] | |
(p: Process[String,A]) | |
(f: TraversableOnce[A] => B): java.io.File => B = { | |
file => | |
val s = io.Source.fromFile(file) | |
try f(iterator(p)(s.getLines)) | |
finally s.close | |
} | |
import collection.generic.CanBuildFrom | |
def scColl[A,B, F[X] <: Iterable[X], That]( | |
xs: F[A])(p: Process[A,B])(implicit cbf: CanBuildFrom[F[A],B,That]): That = { | |
val bIter: Iterator[B] = iterator(p)(xs.iterator) | |
val builder = cbf.apply(xs) | |
while (bIter.hasNext) { builder += bIter.next } | |
builder.result | |
} | |
} | |
} | |
object Transducers { | |
trait Transducer[A,B] { | |
def apply[R](add: (R,B)=>R): (R,A)=>R | |
} | |
def map[A,B](f: A => B): Transducer[A,B] = | |
new Transducer[A,B] { | |
def apply[R](add: (R,B)=>R): (R,A)=>R = | |
(r,a) => add(r, f(a)) | |
} | |
def filter[A](f: A => Boolean): Transducer[A,A] = | |
new Transducer[A,A] { | |
def apply[R](add: (R,A)=>R): (R,A)=>R = | |
(r,a) => { if (f(a)) add(r, a) else r } | |
} | |
def flatten[A]: Transducer[TraversableOnce[A],A] = | |
new Transducer[TraversableOnce[A],A] { | |
def apply[R](add: (R,A)=>R): (R,TraversableOnce[A])=>R = | |
(r, as) => as.foldLeft(r)(add) | |
} | |
object Transducer { | |
def concat[A,B,C]( | |
t1: Transducer[A,B], | |
t2: Transducer[B,C]): Transducer[A,C] = | |
new Transducer[A,C] { | |
def apply[R](add: (R,C)=>R): (R,A)=>R = t1(t2(add)) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment