Last active
March 28, 2016 16:29
-
-
Save mayonesa/a0f808c6c6f585a37155 to your computer and use it in GitHub Desktop.
fork/join -> new cached thread pool. removed commented code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package jj.async | |
import scala.concurrent.{ ExecutionContext, Future } | |
import java.util.concurrent.Executors.newCachedThreadPool | |
import rx.lang.scala.Observable | |
import ExecutionContext.Implicits.global | |
import jj.async.helpers._ | |
/* Problem: how to process records asynchronously, in chunks, using multiple threads.
 * Processing steps:
 * - fetch a finite number of records from a repository, 10 at a time (the last batch may hold
 *   fewer than 10) because of downstream limitations
 * - pass each chunk through a filter asynchronously (the filter accepts at most 10 records per call)
 * - compute the complement ("reverse") of each filtered result, i.e. the records the filter did not keep
 * - return the complemented results once all records have been processed
 */
/** Runs the chunked filtering pipeline over the records streamed by `repo`.
  *
  * Each chunk emitted by `repo` is handed to [[Remote.even]] inside its own `Future`, so chunks
  * are filtered concurrently; for every chunk the records `Remote.even` did NOT return are kept.
  *
  * @param repo source of record chunks
  */
class AsyncChunkProcessor(repo: Repository) {

  /** Execution context for the blocking [[Remote.even]] calls.
    *
    * Backed by a cached thread pool so each in-flight chunk can get its own thread. The pool is
    * never shut down, so its threads are made daemons: a cached pool keeps idle (by default
    * non-daemon) workers alive for 60 seconds, which would otherwise delay JVM exit after
    * `process` has returned.
    */
  implicit val ec: scala.concurrent.ExecutionContextExecutor =
    ExecutionContext.fromExecutor(newCachedThreadPool(new java.util.concurrent.ThreadFactory {
      private val defaults = java.util.concurrent.Executors.defaultThreadFactory()

      def newThread(task: Runnable): Thread = {
        val thread = defaults.newThread(task)
        thread.setDaemon(true)
        thread
      }
    }))

  /** Processes every chunk and blocks the calling thread until the repository stream completes.
    *
    * A `Set` is returned because duplicate records are undesirable (e.g. if the repository
    * emits overlapping chunks). A failure in the repository stream or in [[Remote.even]]
    * (such as an oversized chunk) surfaces here as an exception from `toBlocking.single`.
    *
    * @return the union of the records left over from every chunk
    */
  def process: Set[Int] =
    oddsObs.foldLeft(Set.empty[Int])(_ ++ _).toBlocking.single

  /** Emits, per fetched chunk, the records that [[Remote.even]] did not return. */
  private def oddsObs: Observable[Set[Int]] =
    for {
      chunk <- repo.query(chunkSize)
      // `ec` is passed explicitly: the file-level `ExecutionContext.Implicits.global` import
      // is also an implicit ExecutionContext here, and the blocking filter must run on `ec`.
      evenRecords <- Observable.from(evens(chunk))(ec)
    } yield chunk -- evenRecords // models unlicensed

  /** Starts the (blocking) [[Remote.even]] call for `chunk` on `ec`. */
  private def evens(chunk: Set[Int]): Future[Set[Int]] =
    Future(Remote.even(chunk))(ec)
}
/** Command-line demo: runs an [[AsyncChunkProcessor]] over a fresh [[Repository]], then prints
  * the resulting set and the wall-clock time taken (in whole seconds, truncated).
  */
object AsyncChunkProcessor extends App {

  /** Factory equivalent to `new AsyncChunkProcessor(repo)`. */
  def apply(repo: Repository): AsyncChunkProcessor = new AsyncChunkProcessor(repo)

  println("computing...")
  val t0 = System.currentTimeMillis()
  println(s"output: ${AsyncChunkProcessor(Repository()).process}")
  println(s"in ${(System.currentTimeMillis() - t0) / 1000} secs")
}
import rx.lang.scala.Subscriber | |
/** Simulated slow record store that streams its records as chunks of consecutive integers. */
class Repository {

  /** Streams the repository's records in chunks of `fetchSize`.
    *
    * Every subscription starts its own fetch run on the global execution context: a simulated
    * 20-second delay, then the records of the stride ranges `1..81` and `68..1901`. The two
    * ranges overlap, so some records are emitted more than once. Any failure during fetching is
    * delivered to the subscriber through `onError` instead of being lost inside the `Future`.
    *
    * @param fetchSize records per chunk and stride between chunk starts; must be positive
    * @return observable of record chunks
    * @throws IllegalArgumentException if `fetchSize` is not positive
    */
  def query(fetchSize: Int = 10): Observable[Set[Int]] = {
    require(fetchSize > 0, s"fetchSize must be positive: $fetchSize")
    Observable[Set[Int]] { subscriber =>
      def nextFetchSignal(start: Int, end: Int): Unit =
        // not optimized for unsubscription as they're not expected
        // but included for completeness
        for (i <- start to end by fetchSize) {
          if (!subscriber.isUnsubscribed) {
            // pass fetchSize through so each chunk matches the stride
            subscriber.onNext(DataSource.fetch(i, fetchSize))
          }
        }

      val fetching = Future {
        scala.concurrent.blocking {
          Thread.sleep(20000) // simulated latency before the first record
          nextFetchSignal(1, 81)
          nextFetchSignal(68, 1901)
        }
      }

      // Terminate the stream either way; a silently failed Future would leave the
      // subscriber (and any blocking consumer) waiting forever.
      fetching.onComplete {
        case scala.util.Success(_) =>
          if (!subscriber.isUnsubscribed) subscriber.onCompleted()
        case scala.util.Failure(error) =>
          if (!subscriber.isUnsubscribed) subscriber.onError(error)
      }
    }
  }
}
/** Companion providing a `Repository()` factory. */
object Repository {

  /** @return a new, independent [[Repository]] */
  def apply(): Repository = new Repository()
}
/** Simulated backing store whose records are plain consecutive integers. */
object DataSource {

  /** Returns the records from `begin` (inclusive) to `begin + fetchSize` (exclusive), after a
    * simulated 100 ms access delay.
    *
    * @param begin     first record to return
    * @param fetchSize number of consecutive records; zero or negative yields an empty set
    * @return the fetched records
    */
  def fetch(begin: Int, fetchSize: Int = 10): Set[Int] = {
    val end = begin + fetchSize
    Thread.sleep(100)
    (begin until end).toSet
  }
}
/** Constants and guards shared by the chunked-processing pipeline. */
package object helpers {

  /** Maximum number of records per chunk that the downstream filter accepts. */
  val chunkSize = 10

  /** Rejects chunks larger than [[chunkSize]].
    *
    * @param set chunk to validate
    * @throws IllegalArgumentException if `set` has more than `chunkSize` elements
    */
  def checkSizeLimit(set: Set[Int]): Unit = {
    val actual = set.size
    if (actual <= chunkSize) ()
    else throw new IllegalArgumentException(s"$chunkSize-element limit violated: $actual")
  }
}
// unmodifiable
/** Models an external filter service that is treated as a fixed dependency. */
object Remote {
  // models licensed
  /** Returns the even records of a chunk after a simulated 3-second remote delay.
    *
    * Throws `IllegalArgumentException` (via `checkSizeLimit`) when the chunk exceeds the
    * shared chunk-size limit.
    */
  def even: Set[Int] => Set[Int] = { chunk =>
    checkSizeLimit(chunk) // enforce the downstream input limit
    Thread.sleep(3000)    // simulated remote-call latency
    chunk.filter(n => n % 2 == 0)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment