Skip to content

Instantly share code, notes, and snippets.

@Qualtagh
Created September 22, 2015 13:00
Show Gist options
  • Save Qualtagh/4643ddcd5fc97bb8b0ea to your computer and use it in GitHub Desktop.
Save Qualtagh/4643ddcd5fc97bb8b0ea to your computer and use it in GitHub Desktop.
RxJavaTest
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.junit.Test;
import rx.Observable;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;
public class RxJavaTest {
/**
 * Replays a fixed series of per-player score records on a virtual clock and checks, one virtual
 * second at a time, that the sum of every player's most recent score equals the matching entry
 * of {@code expectedTotals}.
 *
 * @throws Exception if waiting for the source to complete is interrupted
 */
@Test
public void sumLatestScore() throws Exception {
    CountDownLatch latch = new CountDownLatch( 1 );
    TestScheduler scheduler = new TestScheduler();
    // Single-element arrays act as mutable holders that the subscriber lambdas can write to.
    double[] totalValue = new double[ 1 ];
    Throwable[] failure = new Throwable[ 1 ];
    // expectedTotals[ t ] is the total that must be visible once virtual time has reached t seconds.
    double[] expectedTotals = {
        5, 4, 4, 4, 4,
        6, 5, 8, 13, 13,
        13, 13, 18, 18, 20,
        19, 27, 31, 31, 31
    };
    Observable< Record > input = Observable.from( new Record[] {
        new Record( "Alex", 0, 5 ),
        new Record( "Alex", 1, 4 ),
        new Record( "Ben", 5, 2 ),
        new Record( "Alex", 6, 3 ),
        new Record( "Alex", 7, 4 ),
        new Record( "Ben", 7, 4 ),
        new Record( "Alex", 8, 9 ),
        new Record( "Katie", 12, 5 ),
        new Record( "Ben", 14, 6 ),
        new Record( "Alex", 15, 8 ),
        new Record( "Katie", 16, 9 ),
        new Record( "Ben", 16, 10 ),
        new Record( "Alex", 17, 12 )
    } )
        // Release each record once the virtual clock reaches its own timestamp. timer() fires
        // exactly once, which is all delay() needs from the per-item observable.
        .delay( e -> Observable.timer( e.timestamp, TimeUnit.SECONDS, scheduler ) )
        .doOnCompleted( latch::countDown )
        .share();
    Observable< Observable< Record > > output = input
        .groupBy( e -> e.player )
        // Every new player triggers a brand-new combineLatest below; cache() lets that new
        // combination see the records a player produced before it was (re)subscribed.
        .map( g -> g.cache() )
        .compose( new ToListOnNextTransformer() )
        .map( eachPlayerRecordsList -> Observable.combineLatest(
            eachPlayerRecordsList,
            ( Object... latestRecords ) -> totalOf( latestRecords ) ) );
    // Only the combination built for the most recent set of players is listened to.
    Observable.switchOnNext( output )
        .subscribe( g -> totalValue[ 0 ] = g.score, e -> failure[ 0 ] = e );
    for ( int timestamp = 0; timestamp < expectedTotals.length; timestamp++ ) {
        scheduler.advanceTimeTo( timestamp, TimeUnit.SECONDS );
        if ( failure[ 0 ] != null ) {
            // Report the real cause instead of a misleading total mismatch.
            throw new AssertionError( "Pipeline failed at " + timestamp, failure[ 0 ] );
        }
        Assert.assertEquals( "At " + timestamp, expectedTotals[ timestamp ], totalValue[ 0 ], 0.0 );
    }
    // Bounded wait: a source that never completes must fail the test rather than hang it.
    Assert.assertTrue( "Input did not complete", latch.await( 1, TimeUnit.SECONDS ) );
}

/**
 * Folds the latest record of every player into one synthetic "Total" record.
 *
 * @param latestRecords the latest {@code Record} of each player, as handed over by
 *                      {@code combineLatest}; must contain at least one element
 * @return a record named "Total" carrying the sum of the scores and the newest timestamp
 */
private static Record totalOf( Object... latestRecords ) {
    double total = Arrays
        .stream( latestRecords )
        .mapToDouble( e -> ( ( Record )e ).score )
        .sum();
    int timestamp = Arrays
        .stream( latestRecords )
        .mapToInt( e -> ( ( Record )e ).timestamp )
        .max()
        .getAsInt();
    return new Record( "Total", timestamp, total );
}
/**
 * Immutable score entry: {@code player} held {@code score} points at {@code timestamp}.
 * Identity (equals/hashCode) is defined by player and timestamp only; the score is not part of it.
 */
private static class Record {
    private final String player;
    private final int timestamp;
    private final double score;

    public Record( String player, int timestamp, double score ) {
        this.player = player;
        this.timestamp = timestamp;
        this.score = score;
    }

    /** Renders the record as "&lt;player&gt; has &lt;score&gt; points at &lt;timestamp&gt;". */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append( player ).append( " has " ).append( score );
        text.append( " points at " ).append( timestamp );
        return text.toString();
    }

    /** Combines the player's hash (0 for a null player) with the timestamp. */
    @Override
    public int hashCode() {
        int playerHash = Objects.hashCode( player );
        return 31 * playerHash + timestamp;
    }

    /** Two records are equal when both player and timestamp match. */
    @Override
    public boolean equals( Object obj ) {
        if ( !( obj instanceof Record ) ) {
            return false;
        }
        Record other = ( Record )obj;
        if ( timestamp != other.timestamp ) {
            return false;
        }
        return Objects.equals( player, other.player );
    }
}
/**
 * Turns a stream of items into a stream of growing snapshots: for every upstream item it emits a
 * fresh list holding all items seen so far, in arrival order (the n-th emission has n elements).
 *
 * The transformation is stateless and cold: nothing is subscribed until the returned observable
 * is subscribed, each subscriber accumulates its own list, and upstream errors, completion and
 * unsubscription all flow through the returned observable.
 */
private static class ToListOnNextTransformer implements Observable.Transformer< Observable< Record >, List< Observable< Record > > > {
    /**
     * @param eachPlayerRecords upstream of per-player record streams
     * @return an observable emitting, per upstream item, a new list of everything received so far
     */
    @Override
    public Observable< List< Observable< Record > > > call( Observable< Observable< Record > > eachPlayerRecords ) {
        // The seed is never mutated (every step copies), so sharing it between subscribers is safe.
        List< Observable< Record > > none = new ArrayList<>();
        return eachPlayerRecords
            .scan( none, ( seen, playerRecords ) -> {
                List< Observable< Record > > grown = new ArrayList<>( seen );
                grown.add( playerRecords );
                return grown;
            } )
            // scan() emits its seed first; drop that empty snapshot so only real ones get through.
            .skip( 1 );
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment