Skip to content

Instantly share code, notes, and snippets.

@0xYUANTI
Created May 8, 2017 11:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 0xYUANTI/c4dbface673e1a1dc9524f355631bb71 to your computer and use it in GitHub Desktop.
Save 0xYUANTI/c4dbface673e1a1dc9524f355631bb71 to your computer and use it in GitHub Desktop.
import fs2.interop.scalaz._
import fs2.{Scheduler, Strategy, time}
import scala.concurrent.duration._
import scalaz.concurrent.{Task => ZTask}
// porting some code from scalaz.concurrent.chan to fs2...
// We need to call a few external REST APIs every N seconds and process the results.
/** Periodically runs three producers and prints every event they emit.
  *
  * Each producer is driven by its own ticking stream; the three resulting
  * streams are merged into one, and each merged event is passed to
  * [[processEvent]].
  */
object App {

  // The implicits are declared before their first use and carry explicit
  // types. scalac may treat an implicit without an explicit result type as
  // ineligible when it is defined later in the same file than the use site,
  // which surfaces as a confusing "could not find implicit value" error.
  implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(2)
  implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(2)

  /** Events produced by the polled sources. */
  sealed trait Event
  final case class A() extends Event
  final case class B() extends Event
  final case class C() extends Event

  /** Entry point: builds the merged event stream and runs it synchronously. */
  def main(args: Array[String]): Unit = {
    val prod1 = everyMinute.evalMap(_ => producer1)
    val prod2 = everyMinute.evalMap(_ => producer2)
    val prod3 = everyMinute.evalMap(_ => producer3)

    val stream = prod1.merge(prod2).merge(prod3).map(processEvent)

    // Main loop: blocks the calling thread while the stream runs.
    stream.run.unsafePerformSync
  }

  /** Handler applied to every event; currently just prints it. */
  def processEvent: Event => Unit = println

  /** Ticking stream that drives each producer.
    *
    * NOTE(review): the name says "minute" but the interval is 10 seconds —
    * confirm which one is intended.
    */
  def everyMinute = time.awakeEvery[ZTask](10.seconds)

  /** Prints the executing thread, then yields an [[A]] event. */
  def producer1: ZTask[Event] = ZTask { println(java.lang.Thread.currentThread); A() }

  /** Prints the executing thread, then yields a [[B]] event. */
  def producer2: ZTask[Event] = ZTask { println(java.lang.Thread.currentThread); B() }

  /** Prints the executing thread, then yields a [[C]] event. */
  def producer3: ZTask[Event] = ZTask { println(java.lang.Thread.currentThread); C() }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment