Skip to content

Instantly share code, notes, and snippets.

@bxiang
Last active February 26, 2022 13:54
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save bxiang/fbc1bb9529a7bffdc916ce36fb78aaad to your computer and use it in GitHub Desktop.
Save bxiang/fbc1bb9529a7bffdc916ce36fb78aaad to your computer and use it in GitHub Desktop.
LogStreamApp
import java.nio.file.{Files, Paths}
import zio._
import zio.console._
import zio.duration._
import zio.stream._
/** ZIO application that streams a log file line by line, echoes every line to
  * the console, and then handles the interesting lines:
  *
  *  - lines containing "ERROR" are processed one at a time on a fixed
  *    two-second schedule;
  *  - the remaining lines containing "WARN" are collected with
  *    `ZSink.collectAllN(10)` and processed as batches.
  *
  * The log file path is the first command-line argument; when no argument is
  * supplied the original hard-coded path is used.
  */
object LogStreamApp extends App {

  /** Log file read when no path is passed on the command line. */
  private val DefaultLogPath: String = "C:/temp/data/prod_log.txt"

  /** Returns true when the line contains "ERROR" or "WARN". */
  def isErrorWarning(data: String): Boolean =
    isError(data) || data.contains("WARN")

  /** Returns true when the line contains "ERROR". */
  def isError(data: String): Boolean =
    data.contains("ERROR")

  /** Handles a single error line by printing it to the console.
    *
    * The result type is spelled out (with a `Throwable` error channel) so that
    * it stays the same as what the previous `putStrLn(...) *> Task.succeed()`
    * body produced, without relying on deprecated `()` argument adaptation.
    */
  def processError(data: String): ZIO[Console, Throwable, Unit] =
    putStrLn(s"process error message: ${data}")

  /** Handles a batch of warning lines by printing the batch size and content.
    *
    * @param list the warning lines collected for this batch
    */
  def processWarning(list: List[String]): ZIO[Console, Throwable, Unit] =
    putStrLn(s"process warning messages in batch: ${list.length} => $list")

  /** Program entry point.
    *
    * @param args command-line arguments; `args.head`, when present, is the
    *             path of the log file to read
    * @return 0 when the whole stream was processed, 1 when anything failed
    *         (the failure is printed before returning)
    */
  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
    val logPath = args.headOption.getOrElse(DefaultLogPath)

    // Open the file inside an effect so that a missing/invalid path becomes a
    // ZIO failure (handled below) instead of an exception thrown from `run`,
    // and register a finalizer so the stream is always closed.
    val logInput =
      Managed.make(Task.effect(Files.newInputStream(Paths.get(logPath)))) { in =>
        // A failure while closing is deliberately ignored: nothing useful can
        // be done with it at this point.
        Task.effect(in.close()).catchAll(_ => UIO.unit)
      }

    val theJob = (for {
      in <- logInput
      streams <- ZStream
        .fromInputStream(in)
        .chunks
        .aggregate(ZSink.utf8DecodeChunk) // bytes -> text
        .aggregate(ZSink.splitLines)      // text  -> chunks of lines
        .mapConcatChunk(identity)         // flatten to one element per line
        .tap(data => putStrLn(s"> $data"))
        .filter(isErrorWarning)
        // Lines matching `isError` go to the first stream, the rest to the
        // second. NOTE(review): 4 is presumably the partition buffer size --
        // confirm against the ZIO version in use.
        .partition(isError, 4)
    } yield streams).use {
      case (leftStream, rightStream) =>
        val errorStream = leftStream
          .mapM(processError)
          .schedule(Schedule.fixed(2.second))
        val warningStream = rightStream
          .aggregate(ZSink.collectAllN[String](10))
          .mapM(processWarning)
        errorStream.merge(warningStream).runCollect
    }

    // Report the failure instead of silently mapping it to an exit code.
    theJob.foldM(
      err => putStrLn(s"log stream failed: $err").map(_ => 1),
      _ => UIO.succeed(0)
    )
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment