Skip to content

Instantly share code, notes, and snippets.

@cooldaemon
Created November 24, 2011 09:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cooldaemon/1391001 to your computer and use it in GitHub Desktop.
Save cooldaemon/1391001 to your computer and use it in GitHub Desktop.
Experiments with scalaz.concurrent (Strategy, Effect, Actor, Promise).
package com.github.cooldaemon.scalaz_test
import scalaz._
import scalaz.concurrent._
object TryConcurrent {
import Scalaz._
import java.lang.Thread.currentThread
/** Runs all four demos in a fixed order: Strategy, Effect, Actor, then Promise. */
def run: Unit = {
  // Each demo is a parameterless procedure; they are simply executed one after another.
  runStrategy; runEffect; runActor; runPromise
}
/**
 * Demonstrates scalaz `Strategy` in two flavours: a hand-written strategy that
 * evaluates its argument on the calling thread, and `Strategy.Executor` backed
 * by a fixed pool of two threads. Each result (a thread name) is printed.
 *
 * The pool is shut down in a `finally` block so that it is released even when
 * evaluating or printing the result throws.
 */
def runStrategy: Unit = {
  import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

  // Author's note: this strategy is in fact already predefined as Sequential.
  val myStrategy = new Strategy {
    def apply[A](runnable: => A): () => A = {
      val result = runnable // evaluate the by-name argument here, on the current thread
      () => result          // return a function that yields the already-computed result
    }
  }
  val myResult = myStrategy {currentThread.getName}
  println("myResult = " + myResult())

  // Create the thread pool.
  // Author's note: DefaultStrategy, by contrast, daemonizes the threads it creates.
  implicit val executorService: ExecutorService = Executors.newFixedThreadPool(2)
  try {
    val executorServiceStrategy = Strategy.Executor // Strategy that uses the implicit thread pool
    val executorServiceResult = executorServiceStrategy {currentThread.getName}
    // The label's spelling is runtime output and is intentionally left unchanged.
    println("executerServiceResult = " + executorServiceResult())
  } finally {
    executorService.shutdown()
    // awaitTermination returns false on timeout; force-stop instead of silently ignoring it.
    if (!executorService.awaitTermination(60L, TimeUnit.SECONDS)) executorService.shutdownNow()
  }
}
/**
 * Demonstrates scalaz `Effect`: sends three messages to a printing effect, then
 * derives an `Int`-accepting effect from it with `contramap` and sends 1 to 3.
 */
def runEffect: Unit = {
  import Strategy.Naive // author's note: creates a new Thread every time

  // Author's note: without an implicit Strategy in scope, DefaultStrategy would be used.
  val printer = effect[String] { text =>
    println(currentThread.getName + " : " + text)
  }
  for (_ <- 1 to 3) printer ! "run my effect"

  // Combine the Effect[String] with an Int => String function to obtain an Effect[Int].
  val numbered = printer contramap { (i: Int) =>
    println(currentThread.getName + " : " + i.toString)
    "run my effect " + i.toString
  }

  // Author's note: a message sent to the derived effect is forwarded, from inside it,
  // to the original effect.
  for (i <- 1 to 3) numbered ! i
}
/**
 * Demonstrates scalaz `Actor`: sends three messages to a printing actor, then
 * derives an `Int`-accepting actor from it with `contramap` and sends 1 to 3.
 */
def runActor: Unit = {
  // Author's note: Naive is the strategy that creates a new Thread every time, but
  // an Actor creates a Thread per round of message-queue processing instead.
  import Strategy.Naive

  val printer = actor { (text: String) =>
    println(currentThread.getName + " : " + text)
  }
  for (_ <- 1 to 3) printer ! "run my actor"

  // contramap works on actors as well.
  val numbered = printer contramap { (i: Int) =>
    println(currentThread.getName + " : " + i.toString)
    "run my actor " + i.toString
  }
  for (i <- 1 to 3) numbered ! i
}
/**
 * Demonstrates scalaz `Promise` step by step and prints the result of each step:
 * manual fulfilment, the `promise(...)` shorthand, forwarding a result to an
 * actor with `to`, `map` / `flatMap` / `join` / `filter`, breaking a promise,
 * for-comprehensions, Kleisli composition, the applicative builder,
 * `traverse` / `sequence`, and `parMap` / `parBind` / `parZipWith`.
 *
 * Most printed values embed thread names so the evaluating thread is visible.
 */
def runPromise: Unit = {
  import Strategy.Naive

  // --- Creating and fulfilling promises -------------------------------------
  def m1 = {currentThread.getName + " : 1"}
  val myPromise1 = new Promise[String]() // alternative spelling: emptyPromise[String]
  // Author's note: a Done signal is sent to the Actor owned by the Promise and Done's
  // eval is evaluated on that Actor. The Promise starts out Unfulfilled; here
  // Unfulfilled's fulfill is evaluated and the state changes to Fulfilled.
  myPromise1.fulfill(m1)
  println("myPromise1 = " + myPromise1.get) // author's note: this evaluates Fulfilled's get

  def m2 = {currentThread.getName + " : 2"}
  // Author's note: shorthand for the above; object Promise.apply sends Done.
  // promise(m2) = m2.pure[Promise] = a.η[Promise]
  val myPromise2 = promise(m2)
  println("myPromise2 = " + myPromise2.get)

  // --- Sending the result to an Actor or Effect -----------------------------
  def m3 = {currentThread.getName + " : 3"}
  val myActor = actor[String] { message =>
    println("myPromise3 = " + message + " : " + currentThread.getName)
  }
  // Author's note: a thread number is skipped here, probably because `to` is called
  // while the promise is still Unfulfilled, so the Actor is queued first.
  // The map/flatMap/filter shown below use `to` internally to hand the content of
  // the Promise to an Effect.
  promise(m3) to myActor

  // --- Composing promises (the main topic) ----------------------------------
  // map
  def m4 = {currentThread.getName + " : 4"}
  val myPromise4 = promise(m4) map {message => message + " : " + currentThread.getName}
  println("myPromise4 = " + myPromise4.get)

  // flatMap
  def m5 = {currentThread.getName + " : 5"}
  val myPromise5 = promise(m5) flatMap {message => promise(message + " : " + currentThread.getName)}
  println("myPromise5 = " + myPromise5.get)

  def m6(message: String) = {message + " : " + currentThread.getName}
  val f = {m6(_)}.promise // turn m6 into a function that returns its result wrapped in a Promise
  println("result = " + f("foo").get) // author's note: naturally evaluated on another Thread
  val myPromise6 = f("foo") flatMap f
  println("myPromise6 = " + myPromise6.get)

  val joinPromise = promise(f("foo")).join
  println("joinPromise = " + joinPromise.get) // author's note: join finishes instantly even for nested Promises

  // filter
  val filterFunction = {(message: String) =>
    println(message + " : " + currentThread.getName)
    message match {
      case "bar" => true
      case _ => false
    }
  }
  val myPromise7 = promise("bar") filter filterFunction
  println("myPromise7 = " + myPromise7.get)

  val myPromise8 = promise("baz") filter filterFunction // author's note: myPromise8 never becomes Fulfilled
  myPromise8.break
  // `get` on the broken promise is expected to throw; print the exception instead of failing.
  // Only Exception is caught so that fatal errors (e.g. OutOfMemoryError) still propagate.
  // NOTE(review): assumes the exception raised for a broken promise is an Exception
  // subclass — confirm against the scalaz version in use.
  try {myPromise8.get} catch {case e: Exception => println("myPromise8 = " + e.toString)}

  // Author's note: Promises can also be compared with === or /==. Because that evaluates
  // get, using === from a Thread other than main is recommended.

  // for-comprehension (binders are named s1..s3 to avoid shadowing the m1..m3 defs above)
  val myPromise9 = for (
    s1 <- f("foo");
    s2 <- f(s1);
    s3 <- f(s2)
  ) yield s3
  println("myPromise9 = " + myPromise9.get)

  // Kleisli
  val g = f >=> f >=> f
  println("Kleisli = " + g("foo").get)

  // Applicative
  val myPromise10 = (f("foo") |@| f("bar") |@| f("baz")) {_ + " | " + _ + " | " + _}
  println("myPromise10 = " + myPromise10.get)

  // Same applicative combination, but each call sleeps for one second first; the
  // elapsed time printed afterwards shows how long the three calls took together.
  val h = {(s: String) => Thread.sleep(1000); println("in h: " + s); m6(s)}.promise
  val startTime = System.currentTimeMillis
  val myPromise16 = (h("foo") |@| h("bar") |@| h("baz")) {_ + " | " + _ + " | " + _}
  println("myPromise16 = " + myPromise16.get)
  val elapsedTime = System.currentTimeMillis - startTime
  println(elapsedTime)

  // traverse
  val mySeq: Seq[Int] = (1 to 3).toSeq
  val myPromise11 = mySeq traverse { n =>
    def m11 = {currentThread.getName + " : " + n.toString}
    promise(m11)
  }
  println("myPromise11 = " + myPromise11.get)

  // sequence
  val promises: Seq[Promise[String]] = (1 to 3) map {n =>
    def m12 = {currentThread.getName + " : " + n.toString}
    promise(m12)
  }
  val myPromise12 = promises.sequence
  println("myPromise12 = " + myPromise12.get)

  // parMap (author's note: actually just map(f.promise).sequence)
  val myPromise13 = mySeq parMap {n =>
    currentThread.getName + " : " + n.toString
  }
  println("myPromise13 = " + myPromise13.get)

  // parBind
  val myPromise14 = mySeq parBind {n =>
    Seq(currentThread.getName + " : " + n.toString)
  }
  println("myPromise14 = " + myPromise14.get)

  // parZipWith
  val myPromise15 = mySeq.parZipWith(mySeq) {(x, y) =>
    currentThread.getName + " : " + x.toString + " - " + y.toString
  }
  println("myPromise15 = " + myPromise15.get)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment