Last active
September 15, 2015 13:43
-
-
Save bpk-t/da58b68883fd1702f270 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent._ | |
/** | |
* Created by BPK_t on 2015/08/18. | |
*/ | |
object Main { | |
def funcA : Int = { | |
Thread.sleep(1000) | |
10 | |
} | |
def funcB : Int = { | |
Thread.sleep(1000) | |
20 | |
} | |
def main(args: Array[String]) { | |
val a = Par.lazyUnit(funcA) | |
val b = Par.lazyUnit(funcB) | |
val c = Par.map2(a,b)((aa,bb) => aa + bb) | |
val exec = Executors.newCachedThreadPool() | |
val start = System.currentTimeMillis() | |
println(c(exec).get(2000, TimeUnit.MILLISECONDS)) | |
println("elap=" + (System.currentTimeMillis() - start)) | |
exec.shutdown() | |
} | |
//7.2 | |
type Par[A] = ExecutorService => Future[A] | |
object Par { | |
//評価されていないAを受け取り、それを別のスレッドで評価するための計算を返す | |
//def unit[A](a :=> A) : Par[A] | |
//並列計算から結果の値を取り出す | |
//runに取って代わった | |
//def get[A](a : Par[A]) : A | |
def lazyUnit[A](a : => A) : Par[A] = fork(unit(a)) | |
def unit[A](a:A):Par[A] = (es:ExecutorService) => UnitFuture(a) | |
//7.4 | |
def asyncF[A,B](f: A => B) : A => Par[B] = | |
a => lazyUnit(f(a)) | |
//7.5 | |
def sequence[A](ps : List[Par[A]]) : Par[List[A]] = | |
ps.foldRight(lazyUnit(List.empty[A])) | |
{ case (x,a) => map2(x,a)(_ :: _) } | |
def parMap[A,B](ps : List[A])(f : A => B) : Par[List[B]] = fork { | |
val fbs : List[Par[B]] = ps.map(asyncF(f)) | |
sequence(fbs) | |
} | |
//7.6 | |
def parFilter[A](as : List[A])(f: A => Boolean) : Par[List[A]] = | |
map(parMap(as)(x => (x, f(x))))(x => x.filter(_._2).map(y => y._1)) | |
private case class UnitFuture[A](get:A) extends Future[A] { | |
def isDone = true | |
def get(timeout:Long,units:TimeUnit) = get | |
def isCancelled = false | |
def cancel(evenIfRunning:Boolean):Boolean = false | |
} | |
//7.3 | |
private case class Map2Future[A,B,C](a : Future[A], b : Future[B], f : (A,B) => C) extends Future[C]{ | |
def isDone = a.isDone && b.isDone | |
def get() : C = f(a.get, b.get) | |
def get(timeout : Long, units : TimeUnit) : C = { | |
val start = System.currentTimeMillis() | |
val aRet = a.get(timeout, units) | |
val bRet = b.get(TimeUnit.MILLISECONDS.convert(timeout, units) - (System.currentTimeMillis() - start), TimeUnit.MILLISECONDS) | |
f(aRet, bRet) | |
} | |
def isCancelled = a.isCancelled || b.isCancelled | |
def cancel(evenIfRunning:Boolean) : Boolean = a.cancel(evenIfRunning) && b.cancel(evenIfRunning) | |
} | |
//7.1 | |
//2つの並列計算の結果を2項関数で結合する | |
def map2[A,B,C](a : Par[A], b : Par[B])(f : (A, B) => C) : Par[C] = | |
(es : ExecutorService) => { | |
val af = a(es) | |
val bf = b(es) | |
Map2Future(af, bf, f) | |
} | |
def map[A,B](pa : Par[A])(f : A => B) : Par[B] = | |
map2(pa, unit(()))((a, _) => f(a)) | |
def fork[A](a: => Par[A]) : Par[A] = | |
(es : ExecutorService) => es.submit(new Callable[A] { | |
def call = a(es).get | |
}) | |
//7.7 | |
//map(y)(id) == y | |
//map(map(y)(g))(f) == map(y)(f compose g) | |
//直ちにa値が得られる計算を作成 | |
//def fork[A](a :=> Par[A]) : Par[A] | |
//runによる並列評価の対象としてマーク | |
//def lazyUnit[A](a:=>A):Par[A] = fork(unit(a)) | |
//与えられたParを完全に評価し、forkによって要求される並列計算を生成し、結果の値を取得する | |
//def run[A](a:Par[A]):A | |
def equal[A](e : ExecutorService)(p : Par[A], p2 : Par[A]) : Boolean = | |
p(e).get == p2(e).get | |
//7.9 | |
def ex7_9() = { | |
val a = lazyUnit(42 + 1) | |
val S = Executors.newFixedThreadPool(1) | |
println(Par.equal(S)(a, fork(a))) | |
//スレッドの数がforkの数未満の場合、デッドロックになる | |
val S2 = Executors.newFixedThreadPool(2) | |
println(Par.equal(S)(a, fork(fork(a)))) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment