Instantly share code, notes, and snippets.

Embed
What would you like to do?
package pellucid.data.util
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import org.joda.time.format.DateTimeFormat
import scala.util.Random
import scalaz.concurrent.Task
import scalaz.stream.io.channel
import scalaz.stream.{Channel, merge, Process}
object RunConcurrently extends App {
val P = scalaz.stream.Process
implicit class ConcurrentProcess[O](val process: Process[Task, O]) {
/**
* Run process through channel with given level of concurrency
*/
def concurrently[O2](concurrencyLevel: Int)
(f: Channel[Task, O, O2]): Process[Task, O2] = {
val actions =
process.
zipWith(f)((data, f) => f(data))
val nestedActions =
actions.map(P.eval)
merge.mergeN(concurrencyLevel)(nestedActions)
}
}
val timeFormat = DateTimeFormat.forPattern("HH:mm:ss:SSS")
val counter = new AtomicInteger(0)
// ThreadPool for running effectful functions
val executor = Executors.newFixedThreadPool(3)
// channel of effectful functions
val effectfulChannel = channel[Int, Int] {
in => Task {
val taskN = counter.incrementAndGet()
println(s"${Thread.currentThread().getName}: " +
s"Run for $in, " +
s"TaskN = $taskN " +
s"(time = ${timeFormat.print(System.currentTimeMillis())})")
// Long running computation
val computed = {
Thread.sleep(1000)
in * in
}
computed
}(executor)
}
val start = System.currentTimeMillis()
val output = Process.range(1, 11).concurrently(5)(effectfulChannel).runLog.run
val end = System.currentTimeMillis()
println(s"Output = $output, in ${end-start} ms")
}
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import org.joda.time.format.DateTimeFormat
import scala.util.Random
import scalaz.concurrent.Task
import scalaz.stream.io.channel
import scalaz.stream.{Channel, merge, Process}
object RunDeterministic extends App {
val timeFormat = DateTimeFormat.forPattern("HH:mm:ss:SSS")
val counter = new AtomicInteger(0)
// ThreadPool for running effectful functions
val executor = Executors.newFixedThreadPool(3)
// channel of effectful functions
val effectfulChannel = channel[Int, Int] {
in => Task {
val taskN = counter.incrementAndGet()
println(s"${Thread.currentThread().getName}: " +
s"Run for $in, " +
s"TaskN = $taskN " +
s"(time = ${timeFormat.print(System.currentTimeMillis())})")
// Long running computation
val computed = {
Thread.sleep(1000)
in * in
}
computed
}(executor)
}
val start = System.currentTimeMillis()
val output = Process.range(1, 11).through(effectfulChannel).runLog.run
val end = System.currentTimeMillis()
println(s"Output = $output, in ${end-start} ms")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment