Skip to content

Instantly share code, notes, and snippets.

@bpk-t
Last active September 15, 2015 13:43
Show Gist options
  • Save bpk-t/da58b68883fd1702f270 to your computer and use it in GitHub Desktop.
Save bpk-t/da58b68883fd1702f270 to your computer and use it in GitHub Desktop.
import java.util.concurrent._
/**
 * Created by BPK_t on 2015/08/18.
 *
 * Small demo of a `Par` (parallel computation) API built on top of
 * `java.util.concurrent`. The numbered comments (7.1 - 7.9) are the exercise
 * numbers used in the original file.
 */
object Main {

  /** Simulates a slow computation: sleeps for one second, then yields 10. */
  def funcA : Int = {
    Thread.sleep(1000)
    10
  }

  /** Simulates a slow computation: sleeps for one second, then yields 20. */
  def funcB : Int = {
    Thread.sleep(1000)
    20
  }

  /**
   * Combines `funcA` and `funcB` with `Par.map2`, runs the result on a cached
   * thread pool, and prints the sum followed by the elapsed time in
   * milliseconds.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val a = Par.lazyUnit(funcA)
    val b = Par.lazyUnit(funcB)
    val c = Par.map2(a, b)(_ + _)
    val exec = Executors.newCachedThreadPool()
    try {
      val start = System.currentTimeMillis()
      println(c(exec).get(2000, TimeUnit.MILLISECONDS))
      println("elap=" + (System.currentTimeMillis() - start))
    } finally {
      // Always release the pool, including when get() throws (e.g. on timeout).
      exec.shutdown()
    }
  }

  //7.2
  /** A parallel computation: given an executor, it produces a `Future` of the result. */
  type Par[A] = ExecutorService => Future[A]

  object Par {
    // Takes an unevaluated A and returns a computation that evaluates it on a separate thread
    //def unit[A](a :=> A) : Par[A]
    // Extracts the result value from a parallel computation
    // Superseded by run
    //def get[A](a : Par[A]) : A

    /** Wraps the by-name `a` so that it is evaluated inside a task submitted to the executor. */
    def lazyUnit[A](a : => A) : Par[A] = fork(unit(a))

    /** Lifts an already-evaluated value into a `Par`; the executor is not used. */
    def unit[A](a: A): Par[A] = (_: ExecutorService) => UnitFuture(a)

    //7.4
    /** Turns a plain function into one that evaluates its result via `lazyUnit`. */
    def asyncF[A, B](f: A => B): A => Par[B] =
      a => lazyUnit(f(a))

    //7.5
    /**
     * Collects a list of computations into one computation of a list,
     * preserving order.
     *
     * The fold is seeded with `unit`: the empty list is a constant, so there
     * is no reason to submit a task to the executor just to produce it.
     */
    def sequence[A](ps: List[Par[A]]): Par[List[A]] =
      ps.foldRight(unit(List.empty[A]))((p, acc) => map2(p, acc)(_ :: _))

    /** Applies `f` to every element of `ps`, one `asyncF` computation per element. */
    def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] = fork {
      val fbs: List[Par[B]] = ps.map(asyncF(f))
      sequence(fbs)
    }

    //7.6
    /** Keeps the elements of `as` that satisfy `f`; the predicate is run through `parMap`. */
    def parFilter[A](as: List[A])(f: A => Boolean): Par[List[A]] =
      map(parMap(as)(x => (x, f(x))))(pairs => pairs.collect { case (x, true) => x })

    /** A `Future` that is complete from the start and cannot be cancelled. */
    private final case class UnitFuture[A](get: A) extends Future[A] {
      def isDone: Boolean = true
      def get(timeout: Long, units: TimeUnit): A = get
      def isCancelled: Boolean = false
      def cancel(evenIfRunning: Boolean): Boolean = false
    }

    //7.3
    /**
     * A `Future` that combines the results of `a` and `b` with `f`.
     * `f` is applied on the thread that calls `get`, on every call.
     */
    private final case class Map2Future[A, B, C](a: Future[A], b: Future[B], f: (A, B) => C)
        extends Future[C] {

      def isDone: Boolean = a.isDone && b.isDone

      def get(): C = f(a.get(), b.get())

      /**
       * Waits for both results within one shared time budget: whatever time
       * is spent waiting for `a` is subtracted from what `b` is allowed.
       */
      def get(timeout: Long, units: TimeUnit): C = {
        // Work in nanoseconds on the monotonic clock so that sub-millisecond
        // budgets are not truncated and wall-clock adjustments do not matter.
        val budgetNanos = units.toNanos(timeout)
        val startedAt = System.nanoTime()
        val aRet = a.get(timeout, units)
        val elapsedNanos = System.nanoTime() - startedAt
        // Never hand a negative timeout to the second future.
        val remainingNanos = math.max(0L, budgetNanos - elapsedNanos)
        val bRet = b.get(remainingNanos, TimeUnit.NANOSECONDS)
        f(aRet, bRet)
      }

      def isCancelled: Boolean = a.isCancelled || b.isCancelled

      /**
       * Requests cancellation of BOTH underlying futures and reports whether
       * both requests succeeded.
       */
      def cancel(evenIfRunning: Boolean): Boolean = {
        // Evaluate both calls before combining: a short-circuiting `&&` would
        // skip cancelling `b` whenever `a` could not be cancelled.
        val aCancelled = a.cancel(evenIfRunning)
        val bCancelled = b.cancel(evenIfRunning)
        aCancelled && bCancelled
      }
    }

    //7.1
    // Combines the results of two parallel computations with a binary function
    /** Starts both computations on `es` and combines their results with `f`. */
    def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] =
      (es: ExecutorService) => {
        val af = a(es)
        val bf = b(es)
        Map2Future(af, bf, f)
      }

    /** Transforms the result of `pa` with `f`; implemented via `map2` and a dummy unit value. */
    def map[A, B](pa: Par[A])(f: A => B): Par[B] =
      map2(pa, unit(()))((a, _) => f(a))

    /**
     * Submits the evaluation of `a` to the executor.
     *
     * The submitted task blocks its worker thread until the inner future
     * completes, so nested forks on a fixed-size pool can deadlock (see
     * `ex7_9`).
     */
    def fork[A](a: => Par[A]): Par[A] =
      (es: ExecutorService) => es.submit(new Callable[A] {
        def call(): A = a(es).get()
      })

    //7.7
    //map(y)(id) == y
    //map(map(y)(g))(f) == map(y)(f compose g)
    // Creates a computation that immediately yields the value a
    //def fork[A](a :=> Par[A]) : Par[A]
    // Marks a computation for parallel evaluation by run
    //def lazyUnit[A](a:=>A):Par[A] = fork(unit(a))
    // Fully evaluates the given Par, spawning the parallel computations requested by fork, and obtains the resulting value
    //def run[A](a:Par[A]):A

    /** Runs both computations on `e`, blocks for their results and compares them with `==`. */
    def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
      p(e).get == p2(e).get

    //7.9
    /**
     * Demonstrates that `fork` can deadlock on a fixed-size thread pool.
     *
     * NOTE: this method is expected to hang. In the first check, `fork(a)` on
     * a single-thread pool makes the outer task occupy the only worker while
     * it waits for the inner task, which therefore can never start. The
     * second check (three nested forks on two threads) follows the same
     * pattern but is not reached because the first one never returns.
     */
    def ex7_9(): Unit = {
      val a = lazyUnit(42 + 1)
      val S = Executors.newFixedThreadPool(1)
      println(Par.equal(S)(a, fork(a)))
      // Deadlocks when the number of threads is less than the number of forks
      val S2 = Executors.newFixedThreadPool(2)
      println(Par.equal(S2)(a, fork(fork(a))))
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment