Skip to content

Instantly share code, notes, and snippets.

@duarten
Created January 16, 2014 22:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save duarten/8465001 to your computer and use it in GitHub Desktop.
Save duarten/8465001 to your computer and use it in GitHub Desktop.
import java.util.{Timer, TimerTask}
import scala.concurrent.duration._
import rx.lang.scala.{Observable, Observer, Subscription, Scheduler}
import rx.lang.scala.observables.ConnectableObservable
import rx.lang.scala.subscriptions.{CompositeSubscription, MultipleAssignmentSubscription}
import rx.lang.scala.schedulers.NewThreadScheduler
object RxExplorations{
implicit class SchedulerOps(val s: Scheduler) extends AnyVal {
def mySchedule(work: (=>Unit) => Unit): Subscription =
s.schedule(scheduler => {
val subscription = MultipleAssignmentSubscription()
def loop(): Unit = {
subscription.subscription = scheduler.schedule {
work { loop() }
}
}
loop()
subscription
})
}
implicit class ObservableObjectOps(val o: Observable.type) extends AnyVal {
def myFrom[A](xs: Iterable[A]): Observable[A] =
Observable.create(observer => {
xs foreach (observer.onNext _)
observer.onCompleted()
Subscription { }
})
def myFrom2[A](xs: Iterable[A], s: Scheduler): Observable[A] =
Observable.create(observer => {
s schedule {
xs foreach (observer.onNext _)
observer.onCompleted()
}
})
def myFrom3[A](xs: Iterable[A], s: Scheduler): Observable[A] =
Observable.create(observer => {
val it = xs.iterator
s.scheduleRec(self => {
if (it.hasNext) {
observer.onNext(it.next())
self
}
else observer.onCompleted()
})
})
def myFrom4[A](xs: Iterable[A], s: Scheduler): Observable[A] =
Observable.create(observer => {
val it = xs.iterator
s.mySchedule(self => {
if (it.hasNext) {
observer.onNext(it.next())
self
}
else observer.onCompleted()
})
})
def myInterval(d: Duration): Observable[Long] = {
val t = new Timer(true)
Observable.create(observer => {
@volatile var done = false;
var c = 0l
def tt(): TimerTask = new TimerTask() {
def run() {
if (done) {
observer.onCompleted()
return
}
observer.onNext(c)
c += 1
t.schedule(tt(), d.toMillis)
}
}
t.schedule(tt(), d.toMillis)
Subscription {
done = true;
}
})
}
}
implicit class ObservableOps[A](val o: Observable[A]) extends AnyVal {
def myTake(n: Int): Observable[A] =
Observable.create(observer => {
if (n < 1) {
o.subscribe().unsubscribe()
observer.onCompleted()
Subscription { }
}
else {
var c = n
var s = CompositeSubscription()
s += o.subscribe(
((a: A) => {
if (c > 0) {
observer.onNext(a)
c -= 1
if (c == 0) {
s.unsubscribe()
observer.onCompleted()
}
}
}),
observer.onError _,
observer.onCompleted _)
s
}
})
def myFilter(p: A => Boolean): Observable[A] =
Observable.create(observer => {
o.subscribe(
(a: A) => {
if (p(a))
observer.onNext(a)
},
observer.onError _,
observer.onCompleted _)
})
def myAny(): Observable[Boolean] =
Observable.create(observer => {
var s = CompositeSubscription()
var hasReturned = false
s += o.subscribe(
(_: A) => {
if (!hasReturned) {
hasReturned = true
observer.onNext(true)
observer.onCompleted()
s.unsubscribe
}
},
observer.onError _,
() => {
if (!hasReturned) {
observer.onNext(false)
observer.onCompleted()
}
})
})
def myFoldLeft[B](z: B)(f: (B, A) => B): Observable[B] =
Observable.create(observer => {
var acc = z
o.subscribe(
(a: A) => acc = f(acc, a),
observer.onError _,
() => {
observer.onNext(acc)
observer.onCompleted()
})
})
def myMerge(other: Observable[A]): Observable[A] =
Observable.create(observer => {
var done = false
val s = CompositeSubscription()
val obs = Observer(
(a: A) => {
synchronized {
if (!done) {
observer.onNext(a)
}
}
},
t => {
synchronized {
if (!done) {
done = true
s.unsubscribe
observer.onError(t)
}
}
},
() => {
synchronized {
if (!done) {
done = true
s.unsubscribe
observer.onCompleted()
}
}
}
)
s += o.subscribe(obs)
s += other.subscribe(obs)
s
})
}
def testPublish() {
val o = Observable.interval(200 millis)
val c: ConnectableObservable[Long] = o.publish
c.subscribe(x => println(x))
val s = c.connect
Thread.sleep(200)
c.subscribe(x => println(x))
Thread.sleep(1000)
s.unsubscribe
}
def main(args: Array[String]) {
//val o = Observable.interval(200 millis).myTake(5)
//val o = Observable.interval(200 millis).myFilter(_ % 2 == 0).myTake(5)
//val o = Observable.interval(200 millis).myFilter(_ % 2 == 0).myAny()
//val o = Observable.interval(200 millis).myFilter(_ % 2 == 0).myTake(5).myFoldLeft(0l)(_ + _)
//val o = Observable.interval(200 millis).myFilter(_ % 2 == 0).myTake(5).myFoldLeft(0)((acc, _) => acc + 1)
//val o = Observable.interval(250 millis).myMerge(Observable.interval(200 millis)).take(5)
//o.toBlockingObservable.foreach(n => println("n = " + n))
val s = Observable.myFrom4(Stream.from(1), NewThreadScheduler()).subscribe(x => println(x))
Thread.sleep(200)
s.unsubscribe
Thread.sleep(500)
testPublish
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment