Skip to content

Instantly share code, notes, and snippets.

@ahmed3elshaer
Last active December 14, 2018 15:25
Show Gist options
  • Save ahmed3elshaer/50bbec10cd18f1222fd5bea9f13af864 to your computer and use it in GitHub Desktop.
Save ahmed3elshaer/50bbec10cd18f1222fd5bea9f13af864 to your computer and use it in GitHub Desktop.
// Race two equivalent customer sources with amb(): the stream mirrors whichever
// source reacts first, and only the first delivered customer is shown.
Observable.amb(getCustomerFromServerOne(), getCustomerFromServerTwo())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // Disable the view while the request is in flight.
        .doOnSubscribe(new Action0() {
            @Override
            public void call() {
                v.setEnabled(false);
            }
        })
        .subscribe(new Subscriber<Customer>() {
            @Override
            public void onCompleted() {
                // onNext unsubscribes after the first item, so this is normally reached
                // only when the stream finishes without emitting. Re-enable the view so
                // it is not left disabled in that case.
                v.setEnabled(true);
            }

            @Override
            public void onError(Throwable e) {
                // Failures are logged at error level and must also re-enable the view;
                // otherwise a failed request leaves it permanently disabled.
                Log.e("AMB", e.getMessage(), e);
                v.setEnabled(true);
            }

            @Override
            public void onNext(Customer customer) {
                Log.d("AMB", customer.getName());
                results.setText(results.getText() + "\n" + customer.getName());
                // Only the first result matters; stop listening once it has arrived.
                unsubscribe();
                v.setEnabled(true);
            }
        });
// fromCallable wraps the expensive Database.readValue() call in an Observable, so the
// call runs when the Observable is subscribed to and its return value is emitted.
Observable.fromCallable(() -> Database.readValue())
        // run the read on the IO scheduler
        .subscribeOn(Schedulers.io())
        // deliver the value on the Android main thread
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(value -> {
            resultTextView.setText(value);
            hideProgress();
        });
// With Strings: each emitted string is appended to the on-screen text and logged.
val disposable1 = Observable
    .just("One Fish", "Two Fish", "Red Fish", "Blue Fish")
    .subscribe { fish ->
        // onNext
        content.text = "${content.text}\n$fish"
        Timber.d("Item: $fish, Time: ${System.currentTimeMillis()}")
    }
// Track the subscription in the shared composite so it can be disposed with the others.
compositeDisposable.add(disposable1)
// With integers: each emitted number is appended to the on-screen text and logged.
val disposable2 = Observable
    .just(1, 2, 3, 4)
    .subscribe { number ->
        // onNext
        content.text = "${content.text}\n$number"
        Timber.d("Item: $number, Time: ${System.currentTimeMillis()}")
    }
// Track the subscription in the shared composite so it can be disposed with the others.
compositeDisposable.add(disposable2)
// With a complex object: four FavoriteShape values are emitted one after another.
val fooSquare = FavoriteShape("Foo", FunShape("Red", "Square", 4))
val barCircle = FavoriteShape("Bar", FunShape("Orange", "Circle", 0))
val fizRectangle = FavoriteShape("Fiz", FunShape("Purple", "Rectangle", 4))
val binTriangle = FavoriteShape("Bin", FunShape("Blue", "Triangle", 3))
val disposable3 = Observable
    .just(fooSquare, barCircle, fizRectangle, binTriangle)
    .subscribe(
        { shape ->
            // onNext: the string template calls toString() on the shape, which
            // produces the text appended to the screen.
            content.text = "${content.text}\n$shape"
            Timber.d("Item: $shape, Time: ${System.currentTimeMillis()}")
        },
        { error ->
            // onError
            Timber.e(error, error.message)
        },
        {
            // onComplete
            Timber.i("onComplete")
        }
    )
compositeDisposable.add(disposable3)
// .just with Lists: each list is emitted as a single item, so onNext runs once per list.
// listOf returns a read-only List<T>.
val listOne = listOf(2, 4, 8, 16)
val listTwo = listOf(3, 6, 12, 24)
val disposable4 = Observable
    .just(listOne, listTwo)
    .subscribe { numbers ->
        // onNext
        content.text = "${content.text}\n$numbers"
        Timber.d("Item: $numbers, Time: ${System.currentTimeMillis()}")
    }
compositeDisposable.add(disposable4)
// Produce a string on one scheduler, persist on another, then display it on the main thread.
Observable
        // Wrap the work in a Callable; it runs once the Observable is subscribed to.
        .fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                // Simulate a slow task. Thread.sleep is used instead of wait(5000):
                // Object.wait requires the caller to own the object's monitor and
                // throws IllegalMonitorStateException otherwise, so the original call
                // failed immediately instead of pausing.
                Thread.sleep(5000);
                return "Hello from a callable";
            }
        })
        // the scheduler the Callable above runs on
        .subscribeOn(Schedulers.computation())
        // switch to the IO scheduler for the database write
        .observeOn(Schedulers.io())
        // doOnNext placed after observeOn(io) so saveToDB() runs on the IO scheduler
        .doOnNext(new Action1<String>() {
            @Override
            public void call(String s) {
                saveToDB();
            }
        })
        // switch to the Android main thread to show the string
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Toast.makeText(getActivity(), s, Toast.LENGTH_LONG).show();
            }
        });
/**
 * Returns an Observable that emits the result of {@link #getGist()}.
 *
 * defer() is used so that getGist() is only invoked when a subscriber subscribes,
 * not when this method is called. An IOException from getGist() is turned into
 * an error notification.
 */
public Observable<Gist> getGistObservable() {
    final Func0<Observable<Gist>> gistFactory = new Func0<Observable<Gist>>() {
        @Override
        public Observable<Gist> call() {
            Observable<Gist> outcome;
            try {
                // executed once per subscription
                outcome = Observable.just(getGist());
            } catch (IOException e) {
                outcome = Observable.error(e);
            }
            return outcome;
        }
    };
    return Observable.defer(gistFactory);
}
/**
 * Executes the HTTP call synchronously and parses the response body into a Gist.
 *
 * @return the parsed Gist, or null when the response is not successful
 * @throws IOException if executing the request or reading/closing the body fails
 */
@Nullable
private Gist getGist() throws IOException {
    OkHttpClient client = new OkHttpClient();
    // Go get this Gist: https://gist.github.com/donnfelker/db72a05cc03ef523ee74
    // via the GitHub API
    Request request = new Request.Builder()
            .url("https://api.github.com/gists/db72a05cc03ef523ee74")
            .build();
    Response response = client.newCall(request).execute();
    try {
        if (!response.isSuccessful()) {
            return null;
        }
        return new Gson().fromJson(response.body().charStream(), Gist.class);
    } finally {
        // The body must be closed on both the success and the failure path;
        // otherwise the underlying connection is leaked.
        response.body().close();
    }
}
// Subscribe to the gist observable and render the file list.
subscription = getGistObservable() // getting the observable object
        .subscribeOn(Schedulers.io()) // run the request on the IO background scheduler
        .observeOn(AndroidSchedulers.mainThread()) // receive the callbacks on the Android main thread
        .subscribe(new Subscriber<Gist>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
                // the task failed
                Log.e(TAG, e.getMessage(), e);
            }

            @Override
            public void onNext(Gist gist) {
                // getGist() is @Nullable and its result is emitted as-is, so guard
                // against a null item before dereferencing it.
                if (gist == null) {
                    Log.e(TAG, "Gist request did not return a result");
                    return;
                }
                // Build one line per file: "<name> - Length of file <n>"
                StringBuilder sb = new StringBuilder();
                for (Map.Entry<String, GistFile> entry : gist.files.entrySet()) {
                    sb.append(entry.getKey());
                    sb.append(" - ");
                    sb.append("Length of file ");
                    sb.append(entry.getValue().content.length());
                    sb.append("\n");
                }
                TextView text = (TextView) getView().findViewById(R.id.main_message);
                text.setText(sb.toString());
            }
        });
// Unsubscribe when the component is destroyed so the subscriber is not kept alive
// (avoids a memory leak).
@Override
public void onDestroy() {
    super.onDestroy();
    // Nothing to release if no subscription was made or it has already ended.
    if (subscription == null || subscription.isUnsubscribed()) {
        return;
    }
    subscription.unsubscribe();
}
// defer postpones the expensive Database.readValue() call until the Observable is
// subscribed to, instead of running it while the chain is being built.
Observable.defer(() -> {
            return Observable.just(Database.readValue());
        })
        // run the read on the IO scheduler
        .subscribeOn(Schedulers.io())
        // deliver the value on the Android main thread
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(value -> {
            hideProgress();
            resultTextView.setText(value);
        });
// subscribeWith returns the observer it was given, so the DisposableObserver itself
// is kept in disposableObserver.
disposableObserver = Observable
    .just(5, 6, 7, 8)
    .subscribeWith(object : DisposableObserver<Int>() {
        override fun onNext(item: Int) {
            // Append each number to the on-screen text.
            content.text = "${content.text}\n$item"
            Timber.d("onNext: $item ")
        }

        override fun onError(e: Throwable) {
            Timber.e(e, e.message)
        }

        override fun onComplete() {
            Timber.i("onComplete disposable observer")
        }
    })
// Combine one customer and one order into a CustomerOrderInfo with zip().
Observable.zip(getCustomer(), getOrder(), new Func2<Customer, Order, CustomerOrderInfo>() {
            @Override
            public CustomerOrderInfo call(Customer customer, Order order) {
                // Pair the two results by their ids.
                return new CustomerOrderInfo(customer.getId(), order.getId());
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<CustomerOrderInfo>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
                // Log the failure instead of silently swallowing it.
                Log.e("RX", "Failed to combine customer and order", e);
            }

            @Override
            public void onNext(CustomerOrderInfo customerOrderInfo) {
                Log.d("RX", "Got the customer info");
            }
        });
/**
 * Returns an Observable that emits a single Customer named "Foo Bar" with a random UUID.
 * defer() means a new Customer is created for every subscription.
 */
public Observable<Customer> getCustomer() {
    final Func0<Observable<Customer>> customerFactory = new Func0<Observable<Customer>>() {
        @Override
        public Observable<Customer> call() {
            final String randomId = UUID.randomUUID().toString();
            final Customer customer = new Customer("Foo Bar", randomId);
            return Observable.just(customer);
        }
    };
    return Observable.defer(customerFactory);
}
/**
 * Returns an Observable that emits a single Order built from a random UUID and the value 10000.
 * defer() means a new Order is created for every subscription.
 */
public Observable<Order> getOrder() {
    final Func0<Observable<Order>> orderFactory = new Func0<Observable<Order>>() {
        @Override
        public Observable<Order> call() {
            final String randomId = UUID.randomUUID().toString();
            final Order order = new Order(randomId, 10000);
            return Observable.just(order);
        }
    };
    return Observable.defer(orderFactory);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment