Skip to content

Instantly share code, notes, and snippets.

@ezhulenev
Created March 11, 2014 01:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ezhulenev/9477874 to your computer and use it in GitHub Desktop.
Save ezhulenev/9477874 to your computer and use it in GitHub Desktop.
import com.google.common.util.concurrent.ThreadFactoryBuilder
import java.io.ByteArrayInputStream
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Random
import scalaz.concurrent.Task
import scalaz.stream._
import scalaz.stream.io._
object ChannelApp extends App {
lazy val executor = {
val threadFactory = new ThreadFactoryBuilder().setNameFormat("gather-pool-%d").setDaemon(true).build()
Executors.newFixedThreadPool(5, threadFactory)
}
val cnt = new AtomicInteger(0)
val random = new Random()
val text =
"""|A
|B
|C
|D
|E
|F
|G
|H""".stripMargin
val process = scalaz.stream.io.linesR(new ByteArrayInputStream(text.toCharArray.map(_.toByte)))
val printMe: Channel[Task, String, Int] = resource(Task.delay(()))(
resource => Task.delay(()) /* nothing to close */) {
resource => Task.delay {
case s => Task {
val n = cnt.incrementAndGet()
println(s"${Thread.currentThread().getName} - String = $s, Counter = $n")
Thread.sleep(random.nextInt(500))
n
}(executor)
}
}
//val output = process.through(printMe).runLog.run
val output = process.connect(printMe)(scalaz.stream.wye.boundedQueue(3)).runLog.run
println(s"Output = $output")
}
@ezhulenev
Copy link
Author

gather-pool-0 - String = A, Counter = 1
gather-pool-1 - String = B, Counter = 2
gather-pool-2 - String = C, Counter = 3
gather-pool-3 - String = D, Counter = 4
gather-pool-4 - String = E, Counter = 5
gather-pool-4 - String = F, Counter = 6
gather-pool-0 - String = F, Counter = 7
gather-pool-4 - String = G, Counter = 8
gather-pool-4 - String = H, Counter = 9
Output = Vector(1, 2, 3, 4, 5, 6, 8, 9)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment