Skip to content

Instantly share code, notes, and snippets.

@ValeriusGC
Created March 16, 2018 07:33
Show Gist options
  • Save ValeriusGC/0c611cdba2a5f6b019893cf8fc44d567 to your computer and use it in GitHub Desktop.
Save ValeriusGC/0c611cdba2a5f6b019893cf8fc44d567 to your computer and use it in GitHub Desktop.
Sample on RxJava2. Illustrates how to connect complex processes that call each other and to traverse event messages through.
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;
/**
* Замена вызовов лапши из колбеков на RxJava.
* <p>Задача - последовательное выполнение процессов.
* <p>{@link B} подписан на {@link A}, <b>main</b> подписан на {@link C}. Все выполняется в фоне в main.
* <ul>
* <li>Выполняется {@link A}
* <li>{@link B} ожидает завершения {@link A} и выполняется сам
* <li>После завершения {@link B} выполняется {@link C}
* <li>Результат выводится на консоль
* </ul>
* Фишка:
* <ul>
* <li>Все выполнение в фоне
* <li>Результат в главном потоке
* </ul>
*/
public class WithRx2 {
static class A {
Observable<String> process(final String s) {
// Иллюстрация выполнения в фоновом потоке
// System.out.println("thread: " + Thread.currentThread().getName());
return Observable.create(e -> {
// System.out.println("thread: " + Thread.currentThread().getName());
if (s == null) {
e.onError(new Exception());
}
// Основной цикл
String _s = s;
for (int i = 0; i < 3; ++i) {
Thread.sleep(100);
_s += "A";
e.onNext(_s);
}
// И завершение
e.onComplete();
});
}
}
static class B {
private A a = new A();
Observable<String> process(final String s) {
final String[] _s = {s};
// Задача - последовательное выполнение процессов. Сначала A.process(), потом B.process()
Observable<String> o = Observable.create(e -> {
// Иллюстрация выполнения в фоновом потоке
// System.out.println("thread: " + Thread.currentThread().getName());
if (_s[0] == null) {
e.onError(new Exception("String is null"));
}
// В первую очередь выполнится a.process().
Disposable d = a.process(s)
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(String _s1) {
// При помощи .onNext каждый некст переправится к эмиттеру.
e.onNext(_s1);
_s[0] = _s1;
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
d.dispose();
// Затем основной цикл
for (int i = 0; i < 3; ++i) {
Thread.sleep(100);
_s[0] += "B";
e.onNext(_s[0]);
}
// И завершение
e.onComplete();
});
return o;
}
}
static class C {
Observable<String> process(final String s) {
return Observable.create(e -> {
// Иллюстрация выполнения в фоновом потоке
// System.out.println("thread: " + Thread.currentThread().getName());
if (s == null) {
e.onError(new Exception("String is null"));
}
String _s = s;
for (int i = 0; i < 3; ++i) {
Thread.sleep(100);
_s += "C";
e.onNext(_s);
}
e.onComplete();
});
}
}
// ~Мир Rx
//------------------------------------------------------------------------------------------------------------------
public static void main(String[] args) {
final String[] s = {"string_"};
System.out.println("started with: " + s[0]);
// Задача - последовательное выполнение процессов.
Observable<String> observable = Observable.create(e -> {
// Метод выполняется первым и перебрасывает сообщения эмиттеру
new B()
.process(s[0])
.subscribe(e::onNext, e::onError);
// Метод выполняется вторым и перебрасывает сообщения эмиттеру.
// Завершение метода завершает все.
new C()
.process(s[0])
.subscribe(e::onNext, e::onError, e::onComplete);
});
observable
// Выполняем в фоне
.subscribeOn(Schedulers.io())
// Подписываемся в блокирующем (основном) потоке
.blockingSubscribe(new DisposableObserver<String>() {
@Override
public void onNext(String _s) {
// Иллюстрация выполнения в основном потоке
// System.out.println("thread: " + Thread.currentThread().getName());
System.out.println("onNext: " + _s);
s[0] = _s;
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError: " + throwable.getLocalizedMessage());
}
@Override
public void onComplete() {
// Иллюстрация выполнения в основном потоке
// System.out.println("thread: " + Thread.currentThread().getName());
System.out.println("onComplete: " + s[0]);
}
});
System.out.println("finished with: " + s[0]);
// На выходе - ожидаемые сообщения
// started with: string_
// onNext: string_A
// onNext: string_AA
// onNext: string_AAA
// onNext: string_AAAB
// onNext: string_AAABB
// onNext: string_AAABBB
// onNext: string_AAABBBC
// onNext: string_AAABBBCC
// onNext: string_AAABBBCCC
// onComplete: string_AAABBBCCC
// finished with: string_AAABBBCCC
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment