Skip to content

Instantly share code, notes, and snippets.

@dchenbecker
Created March 18, 2015 18:38
Show Gist options
  • Save dchenbecker/7ea5c392681e35ee7355 to your computer and use it in GitHub Desktop.
Save dchenbecker/7ea5c392681e35ee7355 to your computer and use it in GitHub Desktop.
Making Process aggregation work
import scalaz.{ \/, -\/, \/-, \&/, Monoid }
import \&/._
import scalaz.concurrent.Task
import scalaz.syntax.semigroup._
import scalaz.stream.{ Process, Process1 }
import com.typesafe.scalalogging.Logger
object StreamReporting {
def aggregateProcess[A: Monoid, B](classifier: B => A)(handleAggregate: A => Task[Unit]): Process1[B, B] = {
implicit val MA = implicitly[Monoid[A]]
def go(count: A): Process1[B, B] =
Process.receive1Or[B, B] {
// End of stream, so we attempt to handle and continue
handleAggregate(count).attempt.run match {
case -\/(cause) => Process.fail(cause)
case _ => Process.halt
}
} { b: B => Process.emit(b) ++ go(count |+| classifier(b)) }
go(MA.zero)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment