Skip to content

Instantly share code, notes, and snippets.

@kiambogo
Last active July 10, 2019 04:36
Show Gist options
  • Save kiambogo/b696fe441946df49895bba07cfb2dda5 to your computer and use it in GitHub Desktop.
Save kiambogo/b696fe441946df49895bba07cfb2dda5 to your computer and use it in GitHub Desktop.
fs2 ErrorLoggingStream
import fs2.{Stream, Pipe}
import cats.effect.{Sync, IO}
/**
 * Logging interface parameterised by an effect type `F`.
 *
 * The `*F` variants return their work wrapped in `F` and require a
 * `Sync[F]` instance at the call site; `info` returns `Unit` directly.
 */
trait Logger[F[_]] {

  /** Logs an informational message, returning `Unit` (not wrapped in `F`). */
  def info(msg: String): Unit

  /** Logs an informational message, returning the action as an `F[Unit]`. */
  def infoF(msg: String)(implicit F: Sync[F]): F[Unit]

  /**
   * Logs an error message together with the throwable that caused it,
   * returning the action as an `F[Unit]`.
   *
   * @param msg human-readable description of what failed
   * @param e   the associated failure
   */
  def errorF(msg: String, e: Throwable)(implicit F: Sync[F]): F[Unit]
}
/** [[Logger]] implementation that writes every message with `println`. */
class ConsoleLogger[F[_]] extends Logger[F] {

  /** Prints `msg` right away. */
  def info(msg: String): Unit =
    println(msg)

  /** Prints `msg` when the returned effect is run (suspended via `F.delay`). */
  def infoF(msg: String)(implicit F: Sync[F]): F[Unit] =
    F.delay {
      println(msg)
    }

  /**
   * Prints `msg` followed by the throwable's string form when the returned
   * effect is run. The line is built inside `F.delay`, so `e.toString` is
   * only evaluated at run time.
   */
  def errorF(msg: String, e: Throwable)(implicit F: Sync[F]): F[Unit] =
    F.delay {
      val line = s"$msg. @@ Exception:$e"
      println(line)
    }
}
/**
 * Wrapper around an fs2 stream that adds the `logAndSkipErrors` combinator.
 *
 * @param stream the underlying stream being decorated
 * @param F      `Sync` instance for `F`, passed on to the logger's effectful methods
 */
case class ErrorLoggingStream[F[_], A](stream: fs2.Stream[F, A])(implicit F: Sync[F]) {

  /**
   * Keeps the `Right` values of a stream of `Either[(Throwable, B), B]` and
   * logs each `Left` through `log.errorF`, emitting nothing for it.
   *
   * @param msg builds the log message from the value carried by a `Left`
   * @param log logger that receives the message and the throwable
   * @param ev  proof that the stream's element type `A` is
   *            `Either[(Throwable, B), B]`
   * @tparam B  type of the successfully processed values
   * @return a stream containing only the `Right` values, in order
   */
  def logAndSkipErrors[B](msg: B => String)(implicit log: Logger[F], ev: A =:= Either[(Throwable, B), B]): fs2.Stream[F, B] =
    stream
      // `=:=` is itself a function A => Either[...], so the evidence performs
      // the conversion in a type-safe way; no unchecked cast is needed.
      .map(ev)
      .flatMap {
        // A successful value is already pure; emit it without suspending it in F.
        case Right(value) => fs2.Stream.emit(value).covary[F]
        // Run the logging effect for its side effect only; emits no element.
        case Left((error, input)) => fs2.Stream.eval_(log.errorF(msg(input), error))
      }
}
/**
 * Implicit conversion that makes the `ErrorLoggingStream` methods available
 * directly on any `fs2.Stream[F, A]` for which a `Sync[F]` exists.
 *
 * The result type is spelled out: implicit definitions without an explicit
 * type are rejected by Scala 3 and can confuse implicit resolution in Scala 2.
 */
implicit def stream2ErrorLoggingStream[F[_], A](s: fs2.Stream[F, A])(implicit F: Sync[F]): ErrorLoggingStream[F, A] =
  ErrorLoggingStream(s)
/**
 * Pipe simulating a database write for every incoming `Int`.
 *
 * Each element is handled inside `F.delay`: the thread sleeps for 200 ms,
 * then the element `5` yields a `Left` carrying an exception and the element
 * itself, while every other element is logged via `log.info` and yielded as
 * a `Right`.
 */
def writeToDb[F[_]](implicit log: Logger[F], F: Sync[F]): fs2.Pipe[F, Int, Either[(Throwable, Int), Int]] = { in =>
  in.evalMap { n =>
    F.delay {
      Thread.sleep(200) // Simulate call to db
      val outcome: Either[(Throwable, Int), Int] = n match {
        case 5 =>
          Left((new Exception("Connection to DB dropped"), n))
        case _ =>
          log.info(s"saved $n to the db")
          Right(n)
      }
      outcome
    }
  }
}
/**
 * Pipe simulating a Kinesis write for every incoming `Int`.
 *
 * Each element is handled inside `F.delay`: the thread sleeps for 200 ms,
 * then every element other than `7` is logged via `log.info` and yielded as
 * a `Right`; the element `7` yields a `Left` carrying an exception and the
 * element itself.
 */
def writeToKinesis[F[_]](implicit log: Logger[F], F: Sync[F]): fs2.Pipe[F, Int, Either[(Throwable, Int), Int]] = { in =>
  in.evalMap { n =>
    F.delay {
      Thread.sleep(200) // Simulate call to Kinesis
      val written: Either[(Throwable, Int), Int] =
        if (n != 7) {
          log.info(s"saved $n to kinesis")
          Right(n)
        } else {
          Left((new Exception("Write to Kinesis failed"), n))
        }
      written
    }
  }
}
// Logger picked up implicitly by writeToDb, writeToKinesis and logAndSkipErrors.
// The type is spelled out because implicit values should not rely on inference.
implicit val log: ConsoleLogger[IO] = new ConsoleLogger[IO]

// Demo pipeline: push 1..9 through the simulated DB write, log and drop the
// failures, then do the same for the simulated Kinesis write.
// Declared at top level (not inside a `{ ... }` block) so that `stream` is
// still in scope for the statement that runs it below.
val stream: fs2.Stream[IO, Int] =
  fs2.Stream(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .covary[IO]
    .through(writeToDb)
    .logAndSkipErrors[Int] { i => s"Error writing $i to the database" }
    .through(writeToKinesis)
    .logAndSkipErrors[Int] { i => s"Error writing $i to the kinesis" }

// Run the pipeline synchronously and collect the surviving elements.
// `unsafeRunSync()` executes side effects, so it is called with parentheses.
stream.compile.toVector.unsafeRunSync()
/* Output:
saved 1 to the db
saved 1 to kinesis
saved 2 to the db
saved 2 to kinesis
saved 3 to the db
saved 3 to kinesis
saved 4 to the db
saved 4 to kinesis
Error writing 5 to the database. @@ Exception:java.lang.Exception: Connection to DB dropped
saved 6 to the db
saved 6 to kinesis
saved 7 to the db
Error writing 7 to the kinesis. @@ Exception:java.lang.Exception: Write to Kinesis failed
saved 8 to the db
saved 8 to kinesis
saved 9 to the db
saved 9 to kinesis
Vector[Int] = Vector(1, 2, 3, 4, 6, 8, 9)
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment