Skip to content

Instantly share code, notes, and snippets.

@chuwy
Created February 4, 2017 21:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chuwy/361d7c16615dc3c19d7d265217163cb9 to your computer and use it in GitHub Desktop.
Save chuwy/361d7c16615dc3c19d7d265217163cb9 to your computer and use it in GitHub Desktop.
Simple FS2 interval implementation
import fs2._
import scala.concurrent.duration._
/**
 * Builders for fs2 streams that evaluate a task repeatedly, pausing before
 * every evaluation.
 */
object Interval {
  // Implicit definitions carry explicit types so implicit resolution does not
  // depend on inferred types. Both are required implicitly by `Task#schedule`.
  private implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(3)
  private implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(3)

  /**
   * Infinite stream that evaluates `task` again and again, scheduling every
   * evaluation (including the very first one) with the same `delay`.
   *
   * @param task  action to evaluate on each round; its result is emitted
   * @param delay delay passed to `Task#schedule` for each evaluation
   * @return a never-ending stream of the task's results; bound it with e.g. `take`
   */
  def interval[A](task: Task[A], delay: FiniteDuration): Stream[Task, A] =
    // The delay never changes between rounds, so a plain `repeat` replaces the
    // hand-written recursion (whose parameter was never read).
    Stream.eval(task.schedule(delay)).repeat

  /**
   * Infinite stream whose pause length is re-read on every round: `initial`
   * is evaluated to obtain a number of seconds, then that same number is
   * emitted after being scheduled with a delay of that many seconds.
   *
   * @param initial task yielding the next pause length, in seconds
   * @return a never-ending stream of the pause lengths that were used
   */
  def unstableInterval(initial: Task[Int]): Stream[Task, Int] =
    Stream
      .eval(initial)
      .flatMap { duration =>
        Stream.eval(Task.delay(duration).schedule(duration.seconds))
      }
      // Repeating the whole round keeps the stream flat instead of nesting
      // each subsequent round inside the previous round's flatMap.
      .repeat
}
/**
 * Mock of an external data source: every evaluation of [[DB.get]] bumps an
 * in-memory counter and yields the new value.
 */
object DB {
  // Atomic rather than a plain `var`: the task built by `get` may be evaluated
  // on pool threads, and `+=` on a var is a non-atomic read-modify-write.
  private val counter = new java.util.concurrent.atomic.AtomicInteger(0)

  /**
   * Task that, each time it is evaluated, increments the counter, prints the
   * current time in whole seconds since the epoch, and yields the incremented
   * counter value.
   *
   * @return a task producing 1 on its first evaluation, 2 on the second, ...
   */
  def get: Task[Int] =
    Task.delay {
      // Capture the value at increment time so the result cannot be affected
      // by another evaluation that runs before this one returns.
      val next = counter.incrementAndGet()
      println(System.currentTimeMillis / 1000)
      next
    }
}
/**
 * Demo entry point: evaluates the mock DB task four times, each evaluation
 * scheduled with a two-second delay, printing every value it yields.
 */
object Main extends App {
  // One round: fetch the next counter value and print it.
  private val printNext = DB.get.map(value => println(value))

  // Bound the otherwise infinite interval stream to its first four rounds.
  private val fourRounds = Interval.interval(printNext, 2.seconds).take(4)

  // Run the stream to completion, blocking the main thread until it finishes.
  fourRounds.run.unsafeRun()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment