Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package pellucid.data.util
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import org.joda.time.format.DateTimeFormat
import scala.util.Random
import scalaz.concurrent.Task
import scalaz.stream.io.channel
import scalaz.stream.{Channel, merge, Process}
object RunConcurrently extends App {
val P = scalaz.stream.Process
implicit class ConcurrentProcess[O](val process: Process[Task, O]) {
/**
* Run process through channel with given level of concurrency
*/
def concurrently[O2](concurrencyLevel: Int)
(f: Channel[Task, O, O2]): Process[Task, O2] = {
val actions =
process.
zipWith(f)((data, f) => f(data))
val nestedActions =
actions.map(P.eval)
merge.mergeN(concurrencyLevel)(nestedActions)
}
}
val timeFormat = DateTimeFormat.forPattern("HH:mm:ss:SSS")
val counter = new AtomicInteger(0)
// ThreadPool for running effectful functions
val executor = Executors.newFixedThreadPool(3)
// channel of effectful functions
val effectfulChannel = channel[Int, Int] {
in => Task {
val taskN = counter.incrementAndGet()
println(s"${Thread.currentThread().getName}: " +
s"Run for $in, " +
s"TaskN = $taskN " +
s"(time = ${timeFormat.print(System.currentTimeMillis())})")
// Long running computation
val computed = {
Thread.sleep(1000)
in * in
}
computed
}(executor)
}
val start = System.currentTimeMillis()
val output = Process.range(1, 11).concurrently(5)(effectfulChannel).runLog.run
val end = System.currentTimeMillis()
println(s"Output = $output, in ${end-start} ms")
}
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import org.joda.time.format.DateTimeFormat
import scala.util.Random
import scalaz.concurrent.Task
import scalaz.stream.io.channel
import scalaz.stream.{Channel, merge, Process}
object RunDeterministic extends App {
val timeFormat = DateTimeFormat.forPattern("HH:mm:ss:SSS")
val counter = new AtomicInteger(0)
// ThreadPool for running effectful functions
val executor = Executors.newFixedThreadPool(3)
// channel of effectful functions
val effectfulChannel = channel[Int, Int] {
in => Task {
val taskN = counter.incrementAndGet()
println(s"${Thread.currentThread().getName}: " +
s"Run for $in, " +
s"TaskN = $taskN " +
s"(time = ${timeFormat.print(System.currentTimeMillis())})")
// Long running computation
val computed = {
Thread.sleep(1000)
in * in
}
computed
}(executor)
}
val start = System.currentTimeMillis()
val output = Process.range(1, 11).through(effectfulChannel).runLog.run
val end = System.currentTimeMillis()
println(s"Output = $output, in ${end-start} ms")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.