Last active
August 29, 2015 13:57
-
-
Save timyates/9624727 to your computer and use it in GitHub Desktop.
Messing with Groovy and RxJava's combineLatest
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Grab( 'com.netflix.rxjava:rxjava-groovy:0.17.1' ) | |
import rx.* | |
import rx.schedulers.* | |
import java.util.concurrent.TimeUnit | |
// Emit the date every second | |
def date = Observable.create { observer -> | |
Schedulers.newThread().schedulePeriodically( { inner -> | |
observer.onNext( new Date() ) | |
}, 0, 1000, TimeUnit.MILLISECONDS ) | |
} | |
// Emit a UUID every 700ms | |
def values = Observable.create { observer -> | |
Schedulers.newThread().schedulePeriodically( { inner -> | |
observer.onNext( UUID.randomUUID() ) | |
}, 0, 700, TimeUnit.MILLISECONDS ) | |
} | |
// Emit a complete model whenever data arrives | |
Observable.combineLatest( date, values ) { d, v -> | |
[ date:d, uuid:v ] | |
}.take( 10 ).subscribe { println it } | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Requires com.netflix.rxjava:rxjava-core:0.17.1 | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.schedulers.Schedulers; | |
import java.util.Date; | |
import java.util.UUID; | |
import java.util.concurrent.TimeUnit; | |
public class Main { | |
public static void main(String[] args) { | |
// Emit the date every second | |
Observable<Date> date = Observable.create( (Subscriber<? super Date> subscriber ) -> | |
Schedulers.newThread().schedulePeriodically( (inner) -> | |
subscriber.onNext( new Date() ), 0, 1000, TimeUnit.MILLISECONDS ) ) ; | |
// Emit a UUID every 700ms | |
Observable<UUID> uuids = Observable.create( (Subscriber<? super UUID> subscriber ) -> | |
Schedulers.newThread().schedulePeriodically( (inner) -> | |
subscriber.onNext( UUID.randomUUID() ), 0, 700, TimeUnit.MILLISECONDS ) ) ; | |
// Emit a complete model whenever data arrives | |
Observable.combineLatest( date, uuids, Model::new ) | |
.take( 10 ) | |
.toBlockingObservable() | |
.forEach( System.out::println ) ; | |
} | |
private static class Model { | |
Date date ; | |
UUID uuid ; | |
public Model( Date date, UUID uuid ) { | |
this.date = date ; | |
this.uuid = uuid ; | |
} | |
public String toString() { | |
return String.format( "date:%s uuid:%s", date, uuid ) ; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment