Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Samples from live-coding session during Mix IT conference
package com.nurkiewicz.mixit;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import java.io.FileInputStream;
import java.io.InputStream;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
 * Samples from live-coding session during Mix IT conference.
 * Not real test cases, just a bunch of examples: most methods only build or
 * subscribe to an {@link Observable} and log what happens, without asserting anything.
 */
@Slf4j
public class MixIT {

    /**
     * Subscribes twice to the same {@code Observable.create()} source.
     * The emitted string embeds {@code System.nanoTime()}, which makes it visible
     * whether the emitting lambda is re-run for each subscription.
     */
    @Test
    public void mixit_6() throws Exception {
        // Speaker's note "Backpressure!": presumably a reminder that a raw create()
        // lambda pushes items without honouring downstream requests - TODO confirm.
        Observable<String> obs = Observable
                .create(subscriber -> {
                    subscriber.onNext("Mix IT " + System.nanoTime());
                    subscriber.onCompleted();
                });
        obs.subscribe(System.out::println);
        obs.subscribe(System.out::println);
    }

    /**
     * Emits one price followed by an error; the subscriber prints the value and
     * logs the exception through its {@code onError} callback.
     */
    @Test
    public void mixit_30() throws Exception {
        Observable<BigDecimal> price = Observable.create(subscriber -> {
            subscriber.onNext(BigDecimal.TEN);
            subscriber.onError(new RuntimeException("abc"));
        });
        price.subscribe(
                System.out::println,
                ex -> log.error("Opps", ex)
        );
    }

    /**
     * Subscribes to a single-item source and then logs "Here".
     * No scheduler is applied, so the relative order of the two log lines shows
     * on which thread and when the subscription ran.
     */
    @Test
    public void mixit_47() throws Exception {
        Observable<String> obs = Observable.create(subscriber -> {
            subscriber.onNext("Mix IT");
            subscriber.onCompleted();
        });
        obs.subscribe(log::info);
        log.info("Here");
    }

    /**
     * Wraps opening a file in {@code fromCallable}. Nothing subscribes here, so
     * the file is never actually opened by this example.
     */
    @Test
    public void mixit_58() throws Exception {
        // NOTE(review): nothing in this pipeline closes the stream; a subscriber
        // would own the FileInputStream and be responsible for closing it.
        Observable<InputStream> file = Observable.fromCallable(() ->
                new FileInputStream("abc.txt"));
    }

    /** Turns an in-memory list into an {@code Observable}; never subscribed. */
    @Test
    public void mixit_66() throws Exception {
        List<String> strings = Arrays.asList("a@gmail.com", "b@gmail.com", "c@gmail.com");
        Observable<String> from = Observable.from(strings);
    }

    /**
     * Eager variant: {@link #rxLoadPeople()} calls {@link #loadPeople()} before
     * the observable even exists, i.e. before "After calling" is logged.
     */
    @Test
    public void mixit_75() throws Exception {
        Observable<Person> people = rxLoadPeople();
        log.info("After calling");
        people.subscribe();
    }

    /**
     * Lazy variant: {@link #lazyLoadPeople()} wraps the load in {@code defer},
     * so {@link #loadPeople()} is only invoked once {@code subscribe} is called.
     */
    @Test
    public void mixit_84() throws Exception {
        Observable<Person> people = lazyLoadPeople();
        log.info("After calling");
        people.subscribe(this::log);
    }

    /** Collects the whole stream into a single list and blocks for it. */
    @Test
    public void mixit_91() throws Exception {
        Observable<Person> people = lazyLoadPeople();
        //.stream().map(Person::getId).collect(toList())
        Observable<List<Person>> peopleList = people.toList();
        List<Person> list = peopleList
                .toBlocking()
                .first();
    }

    /** Maps people to their ids; intentionally never subscribed. */
    @Test
    public void mixit_104() throws Exception {
        Observable<Person> people = lazyLoadPeople();
        Observable<Integer> ids = people
                .map(Person::getId);
        //nothing happens without subscribe
    }

    /**
     * Reactive counterpart of try-catch: a failure of {@link #unsafe()} is
     * replaced with a fallback {@code Person(-1)}.
     */
    @Test
    public void mixit_113() throws Exception {
        //try-catch
        unsafe()
                .onErrorReturn(th -> new Person(-1))
                .subscribe(
                        this::log,
                        th -> log.error("", th)
                );
    }

    /** On failure of {@link #unsafe()} switches to {@link #cachedPeople()}. */
    @Test
    public void mixit_124() throws Exception {
        unsafe()
                .onErrorResumeNext(cachedPeople())
                .subscribe(
                        this::log,
                        th -> log.error("", th)
                );
    }

    /** Blocks the calling thread until the delayed stream has been fully logged. */
    @Test
    public void mixit_134() throws Exception {
        slow(2000)
                .toBlocking()
                .subscribe(this::log);
    }

    /**
     * Applies a 500 ms timeout to a source delayed by 600 ms, so a timeout error
     * is the expected outcome.
     */
    @Test
    public void mixit_143() throws Exception {
        slow(600)
                .timeout(500, MILLISECONDS)
                // An error callback is required: subscribing without one turns the
                // expected timeout into an OnErrorNotImplementedException.
                .subscribe(
                        this::log,
                        th -> log.error("", th)
                );
    }

    /**
     * Races two ticket sources and logs whichever answers first.
     * {@code a.mergeWith(b)} is the instance form of {@code Observable.merge(a, b)}.
     */
    @Test
    public void mixit_151() throws Exception {
        Observable<Ticket> ticket1 = loadTicketPrimary();
        Observable<Ticket> ticket2 = loadTicketSecondary();
        ticket1
                .mergeWith(ticket2)
                .first()
                // Both sources are delayed, so block until the winner arrives;
                // otherwise the method would return before anything is logged.
                .toBlocking()
                .subscribe(this::log);
    }

    /** Creates two slow ticket lookups without subscribing to either of them. */
    @Test
    public void mixit_163() throws Exception {
        Person p1 = new Person(1);
        Person p2 = new Person(2);
        slowLoadTicket(p1);
        slowLoadTicket(p2);
    }

    /** Maps every person to a ticket lookup via {@code flatMap}; never subscribed. */
    @Test
    public void mixit_172() throws Exception {
        Observable<Person> people = lazyLoadPeople();
        Observable<Ticket> tickets = people
                .flatMap(this::slowLoadTicket);
    }

    /** Declares that subscription should happen on the IO scheduler; never subscribed. */
    @Test
    public void mixit_180() throws Exception {
        lazyLoadPeople()
                .subscribeOn(Schedulers.io());
    }

    /** Builds a 100 ms interval source; never subscribed. */
    @Test
    public void mixit_187() throws Exception {
        Observable
                .interval(100, MILLISECONDS);
    }

    /**
     * Verifies a 100 ms timer using a {@link TestScheduler}: time only moves when
     * the test advances it, so no real waiting is involved.
     */
    @Test
    public void mixit_193() throws Exception {
        TestScheduler scheduler = new TestScheduler();
        Observable<Long> timer = Observable
                .timer(100, MILLISECONDS, scheduler);
        TestSubscriber<Long> ts = new TestSubscriber<>();
        timer.subscribe(ts);
        // Advance virtual time to just before the deadline: nothing emitted yet.
        scheduler.advanceTimeBy(99, MILLISECONDS);
        ts.assertNoValues();
        // One more virtual millisecond reaches 100 ms and the timer fires.
        scheduler.advanceTimeBy(1, MILLISECONDS);
        ts.assertValue(0L);
    }

    /**
     * Simulated slow ticket lookup.
     *
     * @param p person to look up; currently ignored, the result is always {@code Ticket(1)}
     * @return a single ticket emitted after a 1000 ms delay
     */
    Observable<Ticket> slowLoadTicket(Person p) {
        return Observable.just(new Ticket(1)).delay(1000, MILLISECONDS);
    }

    /** @return a single {@code Ticket(1)} emitted after a 100 ms delay */
    Observable<Ticket> loadTicketPrimary() {
        return Observable.just(new Ticket(1)).delay(100, MILLISECONDS);
    }

    /** @return a single {@code Ticket(1)} emitted after a 100 ms delay */
    Observable<Ticket> loadTicketSecondary() {
        return Observable.just(new Ticket(1)).delay(100, MILLISECONDS);
    }

    /**
     * Lazily loaded people whose events are additionally delayed.
     *
     * @param delay extra delay in milliseconds applied on top of the load itself
     * @return the delayed stream of people
     */
    Observable<Person> slow(int delay) {
        return lazyLoadPeople()
                .delay(delay, MILLISECONDS);
    }

    /** @return fallback people; in this demo simply {@link #lazyLoadPeople()} */
    Observable<Person> cachedPeople() {
        return lazyLoadPeople();
    }

    /**
     * Source that fails at random: when {@code Math.random()} exceeds 0.7 an error
     * observable carrying a {@code RuntimeException("Simulated")} is returned,
     * otherwise {@link #lazyLoadPeople()}.
     */
    Observable<Person> unsafe() {
        if (Math.random() > 0.7) {
            return Observable.error(new RuntimeException("Simulated"));
        }
        return lazyLoadPeople();
    }

    /** Logs a single event at INFO level. */
    void log(Object obj) {
        log.info("Event: {}", obj);
    }

    /**
     * Eager load: {@link #loadPeople()} runs while this method is being called,
     * and the finished list is then wrapped in an observable.
     */
    Observable<Person> rxLoadPeople() {
        return Observable.from(loadPeople());
    }

    /**
     * Lazy load: {@link #loadPeople()} is wrapped in {@code defer}, so it runs
     * for each subscription instead of at call time.
     */
    Observable<Person> lazyLoadPeople() {
        return Observable.defer(() ->
                Observable.from(loadPeople()));
    }

    /**
     * Simulates a slow database query: sleeps for one second, logs "Calls a DB"
     * and returns three people with ids 1, 2 and 3.
     * If the sleep is interrupted the people are still returned, but the thread's
     * interrupt flag is restored so callers can observe the interruption.
     */
    List<Person> loadPeople() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again rather
            // than silently swallowing the interruption.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while simulating DB latency", e);
        }
        log.info("Calls a DB");
        return Arrays.asList(
                new Person(1),
                new Person(2),
                new Person(3)
        );
    }
}
/**
 * Immutable person identified by a numeric id.
 * Lombok's {@code @Value} already makes the field private and final and
 * generates the constructor and getter used elsewhere in this file
 * ({@code new Person(1)}, {@code Person::getId}).
 */
@Value
class Person {
    int id;
}
/**
 * Immutable ticket identified by a numeric id.
 * Lombok's {@code @Value} already makes the field private and final and
 * generates the constructor used elsewhere in this file ({@code new Ticket(1)}).
 */
@Value
class Ticket {
    int id;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment