Skip to content

Instantly share code, notes, and snippets.

@chbatey
Last active February 16, 2016 09:56
Show Gist options
  • Save chbatey/0a0d6f5ad34a548abb04 to your computer and use it in GitHub Desktop.
Save chbatey/0a0d6f5ad34a548abb04 to your computer and use it in GitHub Desktop.
package info.batey.examples.observables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import info.batey.examples.cf.CompletableFutureTest;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.observable.ListenableFutureObservable;
import rx.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Exploratory tests showing different ways of turning a {@link Future} into an
 * RxJava {@link Observable} and what each approach logs around subscription.
 *
 * <p>All tests share one single-threaded executor, so submitted tasks run one
 * after another on the same worker thread.
 */
public class ObservablesTest {

    /** Logger for this class (previously bound to an unrelated test class by mistake). */
    private static final Logger LOG = LoggerFactory.getLogger(ObservablesTest.class);

    /** How long the simulated slow task sleeps, in milliseconds. */
    private static final long TASK_MILLIS = 5000;

    /**
     * Shared single-threaded executor. The worker is a daemon thread because the
     * executor is never shut down; a non-daemon worker could keep the JVM alive
     * after the tests have finished.
     */
    private static final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
        Thread worker = new Thread(task, "observables-test-worker");
        worker.setDaemon(true);
        return worker;
    });

    /** Guava decorator over {@link #executor}; only referenced by the disabled test below. */
    private static final ListeningExecutorService listeningExecutor =
            MoreExecutors.listeningDecorator(executor);

    /**
     * Builds an Observable from an in-memory list and logs every emitted name.
     */
    @Test
    public void creation() throws Exception {
        List<String> names = Arrays.asList("Chris", "Alex");
        Observable<String> obs = Observable.from(names);
        obs.subscribe(s -> LOG.info(s));
    }

    /**
     * Wraps a slow {@link Future} with {@code Observable.from(Future)} and logs
     * before/after each step so the point at which the caller blocks is visible
     * in the log timestamps.
     */
    @Test
    public void blocking() throws Exception {
        Future<?> future = executor.submit(this::fiveSeconds);
        LOG.info("Creating an Observable");
        Observable<Object> observable = Observable.from(future);
        LOG.info("Created an Observable");
        observable.subscribe(o -> {
            LOG.info("Observable has delivered");
        });
        LOG.info("Finished subscribing");
        // Second subscription: wait for the value so the test does not end early.
        observable.toBlocking().first();
        LOG.info("Observable done");
    }

    /**
     * Same scenario as {@link #blocking()}, but the Future is converted with
     * {@code Observable.from(Future, Scheduler)} using the IO scheduler. As the
     * test name says, a thread is still used to wait on the Future.
     */
    @Test
    public void notSoBlockingButIStillUseAThread() throws Exception {
        Future<?> future = executor.submit(this::fiveSeconds);
        LOG.info("Creating an Observable");
        Observable<Object> observable = Observable.from(future, Schedulers.io());
        LOG.info("Created an Observable");
        observable.subscribe(o -> {
            LOG.info("Observable has delivered");
        });
        LOG.info("Finished subscribing");
        // Second subscription: wait for the value so the test does not end early.
        observable.toBlocking().first();
        LOG.info("Observable done");
    }

    // NOTE(review): this test was already disabled in the original source; the
    // reason is not recorded. Kept for reference — TODO confirm whether it can be
    // re-enabled or should be removed.
    // @Test
    // public void listeNotSoBlocking() throws Exception {
    //     ListenableFuture<?> future = listeningExecutor.submit(this::fiveSeconds);
    //     LOG.info("Creating an Observable");
    //     Observable<?> observable = ListenableFutureObservable.from(future, executor);
    //     LOG.info("Created an Observable");
    //     observable.subscribe(o -> {
    //         LOG.info("Observable has delivered");
    //     });
    //     LOG.info("Finished subscribing");
    //     observable.toBlocking().first();
    //     LOG.info("Observable done");
    // }

    /**
     * Simulates slow work by sleeping for {@link #TASK_MILLIS} milliseconds.
     * If interrupted, returns early with the thread's interrupt flag restored.
     */
    private void fiveSeconds() {
        try {
            Thread.sleep(TASK_MILLIS);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can see the interruption.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while simulating slow work", e);
        }
    }

    /**
     * Waits until the given Observable has completed. Failures are logged rather
     * than propagated, so this stays a best-effort wait.
     *
     * @param o the Observable to wait for
     */
    private void block(Observable<?> o) {
        try {
            o.toBlocking().toFuture().get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the caller.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Waiting for the Observable failed", e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment