Created
April 21, 2016 19:34
-
-
Save nurkiewicz/8c3d867519b0a73fec21a3cf1168e329 to your computer and use it in GitHub Desktop.
Samples from a live-coding session during the Mix IT conference
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.nurkiewicz.mixit; | |
import lombok.Value; | |
import lombok.extern.slf4j.Slf4j; | |
import org.junit.Test; | |
import rx.Observable; | |
import rx.observers.TestSubscriber; | |
import rx.schedulers.Schedulers; | |
import rx.schedulers.TestScheduler; | |
import java.io.FileInputStream; | |
import java.io.InputStream; | |
import java.math.BigDecimal; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import static java.util.concurrent.TimeUnit.MILLISECONDS; | |
/** | |
* Samples from live-coding session during Mix IT conference. | |
* Not real test cases, just a bunch of examples. | |
*/ | |
@Slf4j | |
public class MixIT { | |
@Test | |
public void mixit_6() throws Exception { | |
//Backpressure! | |
Observable<String> obs = Observable | |
.create(subscriber -> { | |
subscriber.onNext("Mix IT " + System.nanoTime()); | |
subscriber.onCompleted(); | |
} | |
); | |
obs.subscribe( | |
string -> System.out.println(string) | |
); | |
obs.subscribe( | |
string -> System.out.println(string) | |
); | |
} | |
@Test | |
public void mixit_30() throws Exception { | |
Observable<BigDecimal> price = Observable.create(subscriber -> { | |
subscriber.onNext(BigDecimal.TEN); | |
subscriber.onError(new RuntimeException("abc")); | |
}); | |
price.subscribe( | |
bd -> System.out.println(bd), | |
ex -> log.error("Opps", ex) | |
); | |
} | |
@Test | |
public void mixit_47() throws Exception { | |
Observable<String> obs = Observable.create(subscriber -> { | |
subscriber.onNext("Mix IT"); | |
subscriber.onCompleted(); | |
}); | |
obs.subscribe(log::info); | |
log.info("Here"); | |
} | |
@Test | |
public void mixit_58() throws Exception { | |
Observable<InputStream> file = Observable.fromCallable(() -> | |
new FileInputStream("abc.txt")); | |
} | |
@Test | |
public void mixit_66() throws Exception { | |
List<String> strings = Arrays.asList("a@gmail.com", "b@gmail.com", "c@gmail.com"); | |
Observable<String> from = Observable.from(strings); | |
} | |
@Test | |
public void mixit_75() throws Exception { | |
Observable<Person> people = rxLoadPeople(); | |
log.info("After calling"); | |
people.subscribe(); | |
} | |
@Test | |
public void mixit_84() throws Exception { | |
Observable<Person> people = lazyLoadPeople(); | |
log.info("After calling"); | |
people.subscribe(this::log); | |
} | |
@Test | |
public void mixit_91() throws Exception { | |
Observable<Person> people = lazyLoadPeople(); | |
//.stream().map(Person::getId).collect(toList()) | |
Observable<List<Person>> peopleList = | |
people.toList(); | |
List<Person> list = peopleList | |
.toBlocking() | |
.first(); | |
} | |
@Test | |
public void mixit_104() throws Exception { | |
Observable<Person> people = lazyLoadPeople(); | |
Observable<Integer> ids = people | |
.map(Person::getId); | |
//nothing happens without subscribe | |
} | |
@Test | |
public void mixit_113() throws Exception { | |
//try-catch | |
unsafe() | |
.onErrorReturn(th -> new Person(-1)) | |
.subscribe( | |
person -> log(person), | |
th -> log.error("", th) | |
); | |
} | |
@Test | |
public void mixit_124() throws Exception { | |
unsafe() | |
.onErrorResumeNext(cachedPeople()) | |
.subscribe( | |
this::log, | |
th -> log.error("", th) | |
); | |
} | |
@Test | |
public void mixit_134() throws Exception { | |
slow(2000) | |
.toBlocking() | |
.subscribe(this::log); | |
} | |
@Test | |
public void mixit_143() throws Exception { | |
slow(600) | |
.timeout(500, MILLISECONDS) | |
.subscribe( | |
); | |
} | |
@Test | |
public void mixit_151() throws Exception { | |
Observable<Ticket> ticket1 = loadTicketPrimary(); | |
Observable<Ticket> ticket2 = loadTicketSecondary(); | |
Observable.merge(ticket1, ticket2); | |
ticket1 | |
.mergeWith(ticket2) | |
.first() | |
.subscribe(this::log); | |
} | |
@Test | |
public void mixit_163() throws Exception { | |
Person p1 = new Person(1); | |
Person p2 = new Person(2); | |
slowLoadTicket(p1); | |
slowLoadTicket(p2); | |
} | |
@Test | |
public void mixit_172() throws Exception { | |
Observable<Person> people = lazyLoadPeople(); | |
Observable<Ticket> tickets = people | |
.flatMap(this::slowLoadTicket); | |
} | |
@Test | |
public void mixit_180() throws Exception { | |
lazyLoadPeople() | |
.subscribeOn(Schedulers.io()); | |
} | |
@Test | |
public void mixit_187() throws Exception { | |
Observable | |
.interval(100, MILLISECONDS); | |
} | |
@Test | |
public void mixit_193() throws Exception { | |
TestScheduler scheduler = new TestScheduler(); | |
Observable<Long> timer = Observable | |
.timer(100, MILLISECONDS, scheduler); | |
TestSubscriber<Long> ts = new TestSubscriber<>(); | |
timer.subscribe(ts); | |
//wait an hour | |
scheduler.advanceTimeBy(99, MILLISECONDS); | |
ts.assertNoValues(); | |
scheduler.advanceTimeBy(1, MILLISECONDS); | |
ts.assertValue(0L); | |
} | |
Observable<Ticket> slowLoadTicket(Person p) { | |
return Observable.just(new Ticket(1)).delay(1000, MILLISECONDS); | |
} | |
Observable<Ticket> loadTicketPrimary() { | |
return Observable.just(new Ticket(1)).delay(100, MILLISECONDS); | |
} | |
Observable<Ticket> loadTicketSecondary() { | |
return Observable.just(new Ticket(1)).delay(100, MILLISECONDS); | |
} | |
Observable<Person> slow(int delay) { | |
return lazyLoadPeople() | |
.delay(delay, MILLISECONDS); | |
} | |
Observable<Person> cachedPeople() { | |
return lazyLoadPeople(); | |
} | |
Observable<Person> unsafe() { | |
if (Math.random() > 0.7) { | |
return Observable.error(new RuntimeException("Simulated")); | |
} | |
return lazyLoadPeople(); | |
} | |
void log(Object obj) { | |
log.info("Event: {}", obj); | |
} | |
Observable<Person> rxLoadPeople() { | |
return Observable.from(loadPeople()); | |
} | |
Observable<Person> lazyLoadPeople() { | |
return Observable.defer(() -> | |
Observable.from(loadPeople())); | |
} | |
List<Person> loadPeople() { | |
try { | |
TimeUnit.SECONDS.sleep(1); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
log.info("Calls a DB"); | |
return Arrays.asList( | |
new Person(1), | |
new Person(2), | |
new Person(3) | |
); | |
} | |
} | |
@Value | |
class Person { | |
private final int id; | |
} | |
@Value | |
class Ticket { | |
private final int id; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment