Skip to content

Instantly share code, notes, and snippets.

@timyates
Created March 24, 2014 16:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save timyates/fd6904dcca366d50729c to your computer and use it in GitHub Desktop.
Save timyates/fd6904dcca366d50729c to your computer and use it in GitHub Desktop.
package sample;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.scene.Scene;
import javafx.scene.control.SplitPane;
import javafx.scene.control.TextArea;
import javafx.scene.web.WebEngine;
import javafx.scene.web.WebView;
import javafx.stage.Stage;
import rx.subjects.BehaviorSubject;
import static java.util.concurrent.TimeUnit.*;
public class Main extends Application {
@Override
public void start(Stage primaryStage) throws Exception{
TextArea textArea = new TextArea();
WebView webView = new WebView();
WebEngine engine = webView.getEngine();
BehaviorSubject<String> textSubject = BehaviorSubject.create( "" ) ;
textArea.textProperty().addListener( ( control, oldValue, newValue ) -> textSubject.onNext( newValue ) ) ;
textSubject.sample( 500, MILLISECONDS )
.distinctUntilChanged()
.subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ) ;
SplitPane root = new SplitPane();
root.getItems().addAll(textArea, webView);
Scene scene = new Scene(root);
primaryStage.setScene(scene);
primaryStage.show();
}
public static void main(String[] args) {
launch(args);
}
}
@TomasMikula
Copy link

Hi Tim,

I'm replying to your question "What does ReactFX give you over RxJava?"

ReactFX is designed specifically for JavaFX applications (or other single-threaded applications), hoping that it will be easier to use and more efficient in this limited scope than a general-purpose reactive framework like rxJava.

In this comment I will confine myself to comparison of these two code snippets:

rxJava

BehaviorSubject<String> textSubject = BehaviorSubject.create( "" ) ;
textArea.textProperty().addListener( ( control, oldValue, newValue ) -> textSubject.onNext( newValue ) ) ;
textSubject.sample( 500, MILLISECONDS )
        .distinctUntilChanged()
        .subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ) ;

ReactFX

EventStreams.valuesOf(textArea.textProperty())
        .reduceCloseSuccessions((a, b) -> b, Duration.ofMillis(500))
        .subscribe(html -> engine.loadContent(html));

Obviously, I'm biased, so I'm not going to jugde which one is more readable.

  1. First of all, these code snippets do not do exactly the same thing. If the user keeps typing for 10 seconds, the ReactFX version will emit only one event, 10.5 seconds after the first keystroke. The rxJava version will emit an event every 0.5 second.
  2. In the rxJava version, the subject keeps being sampled every 500ms even when the user is not typing at all, only for the sampled value to be discarded. In the ReactFX version, 500ms after the last input change there will be nothing left running in the background. Now, I don't know if this would lead to a leak in rxJava (sampling running indefinitely), or if the scheduler holds a weak reference to the Observable being sampled and cleans the scheduled action if it gets garbage collected. Anyway, if there are 20 instances of TextArea (think 20 open files in an IDE), there will be twenty samplings going on in the background, even though the user can be changing at most one of them at a time.
  3. Both examples need some kind of scheduler. I guess the rxJava version uses some rxJava default scheduler. The ReactFX version uses Timeline as a scheduler, so there is nothing instantiated you don't already have in a JavaFX application anyway.
  4. To stop observing textArea.textProperty() and release all intermediate streams/observables, you would have to change the ReactFX version like this
Subscription subscription = EventStreams.valuesOf(textArea.textProperty())
        .reduceCloseSuccessions((a, b) -> b, Duration.ofMillis(500))
        .subscribe(html -> engine.loadContent(html));
// ...
subscription.unsubscribe();

That is one more line. All intermediate streams in ReactFX are implemented such that when the last subscriber unsubscribes, they themselves unsubscribe from their input. This propagates all the way to textArea.textProperty(), causing the attached listener to be removed.

In the rxJava version, you would at least have to do

BehaviorSubject<String> textSubject = BehaviorSubject.create( "" ) ;
ChangeListener<String> listener = ( control, oldValue, newValue ) -> textSubject.onNext( newValue );
textArea.textProperty().addListener( listener ) ;
textSubject.sample( 500, MILLISECONDS )
        .distinctUntilChanged()
        .subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ) ;
// ...
textArea.textProperty().removeListener( listener ) ;

but that doesn't stop the periodic sampling of textSubject. Will the sampling stop at least on garbage collection (see 2.)?
5. As you already noticed, you don't have to wrap the callbacks in Platform.runLater().
6. Because ReactFX is mostly synchronous and not thread-safe, it should be more efficient than an asynchronous framework. Though no benchmarks have been performed.

Note that there are other important differences stemming from the synchronous nature of ReactFX, but they are not demonstrated in this example.

@timyates
Copy link
Author

Hiya! Thanks for the detailed answer! :-)

Yeah, my code above is wrong as you say, the corrected version would be:

BehaviorSubject<String> textSubject = BehaviorSubject.create( "" ) ;
textArea.textProperty().addListener( ( control, oldValue, newValue ) -> textSubject.onNext( newValue ) ) ;
textSubject.debounce( 500, MILLISECONDS )
           .distinctUntilChanged()
           .subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ) ;

Using debounce in place of sample.

To address the case of stopping observing a value, I believe this is required for rxjava:

BehaviorSubject<String> textSubject = BehaviorSubject.create( "" ) ;
ChangeListener<String> textListener = ( value, oldValue, newValue ) -> textSubject.onNext( newValue ) ;
textArea.textProperty().addListener( textListener ) ;
Subscription textAreaSub = textSubject.debounce( 500, MILLISECONDS )
                                      .distinctUntilChanged()
                                      .subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ) ;

Then, when you want to unsubscribe, you would need to do:

textArea.textProperty().removeListener( textListener );
textAreaSub.unsubscribe();

And that (as far as I understand it anyway) should pass the onCompleted event up the chain and stop any schedulers running. I need to dig a bit deeper than the "dabbling" stage with rxJava.

As you say, the ReactFX way is much cleaner and easier to understand :-)

Quick last question: Do you get a timeline per ReactFX Subscription (if required), or do all Subscriptions share a Timeline?

@TomasMikula
Copy link

Yeah, debounce() makes it behave the same and you don't even need distinctUntilChanged(), right?

I believe onCompleted has to originate from the source, not from the subscriber.

Every stream created by reduceCloseSuccessions has it's own Timeline, but I believe all the Timelines use the same scheduler internally.

@timyates
Copy link
Author

The distinctUntilChanged would filter out if someone types a word and then deletes the same word (within the debounce period). You don't get an element passing through as the new value is the same as the old one.

Any ideas if this variant would work better for unsubscription ?

Observable<String> textObserver = Observable.<String>create( subscriber -> {
    final ChangeListener<String> textListener = ( value, oldValue, newValue ) -> subscriber.onNext( newValue ) ;
    textArea.textProperty().addListener( textListener ) ;
    return Subscriptions.create( () -> {
        textArea.textProperty().removeListener( textListener ) ;
        System.out.println( "Listener removed" ) ;
    } );
} ).debounce( 500, MILLISECONDS ).distinctUntilChanged() ;
Subscription textAreaSub = textObserver.subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ;

Then:

textAreaSub.unsubscribe();

As before...

@TomasMikula
Copy link

Have you tried this code? Is "Listener removed" printed on textAreaSub.unsubscribe()?

Let me give names to the intermediate Observables for the sake of the text below:

Observable<String> textObserver = Observable.<String>create( subscriber -> {
    final ChangeListener<String> textListener = ( value, oldValue, newValue ) -> subscriber.onNext( newValue ) ;
    textArea.textProperty().addListener( textListener ) ;
    return Subscriptions.create( () -> {
        textArea.textProperty().removeListener( textListener ) ;
        System.out.println( "Listener removed" ) ;
    } );
} );
Observable<String> debounced = textObserver.debounce( 500, MILLISECONDS );
Observable<String> distinct = debounced.distinctUntilChanged() ;
Subscription textAreaSub = distinct.subscribe( ( s ) -> Platform.runLater( () -> engine.loadContent( s ) ) ;

textAreaSub.unsubscribe() clearly unsubscribes from distinct. Does this cause distinct to unsubscribe from debounced, provided that textAreaSub was the only subscriber to distinct? Does that in turn cause debounced to unsubscribe from textObserver, causing the textListener to be removed? I don't know the answer, but this is what ReactFX would do.

@guigarage
Copy link

I have written a short blogpost about this topic: http://www.guigarage.com/2014/03/reactive-programming-javafx/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment