Last active
February 26, 2022 13:54
LogStreamApp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.file.{Files, Paths} | |
import zio._ | |
import zio.console._ | |
import zio.duration._ | |
import zio.stream._ | |
/**
 * Reads a log file as a stream of lines, echoes every line to the console and
 * routes the lines that mention "ERROR" or "WARN" to dedicated handlers.
 *
 * Lines containing "ERROR" are handled one at a time; the remaining matching
 * lines (those containing "WARN" but not "ERROR") are handled in batches.
 */
object LogStreamApp extends App {

  /** Log file that is read when no path is given as the first command-line argument. */
  private val DefaultLogPath = "C:/temp/data/prod_log.txt"

  /**
   * Tells whether a log line is relevant for processing.
   *
   * @param data a single log line
   * @return true when the line contains "ERROR" or "WARN"
   */
  def isErrorWarning(data: String): Boolean =
    isError(data) || data.contains("WARN")

  /**
   * Tells whether a log line is an error line.
   *
   * @param data a single log line
   * @return true when the line contains "ERROR"
   */
  def isError(data: String): Boolean =
    data.contains("ERROR")

  /**
   * Handles a single error line by printing it to the console.
   *
   * @param data the error line
   * @return an effect that prints the message and yields unit
   */
  def processError(data: String): ZIO[Console, Throwable, Unit] =
    putStrLn(s"process error message: ${data}")

  /**
   * Handles a batch of warning lines by printing the batch size and its content.
   *
   * @param list the warning lines collected into one batch
   * @return an effect that prints the message and yields unit
   */
  def processWarning(list: List[String]): ZIO[Console, Throwable, Unit] =
    putStrLn(s"process warning messages in batch: ${list.length} => $list")

  /**
   * Program entry point.
   *
   * @param args command-line arguments; the first one, when present, is the
   *             path of the log file to read (defaults to [[DefaultLogPath]])
   * @return exit code 0 when the whole file was processed, 1 when anything failed
   */
  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
    val logPath = args.headOption.getOrElse(DefaultLogPath)

    // Open the file inside an effect so that a missing/unreadable file becomes a
    // regular failure of the program, and register the close as the release
    // action so the handle is returned however the stream terminates.
    val logInput = ZManaged.make(ZIO.effect(Files.newInputStream(Paths.get(logPath))))(
      is => ZIO.effect(is.close()).catchAll(_ => ZIO.unit)
    )

    val partitioned = logInput.flatMap { is =>
      ZStream
        .fromInputStream(is)
        .chunks
        .aggregate(ZSink.utf8DecodeChunk)
        .aggregate(ZSink.splitLines)
        .mapConcatChunk(identity)
        .tap(data => putStrLn(s"> $data"))
        .filter(isErrorWarning)
        // First stream: lines satisfying isError; second stream: everything else.
        .partition(isError, 4)
    }

    val theJob = partitioned.use {
      case (leftStream, rightStream) =>
        val errorStream = leftStream
          .mapM(processError(_))
          .schedule(Schedule.fixed(2.second))
        val warningStream = rightStream
          .aggregate(ZSink.collectAllN[String](10))
          .mapM(processWarning(_))
        // Only the side effects matter, so drain instead of collecting a
        // growing chunk of unit values for the whole file.
        errorStream.merge(warningStream).runDrain
    }

    // Report the cause before mapping it to a non-zero exit code; otherwise a
    // failure (e.g. file not found) would be indistinguishable from a bug.
    theJob.foldM(
      err => putStrLn(s"log processing failed: $err").map(_ => 1),
      _ => ZIO.succeed(0)
    )
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment