Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import cats.Eq
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
object Main {
implicit final class ObservableOps[A](private val self: Observable[A]) extends AnyVal {
def bubbleUpHot(a: A)(implicit Eq: Eq[A]): Observable[A] =
self.publishSelector { hot =>
val special = hot.find(_ === a)
val beforeSpecial = hot.takeWhile(_ =!= a)
val afterSpecial = hot.dropWhileInclusive(_ =!= a)
special ++ beforeSpecial ++ afterSpecial
}
def bubbleUpCold(a: A)(implicit Eq: Eq[A]): Observable[A] = {
val special = self.find(_ === a)
val beforeSpecial = self.takeWhile(_ =!= a)
val afterSpecial = self.dropWhileInclusive(_ =!= a)
special ++ beforeSpecial ++ afterSpecial
}
}
def main(args: Array[String]): Unit = {
val o = Observable(1, 2, 3, 4, 5, 6)
val h = o.bubbleUpHot(3)
val k = o.bubbleUpCold(3)
println("= hot ==========================================================")
h.foreach(i => println(s"i: $i"))
println("= cold =========================================================")
k.foreach(i => println(s"i: $i"))
//
// = hot ==========================================================
// i: 3
// i: 4
// i: 5
// i: 6
// = cold =========================================================
// i: 3
// i: 1
// i: 2
// i: 4
// i: 5
// i: 6
//
()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment