Infinite Batch Processing Walking Skeleton using scalaz-stream
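A walking skeleton for unbounded batch processing with scalaz-stream: a simulated paged input is fetched with artificial latency and pushed onto a bounded async queue, a pool of up to ten concurrent workers drains and processes the queue, completions are reported in batches of 100, and a wye merges the producer and consumer sides so both run at once.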
import java.util.concurrent.Executors

import scalaz.concurrent.{Strategy, Task}
import scalaz.stream._
import scalaz.stream.async.mutable.Queue
object MercuryPlayabilityUpdater {

  // All asynchronous steps below run on a fixed pool of 10 daemon threads.
  implicit val DefaultStrategy: Strategy = Strategy.Executor(
    Executors.newFixedThreadPool(10, Strategy.DefaultDaemonThreadFactory)
  )
  // Bounded buffer between the input side and the processing side. Enqueueing
  // suspends while 1000 items are waiting, giving the pipeline backpressure.
  val largeInputBuffer: Queue[String] = async.boundedQueue[String](1000, false)
  // Simulated source of 2000 records, grouped into pages of 500 to mimic a
  // paged upstream fetch.
  val largeInput: Process[Task, String] = Process.emitAll(0 until 2000).map(_.toString)
  val chunkedLargeInput: Process[Task, Vector[String]] = largeInput.chunk(500)
  // For each page: wait 5s (simulating a slow upstream call), then push the
  // whole page onto the buffer.
  val largeInputWithDelay: Process[Task, Unit] = chunkedLargeInput.flatMap { grp =>
    Process.eval(
      delay(5000, Option("\nLarge Input Requested\n"), Option("\nLarge Input Received\n"))
        .flatMap(_ => largeInputBuffer.enqueueAll(grp))
    )
  }
  // Stand-in for real per-record work: ~100ms of latency, printing a dot.
  def simulateDataProcessing(data: String): Process[Task, String] =
    Process.eval(delay(100, Option(".")).map(_ => data))
  // Dequeue records and process up to 10 of them concurrently.
  val drainBufferAndProcess = largeInputBuffer.dequeue.map(simulateDataProcessing)
  val dataProcessingPool = nondeterminism.njoin(maxOpen = 10, maxQueued = 100)(drainBufferAndProcess)
  // Report progress once per 100 completed items.
  val chunkedLargeOutputWithDelay: Process[Task, String] =
    dataProcessingPool.chunk(100).flatMap { vecChunk =>
      Process.eval(delay(10).map(_ => " " + vecChunk.size + " Items completed in Batch\n"))
    }
  // Write progress messages to stdout.
  val outputSink: Sink[Task, String] = io.print(System.out)
  // Run the consumer side (process + report) and the producer side
  // concurrently. mergeHaltBoth halts the merged process as soon as either
  // side halts; here that is the producer, after it has enqueued all 2000
  // records, since dequeue by itself never terminates.
  val pipeline = wye(chunkedLargeOutputWithDelay.to(outputSink), largeInputWithDelay)(wye.mergeHaltBoth)
  // Blocking sleep wrapped in a Task, optionally printing a marker before and
  // after; used throughout to simulate latency.
  def delay(millis: Long, printStarted: Option[String] = None, printDone: Option[String] = None): Task[Unit] =
    Task.delay {
      printStarted.foreach(print)
      Thread.sleep(millis)
      printDone.foreach(print)
    }
  // Run the pipeline to completion and report wall-clock time.
  def run(): Unit = {
    val start = System.nanoTime
    pipeline.run.run
    println("time: " + (System.nanoTime - start) / 1e6 + "ms")
  }
}
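
A minimal sketch of how the skeleton might be invoked, assuming a scalaz-stream dependency is on the classpath (the coordinates and version below are an assumption; the gist does not pin one):

// build.sbt (assumed coordinates):
//   libraryDependencies += "org.scalaz.stream" %% "scalaz-stream" % "0.8.6a"

object Main {
  def main(args: Array[String]): Unit =
    MercuryPlayabilityUpdater.run()  // blocks until the pipeline halts
}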
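
To flesh the skeleton out, the simulated 0-until-2000 range would be replaced by a real source. A hedged sketch under that assumption, where fetchPage is a hypothetical placeholder returning one page per call and an empty page once exhausted:

// Hypothetical replacement for largeInput/chunkedLargeInput; fetchPage is
// an assumed placeholder, not an API from the gist or from scalaz-stream.
def pagedInput(fetchPage: Int => Task[Vector[String]]): Process[Task, Vector[String]] =
  Process.iterate(0)(_ + 1)                          // page numbers 0, 1, 2, ...
    .flatMap(page => Process.eval(fetchPage(page)))  // fetch each page
    .takeWhile(_.nonEmpty)                           // halt on the first empty page

Each emitted page could then be enqueued onto largeInputBuffer exactly as largeInputWithDelay does today.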