Skip to content

Instantly share code, notes, and snippets.

@joecwu
Last active August 26, 2015 18:59
Show Gist options
  • Save joecwu/fd28b678585f89c746b0 to your computer and use it in GitHub Desktop.
Save joecwu/fd28b678585f89c746b0 to your computer and use it in GitHub Desktop.
[ScaVa->Scala] Scalaz Stream 串流好朋友 part 2
published tags
true
ScaVaToScala
Scala
Scalaz
Programming

[ScaVa->Scala] Scalaz Stream 串流好朋友 part 2

這篇同步發佈在我的BlogGist

前情提要

請看[ScaVa->Scala] Scalaz Stream 串流好朋友 part 1Gist版本

怎麼使用Process?

使用Monad的功能map, flatMap

首先,Process是Monad,所以Monad使用上的特性也是Process好用的特色,因此map, flatMap…等這些Moand的特性都能使用。

case class Article(id:String,title:String,body:String)
val articles : Process[Task,Article] = Process(Article("1","Hi","Scalaz Process"))
//articles: scalaz.stream.Process[scalaz.concurrent.Task,Article] = Emit(WrappedArray(Article(1,Hi,Scalaz Process)))

scala> articles.map{println(_)}
Article(1,Hi,Scalaz Process)
//scalaz.stream.Process[scalaz.concurrent.Task,Unit] = Emit(Vector(()))

scala> articles.flatMap{ a=> Process(a.id) }.runLog.run
//IndexedSeq[String] = Vector(1)

Process.scan

在Stream的處理中,蠻常有機會是會要參考到前一個值,也就是causal function,也就是以之前的值與目前的值來處理的function,Process.scan就是Process中實作的causal function,他的使用方式就像是fold一樣,我們會定義一個function,傳入兩個參數,分別是之前運算完的結果,和當下的值。

val nums : Process[Task,Int] = Process.range(0,10)
// nums.runLog.run
// IndexedSeq[Int] = Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

val sum = nums.scan(0)( (past,present) => past+present )
// sum.runLog.run
//IndexedSeq[Int] = Vector(0, 0, 1, 3, 6, 10, 15, 21, 28, 36, 45)

先代入起始值0scan當作第一個past,然後scan會將 (0,0), (0,1), (1,2), (3,3), (6,4)...分別代入(past,present) => past+present這個function中,所以最後的值就會是第一個0與所有的0到9的總合。

將Stream結合起來的好工具 - wye

要將Stream結合起來的作法有很多,這裡介紹一個叫作wye的工具,他裡面實作了許多能將Stream結合的各種function,像是merge, either, yip(這就像是zip一樣), unboundedQueue, boundedQueue, timedQueue

scala> val p1 : Process[Task,String] = Process("1","2","3","4")
scala> val p2 : Process[Task,String] = Process("a","b","c","d")

scala> p1.wye(p2)(wye.merge).runLog.run
res: IndexedSeq[String] = Vector(1, 2, 3, 4, a, b, c, d)

scala> p1.wye(p2)(wye.either).runLog.run
res: IndexedSeq[scalaz.\/[String,String]] = Vector(\/-(a), \/-(b), \/-(c), \/-(d), -\/(1), -\/(2), -\/(3), -\/(4))

scala> p1.wye(p2)(wye.yip).runLog.run
res: IndexedSeq[(String, String)] = Vector((1,a), (2,b), (3,c), (4,d))

其中有一個很實用的是dynamic,他的interface是def dynamic[I,I2](f: I => wye.Request, g: I2 => wye.Request): Wye[I,I2,ReceiveY[I,I2]],他讓你去決定當收到 左邊 或 右邊 的stream傳入的值的時候,要怎麼去處理,並且最後決定下一個是要拿 左邊(wye.Request.L) 或是 右邊(wye.Request.R)的。

// define the process logic for wye.dynamic, here is always change to the other side.
val w = wye.dynamic( (_:Any) => wye.Request.R, (_:Any) => wye.Request.L)

scala> p1.wye(p2)(w).runLog.run
res: IndexedSeq[scalaz.stream.ReceiveY[Any,Any]] = Vector(ReceiveL(1), ReceiveR(a), ReceiveL(2), ReceiveR(b), ReceiveL(3), ReceiveR(c), ReceiveL(4), ReceiveR(
d))

p1.wye(p2)(w).runLog.run.filter( _.isR )
res: IndexedSeq[scalaz.stream.ReceiveY[Any,Any]] = Vector(ReceiveR(a), ReceiveR(b), ReceiveR(c), ReceiveR(d))

scala> p1.wye(p2)(w).runLog.run.map(_ match {
     |       case ReceiveY.ReceiveR(x) => "R:"+x
     |       case ReceiveY.ReceiveL(x) => "L:"+x
     | }).map(x=>println(s"got $x"))

got L:1
got R:a
got L:2
got R:b
got L:3
got R:c
got L:4
got R:d

至於其他的unboundedQueue, boundedQueue, timedQueue…等別的用法,就請參考wye的原始碼嘍!

Reference

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment