-
-
Save timyates/fd6904dcca366d50729c to your computer and use it in GitHub Desktop.
package sample; | |
import javafx.application.Application; | |
import javafx.application.Platform; | |
import javafx.scene.Scene; | |
import javafx.scene.control.SplitPane; | |
import javafx.scene.control.TextArea; | |
import javafx.scene.web.WebEngine; | |
import javafx.scene.web.WebView; | |
import javafx.stage.Stage; | |
import rx.subjects.BehaviorSubject; | |
import static java.util.concurrent.TimeUnit.*; | |
public class Main extends Application { | |
@Override | |
public void start(Stage primaryStage) throws Exception{ | |
TextArea textArea = new TextArea(); | |
WebView webView = new WebView(); | |
WebEngine engine = webView.getEngine(); | |
BehaviorSubject<String> textSubject = BehaviorSubject.create( "" ) ; | |
textArea.textProperty().addListener( ( control, oldValue, newValue ) -> textSubject.onNext( newValue ) ) ; | |
textSubject.sample( 500, MILLISECONDS ) | |
.distinctUntilChanged() | |
.subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ) ; | |
SplitPane root = new SplitPane(); | |
root.getItems().addAll(textArea, webView); | |
Scene scene = new Scene(root); | |
primaryStage.setScene(scene); | |
primaryStage.show(); | |
} | |
public static void main(String[] args) { | |
launch(args); | |
} | |
} |
Hiya! Thanks for the detailed answer! :-)
Yeah, my code above is wrong as you say, the corrected version would be:
BehaviorSubject<String> textSubject = BehaviorSubject.create( "" ) ;
textArea.textProperty().addListener( ( control, oldValue, newValue ) -> textSubject.onNext( newValue ) ) ;
textSubject.debounce( 500, MILLISECONDS )
.distinctUntilChanged()
.subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ) ;
Using debounce
in place of sample
.
To address the case of stopping observing a value, I believe this is required for rxjava:
BehaviorSubject<String> textSubject = BehaviorSubject.create( "" ) ;
ChangeListener<String> textListener = ( value, oldValue, newValue ) -> textSubject.onNext( newValue ) ;
textArea.textProperty().addListener( textListener ) ;
Subscription textAreaSub = textSubject.debounce( 500, MILLISECONDS )
.distinctUntilChanged()
.subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ) ;
Then, when you want to unsubscribe, you would need to do:
textArea.textProperty().removeListener( textListener );
textAreaSub.unsubscribe();
And that (as far as I understand it anyway) should pass the onCompleted
event up the chain and stop any schedulers running. I need to dig a bit deeper than the "dabbling" stage with rxJava.
As you say, the ReactFX way is much cleaner and easier to understand :-)
Quick last question: Do you get a timeline per ReactFX Subscription (if required), or do all Subscriptions share a Timeline?
Yeah, debounce()
makes it behave the same and you don't even need distinctUntilChanged()
, right?
I believe onCompleted
has to originate from the source, not from the subscriber.
Every stream created by reduceCloseSuccessions
has it's own Timeline, but I believe all the Timelines use the same scheduler internally.
The distinctUntilChanged
would filter out if someone types a word and then deletes the same word (within the debounce
period). You don't get an element passing through as the new value is the same as the old one.
Any ideas if this variant would work better for unsubscription ?
Observable<String> textObserver = Observable.<String>create( subscriber -> {
final ChangeListener<String> textListener = ( value, oldValue, newValue ) -> subscriber.onNext( newValue ) ;
textArea.textProperty().addListener( textListener ) ;
return Subscriptions.create( () -> {
textArea.textProperty().removeListener( textListener ) ;
System.out.println( "Listener removed" ) ;
} );
} ).debounce( 500, MILLISECONDS ).distinctUntilChanged() ;
Subscription textAreaSub = textObserver.subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ;
Then:
textAreaSub.unsubscribe();
As before...
Have you tried this code? Is "Listener removed" printed on textAreaSub.unsubscribe()
?
Let me give names to the intermediate Observables for the sake of the text below:
Observable<String> textObserver = Observable.<String>create( subscriber -> {
final ChangeListener<String> textListener = ( value, oldValue, newValue ) -> subscriber.onNext( newValue ) ;
textArea.textProperty().addListener( textListener ) ;
return Subscriptions.create( () -> {
textArea.textProperty().removeListener( textListener ) ;
System.out.println( "Listener removed" ) ;
} );
} );
Observable<String> debounced = textObserver.debounce( 500, MILLISECONDS );
Observable<String> distinct = debounced.distinctUntilChanged() ;
Subscription textAreaSub = distinct.subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ;
textAreaSub.unsubscribe()
clearly unsubscribes from distinct
. Does this cause distinct
to unsubscribe from debounced
, provided that textAreaSub
was the only subscriber to distinct
? Does that in turn cause debounced
to unsubscribe from textObserver
, causing the textListener
to be removed? I don't know the answer, but this is what ReactFX would do.
I have written a short blogpost about this topic: http://www.guigarage.com/2014/03/reactive-programming-javafx/
Hi Tim,
I'm replying to your question "What does ReactFX give you over RxJava?"
ReactFX is designed specifically for JavaFX applications (or other single-threaded applications), hoping that it will be easier to use and more efficient in this limited scope than a general-purpose reactive framework like rxJava.
In this comment I will confine myself to comparison of these two code snippets:
rxJava
ReactFX
Obviously, I'm biased, so I'm not going to jugde which one is more readable.
textArea.textProperty()
and release all intermediate streams/observables, you would have to change the ReactFX version like thisThat is one more line. All intermediate streams in ReactFX are implemented such that when the last subscriber unsubscribes, they themselves unsubscribe from their input. This propagates all the way to
textArea.textProperty()
, causing the attached listener to be removed.In the rxJava version, you would at least have to do
but that doesn't stop the periodic sampling of
textSubject
. Will the sampling stop at least on garbage collection (see 2.)?5. As you already noticed, you don't have to wrap the callbacks in Platform.runLater().
6. Because ReactFX is mostly synchronous and not thread-safe, it should be more efficient than an asynchronous framework. Though no benchmarks have been performed.
Note that there are other important differences stemming from the synchronous nature of ReactFX, but they are not demonstrated in this example.