@yilinwei
Created February 22, 2017 18:02
import scala.util.{Failure, Success, Try}

import fs2.{Pipe, Stream}
import fs2.util.Async // assumes fs2 0.9.x, where Async lives in fs2.util

// Say we have a function which returns a custom error
def process(line: String, max: Int): Try[Int] = {
  // Try(...) also captures the NumberFormatException thrown by non-numeric input
  if (line.length < max) Try(line.toInt)
  else Failure(new IllegalArgumentException(s"$line is not shorter than $max characters"))
}

// and we want to process a stream of lines, but stop at the first error
def pipe[F[_]](max: Int)(implicit F: Async[F]): Pipe[F, String, Int] = {
  stream => stream.flatMap { line =>
    process(line, max) match {
      case Success(value) => Stream.pure(value)
      case Failure(e) => Stream.fail(e)
    }
  }
}
val myPipe = pipe(3)