Last active
April 24, 2016 22:24
-
-
Save greghelton/9cd78637d81e47a81633894cbd802e1d to your computer and use it in GitHub Desktop.
ReactiveSum2 - just trying to decipher the code of a more complex example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.reactive.summing; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.regex.Pattern; | |
import com.reactive.helpers.RxCreateObservable; | |
import rx.Observable; | |
import rx.observables.ConnectableObservable; | |
import rx.schedulers.Schedulers; | |
public class ReactiveSum2 { | |
ConnectableObservable<String> input; | |
private CountDownLatch latch = new CountDownLatch(1); | |
public static Observable<Double> varStream( | |
final String varName, Observable<String> input) { | |
final Pattern pattern = Pattern.compile("\\s*" + varName | |
+ "\\s*[:|=]\\s*(-?\\d+\\.?\\d*)"); | |
return input.map(pattern::matcher) | |
.filter(matcher -> matcher.matches() | |
&& matcher.group(1) != null) | |
.map(matcher -> matcher.group(1)) | |
.map(Double::parseDouble); | |
} | |
public void reactiveSum(Observable<Double> a, Observable<Double> b) { | |
Observable | |
.combineLatest(a.startWith(0.0), b.startWith(0.0), (x, y) -> x + y) | |
.subscribeOn(Schedulers.io()) | |
.subscribe( | |
sum -> System.out.print("update : a + b = " + sum + "\n$ "), | |
error -> { | |
System.out.println("Got an error!"); | |
error.printStackTrace(); | |
}, () -> { | |
System.out.println("Exiting..."); | |
latch.countDown(); | |
}); | |
} | |
public static void main( String[] args ) throws Exception { | |
ReactiveSum2 app = new ReactiveSum2(); | |
app.input = RxCreateObservable.from(System.in); | |
Observable<Double> a = varStream("a", app.input); | |
Observable<Double> b = varStream("b", app.input); | |
app.reactiveSum(a,b); | |
app.input.connect(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
C:\Users\gah285\dev\src\workspace-sts\testthis\target\classes>java -cp .;c:\users\gah285\.m2\repository\io\reactivex\rxjava\1.0.10\rxjava-1.0.10.jar;c:\users\gah285\.m2\repository\com\netflix\rxjava\rxjava-core\0.20.7\rxjava-core-0.20.7.jar com.reactive.summing.ReactiveSum2 | |
update : a + b = 0.0 | |
$ a=5 | |
update : a + b = 5.0 | |
$ b=3 | |
update : a + b = 8.0 | |
$ a=-2 | |
update : a + b = 1.0 | |
$ b=4 | |
update : a + b = 2.0 | |
$ exit | |
Exiting... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.reactive.helpers; | |
import java.io.BufferedReader; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.InputStreamReader; | |
import java.io.Reader; | |
import java.nio.file.DirectoryIteratorException; | |
import java.nio.file.DirectoryStream; | |
import java.nio.file.Files; | |
import java.nio.file.Path; | |
import rx.Observable; | |
import rx.Scheduler; | |
import rx.Subscriber; | |
import rx.observables.ConnectableObservable; | |
import rx.subscriptions.Subscriptions; | |
public class RxCreateObservable { | |
public static Observable<String> from(final Path path) { | |
return Observable.<String>create(subscriber -> { | |
try { | |
BufferedReader reader = Files.newBufferedReader(path); | |
subscriber.add(Subscriptions.create(() -> { | |
try { | |
reader.close(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
})); | |
String line = null; | |
while ((line = reader.readLine()) != null && !subscriber.isUnsubscribed()) { | |
subscriber.onNext(line); | |
} | |
if (!subscriber.isUnsubscribed()) { | |
subscriber.onCompleted(); | |
} | |
} catch (IOException ioe) { | |
if (!subscriber.isUnsubscribed()) { | |
subscriber.onError(ioe); | |
} | |
} | |
}); | |
} | |
public static ConnectableObservable<String> from(final InputStream stream) { | |
return from(new BufferedReader(new InputStreamReader(stream))); | |
} | |
public static ConnectableObservable<String> from(final BufferedReader reader) { | |
return Observable.create((Subscriber<? super String> subscriber) -> { | |
try { | |
String line; | |
if (subscriber.isUnsubscribed()) { | |
return; | |
} | |
while (!subscriber.isUnsubscribed() && (line = reader.readLine()) != null) { | |
if (line.equals("exit")) { | |
break; | |
} | |
subscriber.onNext(line); | |
} | |
} catch (IOException e) { | |
subscriber.onError(e); | |
} | |
if (!subscriber.isUnsubscribed()) { | |
subscriber.onCompleted(); | |
} | |
}).publish(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.frontier.dpi</groupId>
  <artifactId>testthis</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>testthis</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- The sources use lambdas and method references, which require Java 8. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <!-- No explicit scope: the default (compile) applies. -->
    <dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxjava</artifactId>
      <version>1.0.10</version>
    </dependency>
    <!-- NOTE(review): this looks like the same library under its older coordinates;
         having both on the classpath makes class resolution order-dependent.
         Confirm and keep only one. -->
    <dependency>
      <groupId>com.netflix.rxjava</groupId>
      <artifactId>rxjava-core</artifactId>
      <version>0.20.7</version>
    </dependency>
  </dependencies>
</project>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment