Skip to content

Instantly share code, notes, and snippets.

@mayonesa
Last active March 28, 2016 16:26
Show Gist options
  • Save mayonesa/69153463fae6df1c6c58 to your computer and use it in GitHub Desktop.
Save mayonesa/69153463fae6df1c6c58 to your computer and use it in GitHub Desktop.
No unnecessary blocking (fork/join pool -> new cached thread pool)
package jj.async
import scala.concurrent.{ ExecutionContext, Future, Await }
import java.util.concurrent.Executors.newCachedThreadPool
import rx.lang.scala.Observable
import jj.async.Helpers._
import scala.concurrent.duration._
import jj.async.Enriched._
/* Problem: how to process records asynchronously, in chunks, through several processing steps.
Processing steps:
- fetch a finite number of records from a repository, 10 at a time (<= 10 for the last batch), because of downstream limitations
- run each chunk through a filter asynchronously (the filter accepts at most 10 records per call)
- compute the complement of the filtered result (the records the filter rejected)
- enrich that complement asynchronously (the enricher also accepts at most 10 records per call; fill each call to the limit for efficiency)
- return the enriched results once all records have been processed
*/
/**
 * Fetches records from `repo` in chunks, keeps the ones the remote filter rejects ("odds"), and
 * enriches them in batches, returning the enriched set once everything has been processed.
 *
 * @param repo source of the record chunks
 */
class AsyncDisjointedChunkMultiprocessing(repo: Repository) {

  /**
   * Executor for every asynchronous step started from this class.
   *
   * A cached pool is used because the work scheduled on it blocks (`Remote.even` sleeps for
   * seconds), so each task gets its own thread instead of starving a small fixed pool. The threads
   * are daemons: an idle cached-pool thread otherwise lingers for up to 60 seconds and, being
   * non-daemon, would keep the JVM alive long after `process()` has returned.
   */
  implicit val ec: scala.concurrent.ExecutionContextExecutor =
    ExecutionContext.fromExecutor(newCachedThreadPool(new java.util.concurrent.ThreadFactory {
      private val delegate = java.util.concurrent.Executors.defaultThreadFactory()

      def newThread(r: Runnable): Thread = {
        val thread = delegate.newThread(r)
        thread.setDaemon(true)
        thread
      }
    }))

  /**
   * Runs the whole pipeline and blocks until the enriched result is available.
   *
   * Blocks first until the source observable has been fully folded, then waits at most 7 seconds
   * for the remaining enrichment work.
   *
   * @return every enriched odd record
   * @throws java.util.concurrent.TimeoutException if enrichment does not finish in time
   */
  def process(): Set[Enriched] =
    Await.result(enrich(oddsObs).toBlocking.single, 7.seconds)

  /**
   * For each fetched chunk, runs the remote `even` filter off the calling thread and emits the
   * records it rejected (`chunk -- evens`), mirroring the structure of the real calculation.
   */
  private def oddsObs: Observable[Set[Int]] =
    for {
      chunk <- repo.query(chunkSize)
      evens <- Observable.from(Future(Remote.even(chunk)))
    } yield chunk -- evens
}
/** Demo entry point: processes the default repository and reports the result and elapsed time. */
object AsyncDisjointedChunkMultiprocessing extends App {

  /** Builds a processor over `repo`. */
  def apply(repo: Repository): AsyncDisjointedChunkMultiprocessing =
    new AsyncDisjointedChunkMultiprocessing(repo)

  println("computing...")

  // Wall-clock start, used to report the total processing time below.
  val t0 = System.currentTimeMillis()

  println(s"output: ${AsyncDisjointedChunkMultiprocessing(Repository()).process()}")
  println(s"in ${System.currentTimeMillis() - t0} ms")
}
/**
 * An enriched record.
 *
 * Two instances are equal exactly when they wrap the same `i`, so sets of enriched records
 * de-duplicate by value.
 *
 * @param i the underlying record value
 */
class Enriched(val i: Int) {

  override def toString(): String = s"enriched ${i}"

  override def equals(o: Any): Boolean = o match {
    case other: Enriched => other.i == i
    case _               => false
  }

  // Consistent with equals: equal values give equal hash codes.
  override def hashCode: Int = i
}
import scala.language.postfixOps
object Enriched {

  /** Wraps a raw value as an [[Enriched]]. */
  def apply(i: Int): Enriched = new Enriched(i)

  /** Enrichment futures started so far, in start order. */
  private type Started = Vector[Future[Set[Enriched]]]

  /**
   * Enriches every value emitted by `poorObs`, sending batches of at most `chunkSize` values to
   * the (blocking) enrichment step.
   *
   * Incoming sets are merged into a buffer. Each time the buffer holds a whole `chunkSize` batch,
   * that batch starts enriching asynchronously on `ec`, so every call except possibly the last is
   * filled to the limit. Whatever remains when `poorObs` completes is enriched as a final, smaller
   * batch. Values are held in `Set`s, so a value repeated while still buffered is enriched once.
   *
   * @param poorObs source of not-yet-enriched value sets; sets of any size are accepted
   * @param ec      executor the enrichment calls and result combination run on
   * @return an observable emitting one future that completes with the union of all enriched
   *         values, or fails if any enrichment call fails
   */
  def enrich(poorObs: Observable[Set[Int]])(
      implicit ec: scala.concurrent.ExecutionContext): Observable[Future[Set[Enriched]]] =
    poorObs.foldLeft((Set.empty[Int], Vector.empty[Future[Set[Enriched]]])) {
      case ((buffer, started), incoming) =>
        // Cut the merged values into chunkSize-sized batches: every full batch is sent now, and
        // the (at most one) partial batch is carried forward as the new buffer. This keeps every
        // call within the limit regardless of how large `incoming` is.
        val (full, partial) =
          (buffer ++ incoming).grouped(chunkSize).toList.partition(_.size == chunkSize)
        val nextBuffer = partial.headOption.getOrElse(Set.empty[Int])
        (nextBuffer, full.foldLeft(started)((acc, batch) => startEnrichment(acc, batch)))
    }.map {
      case (leftOver, started) => unionOf(startEnrichment(started, leftOver))
    }

  /** Stand-in for the enrichment service: accepts at most `chunkSize` values and blocks for 1 s. */
  private def callEnrichService(poors: Set[Int]): Set[Enriched] = {
    checkSizeLimit(poors)
    Thread.sleep(1000)
    poors map { Enriched(_) }
  }

  /** Starts enriching `batch` asynchronously unless it is empty, appending its future. */
  private def startEnrichment(started: Started, batch: Set[Int])(
      implicit ec: scala.concurrent.ExecutionContext): Started =
    if (batch.isEmpty) started else started :+ Future(callEnrichService(batch))

  /** Future of the union of every started enrichment's result; fails if any of them fails. */
  private def unionOf(started: Started)(
      implicit ec: scala.concurrent.ExecutionContext): Future[Set[Enriched]] =
    Future.sequence(started).map(_.foldLeft(Set.empty[Enriched])(_ ++ _))
}
/** Emulated record repository that streams its records in fixed-size chunks. */
class Repository {

  /**
   * Streams integer "records" as sets of `fetchSize` consecutive values.
   *
   * On subscription, fetching runs asynchronously on `ec`. Two passes of fetches are emitted:
   * starts from 1 up to 21, then from 18 up to 31, stepping by `fetchSize`. With the default size
   * of 10 these chunks overlap, emulating a source that returns disjointed, partly duplicated
   * batches. Emission stops early if the subscriber unsubscribes.
   *
   * @param fetchSize number of records per emitted set and the step between fetch starts;
   *                  expected to be positive
   * @param ec        executor the fetch loop runs on
   * @return an observable of record chunks; a failure while fetching is delivered via `onError`
   */
  def query(fetchSize: Int = 10)(
      implicit ec: scala.concurrent.ExecutionContext): Observable[Set[Int]] =
    Observable[Set[Int]] { subscriber =>
      // Fetches and emits the chunk starting at each `fetchSize` step in [start, end].
      // Not optimized for unsubscription (it is not expected) but honoured for completeness.
      def emitChunks(start: Int, end: Int): Unit =
        for (i <- start to end by fetchSize) {
          if (!subscriber.isUnsubscribed) {
            // Pass fetchSize through so each emitted set matches the requested chunk size.
            subscriber.onNext(DataSource.fetch(i, fetchSize))
          }
        }

      Future {
        try {
          emitChunks(1, 21)
          emitChunks(18, 31)
          if (!subscriber.isUnsubscribed) {
            subscriber.onCompleted()
          }
        } catch {
          // Otherwise the failure would only be recorded in the discarded Future, and the
          // subscriber would wait forever for onCompleted/onError.
          case scala.util.control.NonFatal(e) =>
            if (!subscriber.isUnsubscribed) {
              subscriber.onError(e)
            }
        }
      }
    }
}
/** Factory for [[Repository]]. */
object Repository {
  /** Creates a new repository (the class holds no state, so every instance behaves the same). */
  def apply(): Repository = new Repository()
}
/** Slow backing store that [[Repository]] reads from. */
object DataSource {
  /**
   * Returns `fetchSize` consecutive integers starting at `begin`, after blocking the calling
   * thread for 200 ms.
   *
   * @param begin     first value of the returned range
   * @param fetchSize number of values to return; a non-positive size yields an empty set
   * @return the set `{begin, ..., begin + fetchSize - 1}`
   */
  def fetch(begin: Int, fetchSize: Int = 10): Set[Int] = {
    Thread.sleep(200)
    Range(begin, begin + fetchSize).toSet
  }
}
package object Helpers {

  /** Maximum number of records a single downstream call may receive. */
  val chunkSize: Int = 10

  /**
   * Rejects sets that exceed [[chunkSize]].
   *
   * @param set records about to be sent downstream
   * @throws IllegalArgumentException when `set` has more than `chunkSize` elements
   */
  def checkSizeLimit(set: Set[Int]): Unit = set.size match {
    case withinLimit if withinLimit <= chunkSize => ()
    case tooMany =>
      throw new IllegalArgumentException(s"$chunkSize-element limit violated: $tooMany")
  }
}
// unmodifiable
object Remote {
// models licensed
// NOTE(review): presumably stands in for a licensed third-party service, which is why this
// object is marked unmodifiable — confirm.
/**
 * Stand-in for the remote filter: returns the members of `xs` that are even.
 *
 * First enforces the downstream limit (via `checkSizeLimit`, which throws
 * IllegalArgumentException when `xs` has more than `chunkSize` elements), then blocks the calling
 * thread for 5 seconds before answering.
 *
 * Note: `even` is a function value, so `Remote.even(xs)` applies that function to `xs`.
 */
def even = { xs: Set[Int] =>
checkSizeLimit(xs)
Thread.sleep(5000)
xs filter { _ % 2 == 0 }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment