published | tags | ||||
---|---|---|---|---|---|
true |
|
一提到Stream(串流),第一個想到的就是影音的類型,這種應用面是非常的普遍也很容易理解。
而我第一次在project中想嘗試使用Streaming的時候,一直有個困擾,我的protocol是HTTP+JSON,我就是要等到整個payload都收下來了,才能Deserialize成Json object啊,這樣stream對我的好處是什麼?
前陣子有同事提到一個例子,說Java8的Performance比Scala好很多! 仔細發現,其實他在測試時用到的是Java8的Stream,但是卻拿Scala一般的Collection來比,就好比下面這樣的結果。(最後當然是幫Scala版本加個toStream兩個的差距就差不多了。)
def measure(f:()=>Any)={
val s=System.currentTimeMillis
f()
println(s"elapsed: ${System.currentTimeMillis-s}ms")
}
scala> measure{ () => (1 to 20000000).toStream.filter(_%2==0).filter(_>200).head }
elapsed: 1ms
scala> measure{ () => (1 to 20000000).filter(_%2==0).filter(_>200).head }
elapsed: 983ms
一看就知道這是一個不公平的結果!!! 一個是乖乖把所有的結果整理完,然後往後傳,通通filter過之後,只取第一個結果,另一個是streaming(串流)的方式,把資料用"流"的方式一步步的流過filter與head,當第一個match的結果出現之後就結束了。
但這個例子讓我們知道,原來stream除了從合適的protocol處理就能有幫助之外,其實就算在系統內部的資料傳遞與處理的過程中,在適當的使用下,一樣是能提升效率的。
最近Streaming的處理方式愈來愈紅,Akka在上個月(2015.07)也正式Release Akka Streams & Http Experimental 1.0,而Twitter Util也有提供AsyncStream,結合在Finatra上使用,看起來也不錯!
不過今天主角是Scalaz Stream,主要參考這篇Scalaz Stream - a Functional Reactive Programming Tutorial。
以下來簡單的介紹一下Scalaz Stream的用法
Scalaz Stream中,最重要的核心物件就是Process
,這個Process[F[_], X]
主要是定義了串流中的物件為X
型態,並且透過F[_]
這個Monad來包裝他,而在實際的使用上,通常會使用Scalaz的Task (請參考我上一篇寫的[ScaVa->Scala] Scalaz Task 取代Scala Future來進行非同步處理的另一個選擇)。
Process
提供了一些run
的方式,並且透過F[_]
這個effect system來包裝他,可以想像F[_]
是要處理這些X
型態值的driver。
trait Process[F[_], X] {
...
def run(implicit m: Monad[F]): F[Unit]
def runLast(implicit m: Monad[F]): F[X]
def runLog(implicit m: Monad[F]): F[IndexedSeq[X]]
}
在使用上的觀念要知道Process
是個Stream,所以他的值會是一直"流"進來的,當使用run
的時候,他回傳的是F[Unit]
,所以他是忽略你的回傳值,而主要的使用是用F[_]
的特性來處理這些值,例如使用Task
時,就可以使用他的timed
, handleWith
, onFinished
之類的function。
而使用runLast
的時候,會執行整個串流,一直到最後一個值並只回傳最後一個值。
使用runLog
會將整所有的值都包在IndexedSeq[X]
裡面,讓你可以一口氣取得所有串流中的值,這也是最耗記憶體的,就是把所有的結果都收集起來。
若以List
monad來當作F[_]
,就會像以下這樣的例子:
scala> val p: Process[List,Int] = Process.range(0,10)
scala> p.run
res5: List[Unit] = List(())
scala> p.runLast
res10: List[Option[Int]] = List(Some(9))
scala> p.runLog
res11: List[IndexedSeq[Int]] = List(Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
這裡舉幾種產生Process方式的例子:
val p = Process(1,2,3,4,5)
//p: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3, 4, 5))
如果你直接嘗試去run
這個p
,會得到以下的錯誤:
scala> p.run
<console>:16: error: could not find implicit value for parameter C: scalaz.Catchable[F2]
p.run
^
因為他缺少了driver,也就是F[_]
,所以他不知道該怎麼去處理他,這時候他的型態是Process0
,只是個wrapper,所以你可以直接toList
取得你要的值,或是告訴他你的driver是什麼。
val p = Process(1,2,3,4,5):Process[Task,Int]
//p: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(WrappedArray(1, 2, 3, 4, 5))
p.runLog.run
//IndexedSeq[Int] = Vector(1, 2, 3, 4, 5)
scalaz.stream.io
有將一些io的操作包裝起來,讓你方便直接使用,例如讀檔案:
val lns: Process[Task,String] = io.linesR("inputfile.txt")
這種用法,很適合將你本來要進行的處理(一些java/scala library的存取、或是對server的request)給包成Task
,再透過Process來將這裡面的結果包成Stream。
val f: Task[A] = Task.async { ... } //Do something to create an object
val evalF: Process[Task,A] = Process.eval(f)
這類型的有eval
,repeat
,repeatEval
,suspend
等…
scalaz.stream
實作了Queue
來讓你從一邊把串流的值給不斷的塞進去,而另一邊可以直接以Process
來接收這些串流的值。
import scalaz.stream._
val q = async.unboundedQueue[String]
while (true) {
val inputString = readFromTheNetwork()
q.enqueueOne( inputString ).run
}
//...elsewhere...
val stringsFromTheNetwork: Process[Task,String] = q.dequeue
這裡的signal
跟queue
不同的地方是,這個的implementation是針對signal,所以他保證你能得到最後的值,但中間過程中若有許多次的變動,不一定每次都會拿得到,若你是希望每一個過程都要拿得到的話,要使用queue
。
val signal = async.signal[Boolean]
val signalChanges: Process[Task,Boolean] = signal.discrete
//Thread 1
signal.set(true).run // Time = 1
signal.set(true).run // Time = 2
signal.set(false).run //Time = 3
...
//Thread 2
signalChanges.map(x => {
println("" + x + " -> " + System.currentTimeMillis)
}).run.run
// Will print:
// true -> 1
// false -> 3
import scala.concurrent.duration._
val clock = Process.awakeEvery(1 seconds)