Skip to content

Instantly share code, notes, and snippets.

@mayonesa
Last active March 28, 2016 16:29
Show Gist options
  • Save mayonesa/a0f808c6c6f585a37155 to your computer and use it in GitHub Desktop.
Save mayonesa/a0f808c6c6f585a37155 to your computer and use it in GitHub Desktop.
fork/join -> new cached thread pool. removed commented code
package jj.async
import scala.concurrent.{ ExecutionContext, Future }
import java.util.concurrent.Executors.newCachedThreadPool
import rx.lang.scala.Observable
import ExecutionContext.Implicits.global
import jj.async.helpers._
/* Problem: How to process records concurrently and asynchronously, in chunks.
Processing steps:
- fetch a finite number of records from a repository, 10 at a time (the last batch may hold fewer than 10) because of downstream limitations
- pass each chunk through a filter asynchronously (the filter accepts at most 10 records per call)
- compute the complement of each filtered result (the chunk's records that the filter did not return)
- return the combined complemented results once all records have been processed
*/
/** Fetches records from `repo` in chunks of `chunkSize`, sends each chunk through the
  * remote `Remote.even` filter concurrently, and collects the records the filter rejected.
  *
  * @param repo source of record chunks
  */
class AsyncChunkProcessor(repo: Repository) {
  /** Dedicated pool for the blocking `Remote.even` calls.
    *
    * A cached pool creates threads on demand, so chunks are filtered in parallel. Its
    * threads are daemons: an idle cached-pool thread lingers for up to 60 s, and with
    * non-daemon threads that kept the JVM alive long after `process` had returned.
    */
  implicit val ec: scala.concurrent.ExecutionContextExecutor =
    ExecutionContext.fromExecutor(newCachedThreadPool(new java.util.concurrent.ThreadFactory {
      private val defaultFactory = java.util.concurrent.Executors.defaultThreadFactory()

      def newThread(task: Runnable): Thread = {
        val thread = defaultFactory.newThread(task)
        thread.setDaemon(true)
        thread
      }
    }))

  /** Runs the whole pipeline and blocks until every chunk has been processed.
    *
    * @return the union over all chunks of (chunk minus its even members); a `Set`
    *         because duplicate records across chunks are undesirable
    */
  def process: Set[Int] =
    oddsObs.foldLeft(Set.empty[Int])(_ ++ _).toBlocking.single

  /** One emission per repository chunk: the chunk's records that `Remote.even` did not return. */
  private def oddsObs: Observable[Set[Int]] =
    for {
      chunk <- repo.query(chunkSize)
      // `ec` is passed explicitly: the file also imports ExecutionContext.Implicits.global,
      // which would otherwise compete with `ec` as an implicit candidate.
      evenSubset <- Observable.from(evens(chunk))(ec)
    } yield chunk -- evenSubset // local set difference models the unlicensed step

  /** Runs the remote filter for `chunk` asynchronously on the dedicated pool. */
  private def evens(chunk: Set[Int]): Future[Set[Int]] =
    Future(Remote.even(chunk))(ec)
}
/** Entry point: runs the pipeline against a fresh `Repository()` and reports the result
  * together with the elapsed wall-clock time.
  */
object AsyncChunkProcessor extends App {
  /** Factory so callers can write `AsyncChunkProcessor(repo)`. */
  def apply(repo: Repository): AsyncChunkProcessor = new AsyncChunkProcessor(repo)

  println("computing...")
  val t0 = System.currentTimeMillis()
  println(s"output: ${apply(Repository()).process}")
  // Divide by 1000.0, not 1000: integer division truncated the elapsed time
  // (e.g. a 999 ms run was reported as "0 secs").
  val elapsedSecs = (System.currentTimeMillis() - t0) / 1000.0
  println(f"in $elapsedSecs%.1f secs")
}
import rx.lang.scala.Subscriber
/** Simulated record store that streams its contents as chunks of consecutive ints. */
class Repository {
  /** Streams records as `Set[Int]` chunks of `fetchSize` consecutive ints.
    *
    * Emission runs inside a `Future` on the implicit `ExecutionContext` in scope (this file
    * imports `ExecutionContext.Implicits.global`). After a 20 s delay, chunks are fetched for
    * start indices 1..81 and then 68..1901, stepping by `fetchSize`. These two ranges can
    * overlap, so the same record may be emitted more than once. A non-fatal failure while
    * fetching is forwarded to the subscriber through `onError`.
    *
    * @param fetchSize number of records per chunk and the stride between chunk starts
    */
  def query(fetchSize: Int = 10): Observable[Set[Int]] =
    Observable[Set[Int]] { subscriber =>
      // Emits one chunk for each start index in [start, end] (stride `fetchSize`).
      // Unsubscription is not expected, but it is honoured between chunks for completeness.
      def nextFetchSignal(start: Int, end: Int): Unit =
        for (i <- start to end by fetchSize) {
          if (!subscriber.isUnsubscribed) {
            // Forward fetchSize: relying on fetch's default (10) made the chunk size disagree
            // with the stride whenever fetchSize != 10.
            subscriber.onNext(DataSource.fetch(i, fetchSize))
          }
        }

      Future {
        // The sleep and the fetches block this thread; mark them so the pool can compensate.
        scala.concurrent.blocking {
          try {
            Thread.sleep(20000) // 20 s delay before the first fetch
            nextFetchSignal(1, 81)
            nextFetchSignal(68, 1901)
            if (!subscriber.isUnsubscribed) {
              subscriber.onCompleted()
            }
          } catch {
            // Previously a failure only failed this discarded Future, and a blocking consumer
            // waited forever for onCompleted/onError.
            case scala.util.control.NonFatal(e) =>
              if (!subscriber.isUnsubscribed) {
                subscriber.onError(e)
              }
          }
        }
      }
    }
}
/** Companion factory: lets callers write `Repository()` instead of `new Repository`. */
object Repository {
  def apply(): Repository = new Repository()
}
/** Simulated backing store whose record ids are consecutive integers. */
object DataSource {
  /** Returns the ids in `[begin, begin + fetchSize)` after blocking the caller for 100 ms.
    *
    * A non-positive `fetchSize` yields an empty set.
    *
    * @param begin     first id of the chunk
    * @param fetchSize number of consecutive ids to return
    */
  def fetch(begin: Int, fetchSize: Int = 10): Set[Int] = {
    val end = begin + fetchSize
    Thread.sleep(100)
    // Plain method call instead of the postfix `toSet`, which needs the postfixOps feature
    // import (a warning in 2.12, an error in 2.13).
    (begin until end).toSet
  }
}
package object helpers {
  /** Largest chunk the downstream filter accepts; AsyncChunkProcessor also uses it as the fetch size. */
  val chunkSize = 10

  /** Enforces the downstream size limit.
    *
    * @param set the chunk about to be sent downstream
    * @throws IllegalArgumentException if `set` holds more than `chunkSize` elements
    */
  def checkSizeLimit(set: Set[Int]): Unit = {
    val size = set.size
    if (size > chunkSize) {
      val message = s"$chunkSize-element limit violated: $size"
      throw new IllegalArgumentException(message)
    }
  }
}
// unmodifiable: treat this object as fixed code outside our control
// (presumably it stands in for an external, licensed filter service — confirm).
object Remote {
  // models licensed: the remote, size-limited filtering step of the pipeline
  /** Returns a `Set[Int] => Set[Int]` function that keeps only the even elements of its input.
    *
    * Each call of the returned function first runs `checkSizeLimit(xs)`, which throws
    * IllegalArgumentException when `xs` has more than `chunkSize` elements. It then blocks
    * the calling thread for 3000 ms before filtering.
    */
  def even = { xs: Set[Int] =>
    checkSizeLimit(xs)
    Thread.sleep(3000)
    xs filter { _ % 2 == 0 }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment