Skip to content

Instantly share code, notes, and snippets.

@greghelton
Last active April 24, 2016 22:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save greghelton/9cd78637d81e47a81633894cbd802e1d to your computer and use it in GitHub Desktop.
Save greghelton/9cd78637d81e47a81633894cbd802e1d to your computer and use it in GitHub Desktop.
ReactiveSum2 - just trying to decipher the code of a more complex example
package com.reactive.summing;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;
import com.reactive.helpers.RxCreateObservable;
import rx.Observable;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;
/**
 * Console demo that prints the sum of two variables, {@code a} and {@code b},
 * every time either of them is (re)assigned on standard input.
 *
 * <p>Lines of the form {@code a=5} or {@code b: 3.5} are parsed into two
 * streams of doubles which are combined with {@code combineLatest}.
 */
public class ReactiveSum2 {

    /** Shared, connectable source of raw input lines; assigned in {@link #main}. */
    ConnectableObservable<String> input;

    /**
     * Counted down once when the combined stream completes. Nothing in this
     * class awaits it.
     */
    private CountDownLatch latch = new CountDownLatch(1);

    /**
     * Extracts the numeric values assigned to one variable from a stream of lines.
     *
     * <p>A line matches when, ignoring surrounding whitespace, it consists of
     * the variable name, a {@code :} or {@code =} separator, and an optionally
     * negative decimal number. Non-matching lines are dropped.
     *
     * @param varName the variable name to look for; it is matched literally
     * @param input   the stream of raw input lines
     * @return a stream of the values assigned to {@code varName}
     */
    public static Observable<Double> varStream(
            final String varName, Observable<String> input) {
        // Pattern.quote keeps regex metacharacters in the name from changing
        // the pattern. The separator class is [:=]; a '|' inside a character
        // class is a literal pipe, not alternation, so it must not be listed.
        final Pattern pattern = Pattern.compile(
                "\\s*" + Pattern.quote(varName) + "\\s*[:=]\\s*(-?\\d+\\.?\\d*)");

        return input.map(pattern::matcher)
                .filter(matcher -> matcher.matches() && matcher.group(1) != null)
                .map(matcher -> matcher.group(1))
                .map(Double::parseDouble);
    }

    /**
     * Subscribes to the running sum of the latest values of {@code a} and
     * {@code b} and prints every update to standard output.
     *
     * <p>Both streams are seeded with {@code 0.0} so that a sum is available
     * before either variable has been assigned. The subscription is made on
     * the io scheduler. On completion the latch is counted down.
     *
     * @param a stream of values for the first operand
     * @param b stream of values for the second operand
     */
    public void reactiveSum(Observable<Double> a, Observable<Double> b) {
        Observable
                .combineLatest(a.startWith(0.0), b.startWith(0.0), (x, y) -> x + y)
                .subscribeOn(Schedulers.io())
                .subscribe(
                        sum -> System.out.print("update : a + b = " + sum + "\n$ "),
                        error -> {
                            System.out.println("Got an error!");
                            error.printStackTrace();
                        },
                        () -> {
                            System.out.println("Exiting...");
                            latch.countDown();
                        });
    }

    /**
     * Wires standard input to the two variable streams, subscribes the
     * summing pipeline, and then connects the shared input source so that
     * lines start flowing.
     *
     * @param args unused
     * @throws Exception declared for convenience; nothing here throws a checked exception
     */
    public static void main(String[] args) throws Exception {
        ReactiveSum2 app = new ReactiveSum2();
        app.input = RxCreateObservable.from(System.in);

        Observable<Double> a = varStream("a", app.input);
        Observable<Double> b = varStream("b", app.input);
        app.reactiveSum(a, b);

        // Subscribers are registered first; connect() then starts the source.
        app.input.connect();
    }
}
C:\Users\gah285\dev\src\workspace-sts\testthis\target\classes>java -cp .;c:\users\gah285\.m2\repository\io\reactivex\rxjava\1.0.10\rxjava-1.0.10.jar;c:\users\gah285\.m2\repository\com\netflix\rxjava\rxjava-core\0.20.7\rxjava-core-0.20.7.jar com.reactive.summing.ReactiveSum2
update : a + b = 0.0
$ a=5
update : a + b = 5.0
$ b=3
update : a + b = 8.0
$ a=-2
update : a + b = 1.0
$ b=4
update : a + b = 2.0
$ exit
Exiting...
package com.reactive.helpers;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;
/**
 * Factory methods that expose line-oriented text sources as RxJava observables.
 */
public class RxCreateObservable {

    /**
     * Creates an observable that emits the lines of a file, one item per line.
     *
     * <p>Each subscription opens its own reader. The reader is closed when the
     * subscriber is unsubscribed. Reading stops early if the subscriber
     * unsubscribes; otherwise the stream completes at end of file.
     *
     * @param path the file to read
     * @return an observable of the file's lines; I/O failures are reported via {@code onError}
     */
    public static Observable<String> from(final Path path) {
        return Observable.<String>create(subscriber -> {
            try {
                BufferedReader reader = Files.newBufferedReader(path);

                // Tie the reader's lifetime to the subscription.
                subscriber.add(Subscriptions.create(() -> {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }));

                String line;
                while ((line = reader.readLine()) != null && !subscriber.isUnsubscribed()) {
                    subscriber.onNext(line);
                }
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            } catch (IOException ioe) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(ioe);
                }
            }
        });
    }

    /**
     * Creates a connectable observable over the lines of an input stream.
     *
     * @param stream the stream to read, decoded with the platform default charset
     * @return the result of {@link #from(BufferedReader)} for a reader wrapping {@code stream}
     */
    public static ConnectableObservable<String> from(final InputStream stream) {
        return from(new BufferedReader(new InputStreamReader(stream)));
    }

    /**
     * Creates a connectable observable that emits the lines read from
     * {@code reader} until end of input, until the line {@code "exit"} is
     * read, or until the subscriber unsubscribes.
     *
     * <p>The observable is published, so nothing is read before
     * {@code connect()} is called on the result. The reader is not closed by
     * this method.
     *
     * @param reader the source of lines
     * @return a connectable observable of the lines; an {@link IOException}
     *         is reported through {@code onError}
     */
    public static ConnectableObservable<String> from(final BufferedReader reader) {
        return Observable.create((Subscriber<? super String> subscriber) -> {
            if (subscriber.isUnsubscribed()) {
                return;
            }
            try {
                String line;
                while (!subscriber.isUnsubscribed() && (line = reader.readLine()) != null) {
                    // "exit" is the sentinel that ends the stream normally.
                    if (line.equals("exit")) {
                        break;
                    }
                    subscriber.onNext(line);
                }
            } catch (IOException e) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(e);
                }
                // onError is terminal: do not fall through to onCompleted.
                return;
            }
            if (!subscriber.isUnsubscribed()) {
                subscriber.onCompleted();
            }
        }).publish();
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.frontier.dpi</groupId>
  <artifactId>testthis</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>testthis</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <!--
      Only the io.reactivex artifact is declared. com.netflix.rxjava:rxjava-core
      is the pre-1.0 name of the same library and ships the same rx.* classes,
      so having both on the classpath makes the loaded version order-dependent.
    -->
    <dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxjava</artifactId>
      <version>1.0.10</version>
    </dependency>
  </dependencies>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment