// Gist cooldaemon/1391001 -- created November 24, 2011 09:54.
// Description: Tried scalaz.concurrent.
// Note (from the GitHub page): this file contains bidirectional Unicode text that may be
// interpreted or compiled differently than what appears below. To review it, open the file
// in an editor that reveals hidden Unicode characters.
package com.github.cooldaemon.scalaz_test

import scalaz._
import scalaz.concurrent._
/**
 * Walk-through of the scalaz.concurrent building blocks: Strategy, Effect, Actor and Promise.
 *
 * Each `run*` method is a self-contained demonstration that prints what it observes
 * (mostly the name of the thread an expression was evaluated on) to standard output.
 * Nothing is returned; the printed thread names depend on scheduling.
 */
object TryConcurrent {
  import Scalaz._
  import java.lang.Thread.currentThread

  /** Runs every demonstration in order: strategies, effects, actors, then promises. */
  def run: Unit = {
    runStrategy
    runEffect
    runActor
    runPromise
  }

  /**
   * Demonstrates two strategies: a hand-written one that evaluates on the calling thread,
   * and `Strategy.Executor` backed by a fixed pool of two threads.
   *
   * The pool is always shut down (and awaited for up to 60 seconds), even when the
   * pooled evaluation throws, so its worker threads cannot outlive this method.
   */
  def runStrategy: Unit = {
    import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

    // Author's note: this strategy is in fact already defined as Sequential.
    val myStrategy = new Strategy {
      def apply[A](runnable: => A) = {
        val result = runnable // evaluate the passed expression on the current thread
        () => result          // return a function that returns the result
      }
    }
    val myResult = myStrategy {currentThread.getName}
    println("myResult = " + myResult())

    // Create a thread pool.
    // Author's note: with DefaultStrategy, threads are made daemon threads when created.
    implicit val executorService: ExecutorService = Executors.newFixedThreadPool(2)
    try {
      val executorServiceStrategy = Strategy.Executor // a Strategy that uses the thread pool
      val executorServiceResult = executorServiceStrategy {currentThread.getName}
      println("executerServiceResult = " + executorServiceResult())
    } finally {
      // Release the pool on every exit path, then wait for in-flight work to finish.
      executorService.shutdown()
      executorService.awaitTermination(60L, TimeUnit.SECONDS)
    }
  }

  /**
   * Demonstrates `effect`: sends three messages to an `Effect[String]`, then derives an
   * `Effect[Int]` from it with `contramap` and sends three integers through that.
   */
  def runEffect: Unit = {
    import Strategy.Naive // creates a new thread every time

    // Author's note: DefaultStrategy would be used if no implicit Strategy were in scope.
    val myEffect1 = effect[String] { message =>
      println(currentThread.getName + " : " + message)
    }
    (1 to 3) foreach {n => myEffect1 ! "run my effect"}

    // Combine Effect[String] with an Int => String function to build an Effect[Int].
    val myEffect2 = myEffect1 contramap { (n: Int) =>
      println(currentThread.getName + " : " + n.toString)
      "run my effect " + n.toString
    }
    // Sending a message to myEffect2 forwards a message to myEffect1 from inside myEffect2.
    (1 to 3) foreach {n => myEffect2 ! n}
  }

  /**
   * Demonstrates `actor`: the same shape as [[runEffect]], but using an Actor, including
   * a `contramap`-derived actor that accepts integers.
   */
  def runActor: Unit = {
    // Author's note: Naive creates a new thread each time, but an Actor creates a thread
    // per unit of message-queue processing rather than per message.
    import Strategy.Naive

    val myActor1 = actor { (message: String) =>
      println(currentThread.getName + " : " + message)
    }
    (1 to 3) foreach {n => myActor1 ! "run my actor"}

    // contramap works here as well.
    val myActor2 = myActor1 contramap { (n: Int) =>
      println(currentThread.getName + " : " + n.toString)
      "run my actor " + n.toString
    }
    (1 to 3) foreach {n => myActor2 ! n}
  }

  /**
   * Demonstrates Promise: manual fulfilment, the `promise(...)` shorthand, forwarding a
   * result to an actor, and composition via map / flatMap / join / filter / for /
   * Kleisli / applicative builder / traverse / sequence / parMap / parBind / parZipWith.
   *
   * Blocks on `get` for every promise it prints, and sleeps about one second inside the
   * timed applicative example.
   */
  def runPromise: Unit = {
    import Strategy.Naive

    def m1 = {currentThread.getName + " : 1"}
    val myPromise1 = new Promise[String]() //emptyPromise[String]
    // Author's note: a Done signal is sent to the Actor owned by the Promise and Done's
    // eval is evaluated on that Actor. The Promise starts out Unfulfilled; here the
    // Unfulfilled state's fulfill is evaluated and the state changes to Fulfilled.
    myPromise1.fulfill(m1)
    println("myPromise1 = " + myPromise1.get) // the Fulfilled state's get is evaluated

    def m2 = {currentThread.getName + " : 2"}
    // Shorthand for the above. Author's note: object Promise.apply sends the Done signal;
    // promise(m2) = m2.pure[Promise] = a.η[Promise]
    val myPromise2 = promise(m2)
    println("myPromise2 = " + myPromise2.get)

    // Hand the result to an Actor or an Effect.
    def m3 = {currentThread.getName + " : 3"}
    val myActor = actor[String] { message =>
      println("myPromise3 = " + message + " : " + currentThread.getName)
    }
    // Author's note: one thread number is skipped here, probably because `to` is called
    // while the promise is still Unfulfilled, so the Actor is queued first.
    // The map / flatMap / filter used below rely on `to` internally to pass the content
    // of the Promise to an Effect.
    promise(m3) to myActor

    // Composing promises (the main topic starts here).
    // map
    def m4 = {currentThread.getName + " : 4"}
    val myPromise4 = promise(m4) map {message => message + " : " + currentThread.getName}
    println("myPromise4 = " + myPromise4.get)

    // flatMap
    def m5 = {currentThread.getName + " : 5"}
    val myPromise5 = promise(m5) flatMap {message => promise(message + " : " + currentThread.getName)}
    println("myPromise5 = " + myPromise5.get)

    def m6(message: String) = {message + " : " + currentThread.getName}
    val f = {m6(_)}.promise // turn m6 into a function that returns its result wrapped in a Promise
    println("result = " + f("foo").get) // naturally, evaluated on another thread
    val myPromise6 = f("foo") flatMap f
    println("myPromise6 = " + myPromise6.get)

    val joinPromise = promise(f("foo")).join
    println("joinPromise = " + joinPromise.get) // author's note: join on nested Promises completes instantly

    // filter
    val filterFunction = {(message: String) =>
      println(message + " : " + currentThread.getName)
      message match {
        case "bar" => true
        case _ => false
      }
    }
    val myPromise7 = promise("bar") filter filterFunction
    println("myPromise7 = " + myPromise7.get)

    val myPromise8 = promise("baz") filter filterFunction // myPromise8 never becomes Fulfilled
    myPromise8.break
    // Only Exception is caught so that fatal JVM errors still propagate.
    // NOTE(review): assumes the exception raised by get on a broken promise is an
    // Exception subclass -- confirm against the scalaz version in use.
    try {myPromise8.get} catch {case e: Exception => println("myPromise8 = " + e.toString)}

    // Author's note: promises can also be compared with === or /==. Because get is
    // evaluated, === is best used from a thread other than main.

    // for
    val myPromise9 = for (
      first <- f("foo");
      second <- f(first);
      third <- f(second)
    ) yield third
    println("myPromise9 = " + myPromise9.get)

    // Kleisli
    val g = f >=> f >=> f
    println("Kleisli = " + g("foo").get)

    // Applicative
    val myPromise10 = (f("foo") |@| f("bar") |@| f("baz")) {_ + " | " + _ + " | " + _}
    println("myPromise10 = " + myPromise10.get)

    // Same applicative combination with a one-second task per argument; the elapsed
    // wall-clock time in milliseconds is printed afterwards.
    val h = {(s: String) => Thread.sleep(1000); println("in h: " + s); m6(s)}.promise
    val startTime = System.currentTimeMillis
    val myPromise16 = (h("foo") |@| h("bar") |@| h("baz")) {_ + " | " + _ + " | " + _}
    println("myPromise16 = " + myPromise16.get)
    val elapseTime = System.currentTimeMillis - startTime
    println(elapseTime)

    // traverse
    val mySeq: Seq[Int] = (1 to 3).toSeq
    val myPromise11 = mySeq traverse { n =>
      def m11 = {currentThread.getName + " : " + n.toString}
      promise(m11)
    }
    println("myPromise11 = " + myPromise11.get)

    // sequence
    val promises: Seq[Promise[String]] = (1 to 3) map {n =>
      def m12 = {currentThread.getName + " : " + n.toString}
      promise(m12)
    }
    val myPromise12 = promises.sequence
    println("myPromise12 = " + myPromise12.get)

    // parMap (author's note: it is really just map(f.promise).sequence)
    val myPromise13 = mySeq parMap {n =>
      currentThread.getName + " : " + n.toString
    }
    println("myPromise13 = " + myPromise13.get)

    // parBind
    val myPromise14 = mySeq parBind {n =>
      Seq(currentThread.getName + " : " + n.toString)
    }
    println("myPromise14 = " + myPromise14.get)

    // parZipWith
    val myPromise15 = mySeq.parZipWith(mySeq) {(x, y) =>
      currentThread.getName + " : " + x.toString + " - " + y.toString
    }
    println("myPromise15 = " + myPromise15.get)
  }
}
// (GitHub page footer: sign up for free to join this conversation on GitHub,
// or sign in to comment if you already have an account.)