Last active
March 28, 2016 16:26
-
-
Save mayonesa/69153463fae6df1c6c58 to your computer and use it in GitHub Desktop.
No unnecessary blocking (switched from the fork/join pool to a new cached thread pool)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package jj.async | |
import java.util.concurrent.{ Executors, ThreadFactory }
import java.util.concurrent.Executors.newCachedThreadPool

import scala.concurrent.{ Await, ExecutionContext, ExecutionContextExecutor, Future }
import scala.concurrent.duration._
import scala.util.control.NonFatal

import rx.lang.scala.Observable

import jj.async.Enriched._
import jj.async.Helpers._
/* Problem: how to process records asynchronously, in chunks, through a multi-step pipeline as efficiently as possible.
Processing steps:
- fetch a finite number of records from a repository, 10 at a time (<= 10 for the last batch) because of downstream limits
- pass each chunk through a filter asynchronously (the filter accepts at most 10 records per call)
- compute the complement of the filtered result (the records the filter rejected)
- enrich those remaining records asynchronously (enrichment also accepts at most 10 records per call, so each call should be filled up to that limit)
- return the enriched records once all records have been processed
*/
class AsyncDisjointedChunkMultiprocessing(repo: Repository) {

  /** Pool that runs the blocking filter/enrichment calls issued by this pipeline.
    *
    * A cached pool creates threads on demand, so one blocking call does not have to queue behind
    * another. Its threads are daemons because the pool is never shut down. With the default
    * factory's non-daemon threads, the JVM would stay alive after `process()` returned, until the
    * idle threads time out (60 s per the `newCachedThreadPool` contract).
    */
  implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(
    newCachedThreadPool(new ThreadFactory {
      private val defaultFactory = Executors.defaultThreadFactory()

      def newThread(task: Runnable): Thread = {
        val thread = defaultFactory.newThread(task)
        thread.setDaemon(true)
        thread
      }
    })
  )

  /** Runs the whole pipeline and blocks until every odd record has been enriched.
    *
    * @return the enriched odd records read from `repo`
    * @throws java.util.concurrent.TimeoutException if the merged enrichment Future does not
    *         complete within 7 s after the folded stream has emitted it
    */
  def process(): Set[Enriched] =
    Await.result(enrich(oddsObs).toBlocking.single, 7.seconds)

  /** For each fetched chunk, emits the records the remote `even` filter rejected (the odd ones).
    * Each chunk is filtered inside its own Future on `ec`, so filtering of successive chunks
    * can overlap.
    */
  private def oddsObs: Observable[Set[Int]] =
    for {
      chunk <- repo.query(chunkSize)
      evens <- Observable.from(Future(Remote.even(chunk)))
    } yield chunk -- evens // odd calculation emulates actual problem calc structure
}
object AsyncDisjointedChunkMultiprocessing extends App {

  /** Creates a processor that reads its records from `repo`. */
  def apply(repo: Repository): AsyncDisjointedChunkMultiprocessing =
    new AsyncDisjointedChunkMultiprocessing(repo)

  println("computing...")
  // Start of the measurement, taken from the monotonic clock. currentTimeMillis follows the
  // wall clock, which can be adjusted mid-run and skew the reported duration.
  val t0 = System.nanoTime()
  println(s"output: ${apply(Repository()).process()}")
  println(s"in ${(System.nanoTime() - t0) / 1000000} ms")
}
/** Result of enriching a single record. Two instances are equal exactly when their `i` match. */
class Enriched(val i: Int) {

  override def toString(): String = s"enriched ${i}"

  override def equals(other: Any): Boolean = other match {
    case enriched: Enriched if enriched.i == i => true
    case _                                     => false
  }

  // Consistent with equals: equal instances share the same `i`.
  override def hashCode: Int = i
}
import scala.language.postfixOps | |
object Enriched {

  /** Creates an [[Enriched]] for record `i`. */
  def apply(i: Int): Enriched = new Enriched(i)

  /** Re-batches a stream of record sets into groups of exactly `chunkSize` records.
    *
    * Each full group starts enriching as soon as it is formed. Whatever is left when `poorObs`
    * completes is enriched as one final, smaller group.
    *
    * NOTE(review): the `Future`s created in this object need an implicit `ExecutionContext`, and
    * none is declared here. One is presumably provided at package level; confirm.
    *
    * @param poorObs stream of not-yet-enriched record sets
    * @return an Observable that, once `poorObs` completes, emits a single Future that completes
    *         with the union of all enriched groups
    */
  def enrich(poorObs: Observable[Set[Int]]): Observable[Future[Set[Enriched]]] =
    poorObs.foldLeft((Set.empty[Int], Set.empty[Future[Set[Enriched]]])) {
      case ((oddsBuffer, enrichedFutures), oddSet) =>
        // Cut the buffer plus the new records into chunkSize groups. Every group except
        // possibly the last is exactly full, so each enrichment call is filled to capacity and
        // can never exceed its limit. Splitting the new set at the *excess* count did neither.
        val (fullGroups, partialGroups) =
          (oddsBuffer ++ oddSet).grouped(chunkSize).toList.partition(_.size == chunkSize)
        val dispatched = fullGroups.foldLeft(enrichedFutures)(_ :+ _)
        (partialGroups.headOption.getOrElse(Set.empty[Int]), dispatched)
    }.map {
      case (leftOverOddsBuffer, penultimateEnrichedFuts) =>
        // Enrich the remainder, then merge every group's result into one Future.
        (penultimateEnrichedFuts :+ leftOverOddsBuffer).flat
    }

  /** Emulated size-limited enrichment service.
    *
    * Rejects more than `chunkSize` records (via `checkSizeLimit`), sleeps 1 s to simulate
    * latency, then wraps each record.
    */
  private def enrich(poors: Set[Int]): Set[Enriched] = {
    checkSizeLimit(poors)
    Thread.sleep(1000)
    poors.map(Enriched(_))
  }

  private implicit class EnrichedFutureSetOps(enrichedFutSets: Set[Future[Set[Enriched]]]) {

    /** Starts enriching `poors` in a new Future and adds it to the set. Empty input adds nothing. */
    def :+(poors: Set[Int]): Set[Future[Set[Enriched]]] =
      if (poors.isEmpty) enrichedFutSets
      else enrichedFutSets + Future(enrich(poors))

    /** Combines all Futures into one whose result is the union of their results. */
    def flat: Future[Set[Enriched]] =
      Future.sequence(enrichedFutSets).map(_.flatten)
  }
}
class Repository {

  /** Streams the repository's records as sets of `fetchSize` consecutive ints.
    *
    * Emulates a paged source. It emits pages starting at 1, 1 + fetchSize, ... up to 21, then
    * pages starting at 18, 18 + fetchSize, ... up to 31; the second range overlaps the first.
    * Pages are produced inside a Future, so subscribing does not block the caller.
    *
    * NOTE(review): `Future { ... }` needs an implicit `ExecutionContext`, and none is visible in
    * this class. One is presumably provided at package level; confirm.
    *
    * @param fetchSize page size; used both as the step between page starts and as the size
    *                  passed to `DataSource.fetch`
    * @return an Observable that completes after the last page, or signals `onError` if
    *         producing a page fails
    */
  def query(fetchSize: Int = 10): Observable[Set[Int]] =
    Observable[Set[Int]] { subscriber =>
      // Emits one page per step in [start, end]. Once the subscriber has unsubscribed, the
      // remaining pages are skipped. Unsubscription is not expected, so there is no early exit.
      def nextFetchSignal(start: Int, end: Int): Unit =
        for (i <- start to end by fetchSize) {
          if (!subscriber.isUnsubscribed) {
            subscriber.onNext(DataSource.fetch(i, fetchSize))
          }
        }

      Future {
        try {
          nextFetchSignal(1, 21)
          nextFetchSignal(18, 31)
          if (!subscriber.isUnsubscribed) {
            subscriber.onCompleted()
          }
        } catch {
          // Without this, a failure would only fail the discarded Future, and the subscriber
          // would wait forever for a terminal event.
          case NonFatal(e) =>
            if (!subscriber.isUnsubscribed) {
              subscriber.onError(e)
            }
        }
      }
    }
}
/** Factory for [[Repository]]. */
object Repository {
  def apply(): Repository = new Repository()
}
object DataSource {

  /** Emulated slow data source.
    *
    * After a 200 ms delay, returns the ints from `begin` (inclusive) to `begin + fetchSize`
    * (exclusive). The result is empty when `fetchSize <= 0`.
    */
  def fetch(begin: Int, fetchSize: Int = 10): Set[Int] = {
    Thread.sleep(200) // simulated I/O latency
    Range(begin, begin + fetchSize).toSet
  }
}
package object Helpers {

  /** Maximum number of records the downstream calls (filter and enrichment) accept at once. */
  val chunkSize: Int = 10

  /** Guards a downstream call's input size.
    *
    * @throws IllegalArgumentException if `set` holds more than `chunkSize` elements
    */
  def checkSizeLimit(set: Set[Int]): Unit = {
    val size = set.size
    if (size > chunkSize)
      throw new IllegalArgumentException(s"$chunkSize-element limit violated: $size")
  }
}
// unmodifiable | |
object Remote { | |
// models licensed | |
def even = { xs: Set[Int] => | |
checkSizeLimit(xs) | |
Thread.sleep(5000) | |
xs filter { _ % 2 == 0 } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment