Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@caoilte
Created June 10, 2016 08:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save caoilte/bb0db1eda4a2bf255534848a8b28f950 to your computer and use it in GitHub Desktop.
Save caoilte/bb0db1eda4a2bf255534848a8b28f950 to your computer and use it in GitHub Desktop.
Infinite Batch Processing Walking Skeleton using scalaz-stream
import java.util.concurrent.Executors
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream._
import scalaz.stream.async.mutable.Queue
/**
 * Walking skeleton of a batch-processing pipeline built on scalaz-stream.
 *
 * How it works:
 *  - A producer walks the input in chunks of 500, simulating a 5s fetch per chunk.
 *  - It pushes every element into a bounded hand-off queue.
 *  - Up to 10 concurrent workers drain the queue and "process" each element (100ms each).
 *  - Results are reported to stdout in batches of 100.
 *
 * The queue is closed when the producer finishes, so the workers drain whatever is
 * still buffered before the pipeline halts.
 *
 * NOTE: the queue is created once, when this object is initialised, and it is closed
 * at the end of the first run. Calling `run()` more than once per JVM will not re-read
 * the input.
 */
object MercuryPlayabilityUpdater {

  // Picked up implicitly by the queue, `njoin` and `wye` below.
  // Daemon threads, so the pool does not keep the JVM alive after `run()` returns.
  // NOTE(review): `delay` blocks with Thread.sleep. Up to 10 processing tasks
  // (maxOpen = 10) plus the producer's 5s sleep can occupy every thread of this
  // 10-thread pool at once.
  implicit val DefaultStrategy: Strategy = Strategy.Executor(
    Executors.newFixedThreadPool(10, Strategy.DefaultDaemonThreadFactory)
  )

  /** Hand-off buffer between the producer and the processing pool (bound: 1000 elements). */
  val largeInputBuffer: Queue[String] = async.boundedQueue[String](1000, false)

  /** Simulated input: the numbers 0 until 2000 rendered as strings. */
  val largeInput: Process[Task, String] = Process.emitAll(0 until 2000).map(_.toString)

  /** The input grouped into chunks of 500, as a remote fetch might return them. */
  val chunkedLargeInput: Process[Task, Vector[String]] = largeInput.chunk(500)

  /**
   * Producer: for each chunk, sleeps 5s (printing request/receipt markers), then
   * enqueues the whole chunk into [[largeInputBuffer]].
   *
   * The queue is closed when this process halts, whether normally or with an error.
   * That lets the consumer side drain the remaining elements and then terminate,
   * instead of waiting on an open queue forever.
   */
  val largeInputWithDelay: Process[Task, Unit] =
    chunkedLargeInput
      .flatMap { grp =>
        Process.eval(
          delay(5000, Option("\nLarge Input Requested\n"), Option("\nLarge Input Received\n"))
            .flatMap(_ => largeInputBuffer.enqueueAll(grp))
        )
      }
      .onComplete(Process.eval_(largeInputBuffer.close))

  /** Simulates processing one element: prints a progress dot after 100ms and emits the element unchanged. */
  def simulateDataProcessing(data: String): Process[Task, String] =
    Process.eval(delay(100, Option(".")).map(_ => data))

  /** One single-element processing stream per element dequeued from the buffer. */
  val drainBufferAndProcess: Process[Task, Process[Task, String]] =
    largeInputBuffer.dequeue.map(simulateDataProcessing(_))

  /** Runs at most 10 processing streams concurrently, with at most 100 more queued. */
  val dataProcessingPool: Process[Task, String] =
    nondeterminism.njoin(maxOpen = 10, maxQueued = 100)(drainBufferAndProcess)

  /** Groups processed results into batches of 100 and emits one report line per batch, after a 10ms simulated write. */
  val chunkedLargeOutputWithDelay: Process[Task, String] =
    dataProcessingPool.chunk(100).flatMap { batch =>
      Process.eval(delay(10).map(_ => s" ${batch.size} Items completed in Batch\n"))
    }

  /** Writes report lines to standard output. */
  val outputSink: Sink[Task, String] = io.print(System.out)

  /**
   * The full pipeline: the reporting consumer (left) runs concurrently with the producer (right).
   *
   * It halts when the consumer halts, which happens once the closed queue has been drained.
   * Halting when *either* side halts (`mergeHaltBoth`) would stop everything as soon as the
   * producer enqueued its last chunk. That would drop whatever was still buffered or in flight.
   */
  val pipeline: Process[Task, Unit] =
    wye(chunkedLargeOutputWithDelay.to(outputSink), largeInputWithDelay)(wye.mergeHaltL)

  /**
   * Builds a Task that, when run, does the following in order:
   *  - prints `printStarted` (if defined);
   *  - blocks the executing thread for `millis` ms;
   *  - prints `printDone` (if defined).
   *
   * @param millis       how long to sleep, in milliseconds
   * @param printStarted text printed before sleeping
   * @param printDone    text printed after sleeping
   */
  def delay(millis: Long, printStarted: Option[String] = None, printDone: Option[String] = None): Task[Unit] =
    Task.delay {
      printStarted.foreach(print(_))
      Thread.sleep(millis)
      printDone.foreach(print(_))
    }

  /** Runs the pipeline, blocking until it completes, then prints the elapsed wall-clock time in ms. */
  def run(): Unit = {
    val start = System.nanoTime
    pipeline.run.run
    println(s"time: ${(System.nanoTime - start) / 1e6}ms")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment