Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example for rx.Observable chaining
@Grab(group='io.reactivex', module='rxjava-reactive-streams', version='0.5.0')
import rx.Observable
// Source streams: each emits its list items decremented by one
// (in1 -> 0, 2, 4 and in2 -> 1, 3, 5).
def in1 = Observable.from([1, 3, 5]).map { it - 1 }
def in2 = Observable.from([2, 4, 6]).map { it - 1 }
// A stream created from an exception; used below to show how an error
// propagates through a zip.
def in3 = Observable.error(new Exception("BAM!"))
// Pairs the items of in1 and in2 by position, adds each pair, then doubles the sum.
def sumTimes2 = Observable
    .zip(in1, in2, { left, right -> left + right })
    .map({ total -> 2 * total })
// Zips all three sources, multiplies each triple, then doubles the product.
// in3 is an error stream, so the sample output shows this chain ending in
// "multE: java.lang.Exception: BAM!" rather than producing values.
def multTimes2 = Observable
    .zip(in1, in2, in3, { x, y, z -> x * y * z })
    .map({ product -> 2 * product })
// Zip over an empty list of sources. Nothing in the visible script subscribes
// to this observable; per its own message the combining closure is not
// expected to run.
def emptyZip = Observable.zip([], { -> println("this is never executed") })
// Print every doubled sum; the second handler reports an error if one occurs.
sumTimes2.subscribe(
    { println("sum: " + it) },
    { println("sumE: " + it) }
)
// Print every doubled product; the second handler reports an error if one occurs.
multTimes2.subscribe(
    { println("mult: " + it) },
    { println("multE: " + it) }
)
/*
* output
*
* sum: 2
* sum: 10
* sum: 18
* multE: java.lang.Exception: BAM!
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.