Last active
February 26, 2022 13:54
-
-
Save bxiang/fbc1bb9529a7bffdc916ce36fb78aaad to your computer and use it in GitHub Desktop.
LogStreamApp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.file.{Files, Paths} | |
import zio._ | |
import zio.console._ | |
import zio.duration._ | |
import zio.stream._ | |
object LogStreamApp extends App { | |
def isErrorWarning(data: String) = { | |
data.contains("ERROR") || data.contains("WARN") | |
} | |
def isError(data: String): Boolean = { | |
data.contains("ERROR") | |
} | |
def processError(data: String) = { | |
putStrLn(s"process error message: ${data}") *> | |
Task.succeed() | |
} | |
def processWarning(list: List[String]) = { | |
putStrLn(s"process warning messages in batch: ${list.length} => $list") *> | |
Task.succeed() | |
} | |
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = { | |
val is = Files.newInputStream(Paths.get("C:/temp/data/prod_log.txt")) | |
val theJob = (for { | |
streams <- ZStream | |
.fromInputStream(is) | |
.chunks | |
.aggregate(ZSink.utf8DecodeChunk) | |
.aggregate(ZSink.splitLines) | |
.mapConcatChunk(identity) | |
.tap(data => putStrLn(s"> $data")) | |
.filter(isErrorWarning) | |
.partition(isError, 4) | |
} yield streams).use { | |
case (leftStream, rightStream) => { | |
val errorStream = leftStream | |
.mapM(processError(_)) | |
.schedule(Schedule.fixed(2.second)) | |
val warningStream = rightStream | |
.aggregate(ZSink.collectAllN[String](10)) | |
.mapM(processWarning(_)) | |
errorStream.merge(warningStream).runCollect | |
} | |
} | |
theJob.fold(_ => 1, _ => 0) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment