@ezhulenev
Last active August 29, 2015 13:57
Gist: ezhulenev/9471434
```scala
import com.google.common.util.concurrent.ThreadFactoryBuilder
import java.io.ByteArrayInputStream
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Random
import scalaz.concurrent.Task

object GatherApp extends App {

  // Fixed pool of 5 daemon threads named gather-pool-0 .. gather-pool-4
  lazy val executor = {
    val threadFactory = new ThreadFactoryBuilder().setNameFormat("gather-pool-%d").setDaemon(true).build()
    Executors.newFixedThreadPool(5, threadFactory)
  }

  // Incremented once per execution of printMe, across all threads
  val cnt = new AtomicInteger(0)
  val random = new Random()

  val text =
    """|A
       |B
       |C
       |D
       |E
       |F
       |G
       |H""".stripMargin

  // Source for run 2: read the eight lines back from an in-memory stream (ASCII, so UTF-8 bytes match chars)
  val process = scalaz.stream.io.linesR(new ByteArrayInputStream(text.getBytes("UTF-8")))
  // Source for run 1: emit the same eight strings directly
  //val process = scalaz.stream.Process.emitSeq[Task, String](Seq("A", "B", "C", "D", "E", "F", "G", "H"))

  // Bumps the counter, logs thread/item/counter, sleeps 0-299 ms, returns the counter value
  val printMe: String => Task[Int] = s => Task {
    val n = cnt.incrementAndGet()
    println(s"${Thread.currentThread().getName} - String = $s, Counter = $n")
    Thread.sleep(random.nextInt(300))
    n
  } (executor) //retry(10.millis :: 20.millis :: 30.millis :: Nil)

  // gatherMap with buffer size 10 (more than the 8 input lines); collect every result
  val output = process.gatherMap(10)(printMe).runLog.run
  println(s"Output = $output")
}
```
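The executor above uses Guava's ThreadFactoryBuilder only to name the threads and mark them as daemons. If pulling in Guava is inconvenient, a plain java.util.concurrent.ThreadFactory can build an equivalent pool. This is a sketch, not part of the original gist; the object name NamedDaemonPool is hypothetical.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: a Guava-free stand-in for
// new ThreadFactoryBuilder().setNameFormat(prefix + "-%d").setDaemon(true).build()
object NamedDaemonPool {
  def fixed(size: Int, prefix: String): ExecutorService = {
    val index = new AtomicInteger(0)
    val factory = new ThreadFactory {
      // Threads are named prefix-0, prefix-1, ... and never keep the JVM alive
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${index.getAndIncrement()}")
        t.setDaemon(true)
        t
      }
    }
    Executors.newFixedThreadPool(size, factory)
  }
}
```

In GatherApp the executor would then become `lazy val executor = NamedDaemonPool.fixed(5, "gather-pool")`, and the Guava import can be dropped.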
1. Run with Process.emitSeq as the source (each item is processed exactly once):
```
gather-pool-0 - String = A, Counter = 1
gather-pool-3 - String = D, Counter = 4
gather-pool-2 - String = C, Counter = 3
gather-pool-1 - String = B, Counter = 2
gather-pool-4 - String = E, Counter = 5
gather-pool-3 - String = F, Counter = 6
gather-pool-0 - String = G, Counter = 7
gather-pool-4 - String = H, Counter = 8
Output = Vector(1, 4, 5, 2, 3, 6, 7, 8)
```
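In the emitSeq run, the counter values (assigned when each task starts) come back as Vector(1, 4, 5, 2, 3, 6, 7, 8) rather than 1 to 8, which suggests gatherMap collects results in completion order rather than input order. The sketch below reproduces that kind of completion-order gathering with only the Java standard library. CompletionOrderGather and gatherUnordered are hypothetical names; this illustrates the idea and is not scalaz-stream's implementation.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors}

// Hypothetical helper: completion-order gathering with plain java.util.concurrent
object CompletionOrderGather {
  // Runs f on every input concurrently and returns the results in the order
  // the tasks finished, which generally differs from the input order.
  // Failed tasks are simply dropped in this sketch.
  def gatherUnordered[A, B](inputs: Seq[A], threads: Int)(f: A => B): Vector[B] = {
    val pool = Executors.newFixedThreadPool(threads)
    val finished = new ConcurrentLinkedQueue[B]()
    val latch = new CountDownLatch(inputs.size)
    try {
      inputs.foreach { a =>
        pool.execute(new Runnable {
          // Each task appends its own result as soon as it is done
          def run(): Unit =
            try { finished.add(f(a)); () } finally latch.countDown()
        })
      }
      latch.await() // wait until every task has finished (or failed)
      var out = Vector.empty[B]
      val it = finished.iterator()
      while (it.hasNext) out :+= it.next()
      out
    } finally pool.shutdown()
  }
}
```

For example, gathering strings through a function that sleeps a random time before returning yields every input exactly once, in whichever order the sleeps end.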
2. Run with scalaz.stream.io.linesR as the source (every line is processed twice: 16 executions for 8 lines):
```
gather-pool-0 - String = A, Counter = 1
gather-pool-4 - String = E, Counter = 5
gather-pool-2 - String = C, Counter = 3
gather-pool-1 - String = B, Counter = 2
gather-pool-3 - String = D, Counter = 4
gather-pool-2 - String = F, Counter = 6
gather-pool-0 - String = G, Counter = 7
gather-pool-3 - String = H, Counter = 8
gather-pool-0 - String = B, Counter = 9
gather-pool-3 - String = C, Counter = 11
gather-pool-2 - String = D, Counter = 12
gather-pool-1 - String = A, Counter = 10
gather-pool-4 - String = E, Counter = 13
gather-pool-1 - String = F, Counter = 14
gather-pool-0 - String = G, Counter = 15
gather-pool-2 - String = H, Counter = 16
Output = Vector(3, 1, 4, 2, 8, 6, 5, 7, 10, 9, 12, 14, 13, 11, 16, 15)
```
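To detect the duplication programmatically instead of by reading the log, the side-effecting body of printMe can be wrapped in a counter that records how many times each input is actually executed. This is a sketch under stated assumptions: CallCounter is a hypothetical class, and the wiring described after the code is one possible way to apply it to the gist.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical wrapper: counts how many times f is invoked for each input
final class CallCounter[A, B](f: A => B) extends (A => B) {
  private val counts = new ConcurrentHashMap[A, AtomicInteger]()

  def apply(a: A): B = {
    val fresh = new AtomicInteger(0)
    val existing = counts.putIfAbsent(a, fresh) // thread-safe first insert
    val counter = if (existing == null) fresh else existing
    counter.incrementAndGet()
    f(a)
  }

  // Inputs seen more than once, with their invocation counts
  def duplicates: Map[A, Int] = {
    var out = Map.empty[A, Int]
    val it = counts.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      val n = e.getValue.get()
      if (n > 1) out += (e.getKey -> n)
    }
    out
  }
}
```

One way to use it in GatherApp: move the body of printMe into a `String => Int` function, wrap it as `val body = new CallCounter(...)`, define printMe as `s => Task(body(s))(executor)`, and print `body.duplicates` after `runLog.run`. Counting inside the Task body, rather than around printMe itself, records executions instead of Task constructions. Judging by the log above, the linesR run would report a count of 2 for every line, and the emitSeq run an empty map.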
@ezhulenev (author) commented:

If the buffer size passed to gatherMap is smaller than the total number of lines, only the last line is processed twice.
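The follow-up ties the behaviour to the gatherMap buffer size. Assuming gatherMap(bufSize) hands its inputs to the pool in consecutive batches of at most bufSize elements (an assumption about scalaz-stream internals that the gist does not show), the batch boundaries for the eight test lines can be listed with Scala's `grouped`, which helps when comparing runs at different buffer sizes. ChunkBoundaries is a hypothetical name.

```scala
// Hypothetical helper: batch boundaries under the assumption above
object ChunkBoundaries {
  // Splits inputs into consecutive batches of at most bufSize elements
  def batches[A](inputs: Seq[A], bufSize: Int): List[Seq[A]] = {
    require(bufSize > 0, "bufSize must be positive")
    inputs.grouped(bufSize).toList
  }
}
```

With bufSize = 10, as in the gist, all eight lines form a single batch; with bufSize = 3 the batches would be A-C, D-F and G-H.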
