Created
September 22, 2015 13:00
-
-
Save Qualtagh/4643ddcd5fc97bb8b0ea to your computer and use it in GitHub Desktop.
RxJavaTest
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.Objects; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
import junit.framework.Assert; | |
import org.junit.Test; | |
import rx.Observable; | |
import rx.schedulers.TestScheduler; | |
import rx.subjects.PublishSubject; | |
public class RxJavaTest { | |
@Test | |
public void sumLatestScore() throws Exception { | |
CountDownLatch latch = new CountDownLatch( 1 ); | |
TestScheduler scheduler = new TestScheduler(); | |
double totalValue[] = new double[ 1 ]; | |
double expectedTotals[] = new double[]{ | |
5, 4, 4, 4, 4, | |
6, 5, 8, 13, 13, | |
13, 13, 18, 18, 20, | |
19, 27, 31, 31, 31 | |
}; | |
Observable< Record > input = Observable.from( new Record[] { | |
new Record( "Alex", 0, 5 ), | |
new Record( "Alex", 1, 4 ), | |
new Record( "Ben", 5, 2 ), | |
new Record( "Alex", 6, 3 ), | |
new Record( "Alex", 7, 4 ), | |
new Record( "Ben", 7, 4 ), | |
new Record( "Alex", 8, 9 ), | |
new Record( "Katie", 12, 5 ), | |
new Record( "Ben", 14, 6 ), | |
new Record( "Alex", 15, 8 ), | |
new Record( "Katie", 16, 9 ), | |
new Record( "Ben", 16, 10 ), | |
new Record( "Alex", 17, 12 ) | |
} ) | |
.delay( e -> Observable.interval( e.timestamp, TimeUnit.SECONDS, scheduler ) ) | |
.doOnCompleted( latch::countDown ) | |
.share(); | |
Observable< Observable< Record > > output = input | |
.groupBy( e -> e.player ) | |
.map( g -> g.cache() ) | |
.compose( new ToListOnNextTransformer() ) | |
.map( eachPlayerRecordsList -> Observable.combineLatest( eachPlayerRecordsList, ( Object... eachPlayerRecordsArray ) -> { | |
double total = Arrays | |
.stream( eachPlayerRecordsArray ) | |
.mapToDouble( e -> ( ( Record )e ).score ) | |
.sum(); | |
int timestamp = Arrays | |
.stream( eachPlayerRecordsArray ) | |
.mapToInt( e -> ( ( Record )e ).timestamp ) | |
.max() | |
.getAsInt(); | |
return new Record( "Total", timestamp, total ); | |
} ) ); | |
Observable.switchOnNext( output ) | |
.subscribe( g -> totalValue[ 0 ] = g.score, e -> { e.printStackTrace(); } ); | |
for ( int timestamp = 0; timestamp < expectedTotals.length; timestamp++ ) { | |
scheduler.advanceTimeTo( timestamp, TimeUnit.SECONDS ); | |
Assert.assertEquals( "At " + timestamp, expectedTotals[ timestamp ], totalValue[ 0 ], 0.0 ); | |
} | |
latch.await(); | |
} | |
private static class Record { | |
private final String player; | |
private final int timestamp; | |
private final double score; | |
public Record( String player, int timestamp, double score ) { | |
this.player = player; | |
this.timestamp = timestamp; | |
this.score = score; | |
} | |
@Override | |
public String toString() { | |
return player + " has " + score + " points at " + timestamp; | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hashCode( player ) * 31 + timestamp; | |
} | |
@Override | |
public boolean equals( Object obj ) { | |
return obj instanceof Record && timestamp == ( ( Record )obj ).timestamp && Objects.equals( player, ( ( Record )obj ).player ); | |
} | |
} | |
private static class ToListOnNextTransformer implements Observable.Transformer< Observable< Record >, List< Observable< Record > > > { | |
private final List< Observable< Record > > list = new ArrayList<>(); | |
private final PublishSubject< List< Observable< Record > > > ret = PublishSubject.create(); | |
@Override | |
public Observable< List< Observable< Record > > > call( Observable< Observable< Record > > eachPlayerRecords ) { | |
eachPlayerRecords.subscribe( playerRecords -> { | |
list.add( playerRecords ); | |
ret.onNext( new ArrayList<>( list ) ); | |
} ); | |
return ret; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment