Skip to content

Instantly share code, notes, and snippets.

@joecwu
Last active August 26, 2015 19:04
Show Gist options
  • Save joecwu/525aaf0b9b886eb68b74 to your computer and use it in GitHub Desktop.
Save joecwu/525aaf0b9b886eb68b74 to your computer and use it in GitHub Desktop.
[ScaVa->Scala] Scalaz Stream 串流好朋友 part 1
published tags
true
ScaVaToScala
Scala
Scalaz
Programming

[ScaVa->Scala] Scalaz Stream 串流好朋友 part 1

這篇同步發佈在我的BlogGist

何時用到Stream(串流)?

一提到Stream(串流),第一個想到的就是影音的類型,這種應用面是非常的普遍也很容易理解。

而我第一次在project中想嘗試使用Streaming的時候,一直有個困擾,我的protocol是HTTP+JSON,我就是要等到整個payload都收下來了,才能Deserialize成Json object啊,這樣stream對我的好處是什麼?

前陣子有同事提到一個例子,說Java8的Performance比Scala好很多! 仔細發現,其實他在測試時用到的是Java8的Stream,但是卻拿Scala一般的Collection來比,就好比下面這樣的結果。(最後當然是幫Scala版本加個toStream兩個的差距就差不多了。)

def measure(f:()=>Any)={
  val s=System.currentTimeMillis
  f()
  println(s"elapsed: ${System.currentTimeMillis-s}ms")
}

scala> measure{ () => (1 to 20000000).toStream.filter(_%2==0).filter(_>200).head }
elapsed: 1ms

scala> measure{ () => (1 to 20000000).filter(_%2==0).filter(_>200).head }
elapsed: 983ms

一看就知道這是一個不公平的結果!!! 一個是乖乖把所有的結果整理完,然後往後傳,通通filter過之後,只取第一個結果,另一個是streaming(串流)的方式,把資料用"流"的方式一步步的流過filter與head,當第一個match的結果出現之後就結束了。

但這個例子讓我們知道,原來stream除了從合適的protocol處理就能有幫助之外,其實就算在系統內部的資料傳遞與處理的過程中,在適當的使用下,一樣是能提升效率的。

Streaming的選擇

最近Streaming的處理方式愈來愈紅,Akka在上個月(2015.07)也正式Release Akka Streams & Http Experimental 1.0,而Twitter Util也有提供AsyncStream,結合在Finatra上使用,看起來也不錯!

不過今天主角是Scalaz Stream,主要參考這篇Scalaz Stream - a Functional Reactive Programming Tutorial

以下來簡單的介紹一下Scalaz Stream的用法

Scalaz Stream

什麼是Process?

Scalaz Stream中,最重要的核心物件就是Process,這個Process[F[_], X]主要是定義了串流中的物件為X型態,並且透過F[_]這個Monad來包裝他,而在實際的使用上,通常會使用Scalaz的Task (請參考我上一篇寫的[ScaVa->Scala] Scalaz Task 取代Scala Future來進行非同步處理的另一個選擇)。

Process提供了一些run的方式,並且透過F[_]這個effect system來包裝他,可以想像F[_]是要處理這些X型態值的driver。

trait Process[F[_], X] {
  ...
  def run(implicit m: Monad[F]): F[Unit]
  def runLast(implicit m: Monad[F]): F[X]
  def runLog(implicit m: Monad[F]): F[IndexedSeq[X]]
}

在使用上的觀念要知道Process是個Stream,所以他的值會是一直"流"進來的,當使用run的時候,他回傳的是F[Unit],所以他是忽略你的回傳值,而主要的使用是用F[_]的特性來處理這些值,例如使用Task時,就可以使用他的timed, handleWith, onFinished之類的function。

而使用runLast的時候,會執行整個串流,一直到最後一個值並只回傳最後一個值。

使用runLog會將整所有的值都包在IndexedSeq[X]裡面,讓你可以一口氣取得所有串流中的值,這也是最耗記憶體的,就是把所有的結果都收集起來。

若以Listmonad來當作F[_],就會像以下這樣的例子:

scala> val p: Process[List,Int] = Process.range(0,10)

scala> p.run
res5: List[Unit] = List(())

scala> p.runLast
res10: List[Option[Int]] = List(Some(9))

scala> p.runLog
res11: List[IndexedSeq[Int]] = List(Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))

產生Process

這裡舉幾種產生Process方式的例子:

1. 要把現有的值包成Process

val p = Process(1,2,3,4,5)
//p: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3, 4, 5))

如果你直接嘗試去run這個p,會得到以下的錯誤:

scala> p.run
<console>:16: error: could not find implicit value for parameter C: scalaz.Catchable[F2]
       p.run
         ^

因為他缺少了driver,也就是F[_],所以他不知道該怎麼去處理他,這時候他的型態是Process0,只是個wrapper,所以你可以直接toList取得你要的值,或是告訴他你的driver是什麼。

val p = Process(1,2,3,4,5):Process[Task,Int]
//p: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(WrappedArray(1, 2, 3, 4, 5))
p.runLog.run
//IndexedSeq[Int] = Vector(1, 2, 3, 4, 5)

2. 使用scalaz.stream.io來產生

scalaz.stream.io有將一些io的操作包裝起來,讓你方便直接使用,例如讀檔案:

val lns: Process[Task,String] = io.linesR("inputfile.txt")

3. 使用Task.async來包裝你的執行動作,再透過Process.eval來產生

這種用法,很適合將你本來要進行的處理(一些java/scala library的存取、或是對server的request)給包成Task,再透過Process來將這裡面的結果包成Stream。

val f: Task[A] = Task.async { ... } //Do something to create an object
val evalF: Process[Task,A] = Process.eval(f)

這類型的有eval,repeat,repeatEval,suspend等…

4. 使用scalaz.stream.async.Queue

scalaz.stream實作了Queue來讓你從一邊把串流的值給不斷的塞進去,而另一邊可以直接以Process來接收這些串流的值。

import scalaz.stream._

val q = async.unboundedQueue[String]

while (true) {
  val inputString = readFromTheNetwork()
  q.enqueueOne( inputString ).run
}

//...elsewhere...

val stringsFromTheNetwork: Process[Task,String] = q.dequeue

5. 另外一種方式 - signal

這裡的signalqueue不同的地方是,這個的implementation是針對signal,所以他保證你能得到最後的值,但中間過程中若有許多次的變動,不一定每次都會拿得到,若你是希望每一個過程都要拿得到的話,要使用queue

val signal = async.signal[Boolean]
val signalChanges: Process[Task,Boolean] = signal.discrete

//Thread 1
signal.set(true).run // Time = 1
signal.set(true).run // Time = 2
signal.set(false).run //Time = 3
...
//Thread 2
signalChanges.map(x => {
  println("" + x + " -> " + System.currentTimeMillis)
  }).run.run
// Will print:
// true -> 1
// false -> 3

6. Process.awakeEvery定時產生Process的通知

import scala.concurrent.duration._
val clock = Process.awakeEvery(1 seconds)

Nest

[ScaVa->Scala] Scalaz Stream 串流好朋友 part 2 BlogGist

Reference

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment