Skip to content

Instantly share code, notes, and snippets.

@jongwook
Last active January 2, 2016 02:19
Show Gist options
  • Save jongwook/8236438 to your computer and use it in GitHub Desktop.
Save jongwook/8236438 to your computer and use it in GitHub Desktop.
Code snippet - wrapping Apache Curator's APIs with RxJava
import static com.google.common.collect.Lists.transform;
import static java.nio.charset.StandardCharsets.UTF_8;
import static rx.Observable.from;
import static rx.Observable.merge;
public Observable<String> watch(String path) {
return Observable.create(observer -> {
AtomicBoolean unsubscribed = new AtomicBoolean(false);
BackgroundCallback callback = (client, event) -> {
switch (Code.get(event.getResultCode())) {
case OK:
byte[] bytes = event.getData();
if (bytes != null) {
observer.onNext(new String(bytes, UTF_8));
}
break;
case NONODE:
observer.onError(new NoNodeException(path));
}
};
Consumer<Watcher> watch = watcher -> {
try {
curator.getData().usingWatcher(watcher).inBackground(callback).forPath(path);
} catch (Exception e) {
observer.onError(e);
}
};
watch.accept(new Watcher() {
@Override
public void process(WatchedEvent event) {
if (unsubscribed.get()) return;
switch (event.getType()) {
case NodeCreated:
case NodeDataChanged:
watch.accept(this);
case NodeDeleted:
observer.onCompleted();
}
}
});
return Subscriptions.create(() -> unsubscribed.set(true));
});
}
public Observable<List<String>> watchChildren(String path) {
return Observable.create(observer -> {
AtomicBoolean unsubscribed = new AtomicBoolean(false);
BackgroundCallback callback = (client, event) -> {
switch (Code.get(event.getResultCode()) {
case OK:
List<String> children = event.getChildren();
observer.onNext(children != null ? children : Collections.emptyList());
break;
case NONODE:
observer.onError(new NoNodeException(path));
}
};
Consumer<Watcher> watch = watcher -> {
try {
curator.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
} catch (Exception e) {
observer.onError(e);
}
};
watch.accept(new Watcher() {
@Override
public void process(WatchedEvent event) {
if (unsubscribed.get()) return;
switch (event.getType()) {
case NodeCreated:
case NodeChildrenChanged:
watch.accept(this);
break;
case NodeDeleted:
observer.onCompleted();
}
}
});
return Subscriptions.create(() -> unsubscribed.set(true));
});
}
public Observable<List<String>> getChildren(String path) {
return Observable.create(observer -> {
try {
curator.getChildren().inBackground((client, event) -> {
Code code = Code.get(event.getResultCode());
switch (code) {
case OK:
observer.onNext(event.getChildren());
observer.onCompleted();
break;
default:
observer.onError(KeeperException.create(code));
}
}).forPath(path);
} catch (Exception e) {
observer.onError(e);
}
return Subscriptions.empty();
});
}
public Observable<Void> delete(String path) {
return Observable.create(observer -> {
try {
curator.delete().inBackground((client, event) -> {
Code code = Code.get(event.getResultCode());
switch (code) {
case OK:
case NONODE:
observer.onCompleted();
break;
default:
observer.onError(KeeperException.create(code));
}
}).forPath(path);
} catch (Exception e) {
observer.onError(e);
}
return Subscriptions.empty();
});
}
public Observable<Void> deleteAll(String path) {
return getChildren(path).flatMap(children -> {
List<Observable<Void>> requests = transform(children, child ->
deleteAll(path + "/" + child)
);
return merge(from(requests)).lastOrDefault(null);
}).flatMap(done -> delete(path));
}
@zsxwing
Copy link

zsxwing commented Jan 6, 2014

L50, L96: Subscription.unsubscribe should not call onCompleted. When Subscription.unsubscribe is called, it means the caller is aware that the Observable will be canceled and expects that the Observer will receive nothing later. However, RxJava has SafeObserver to prevent from calling onCompleted in Subscription.unsubscribe, onCompleted will not be sent to your Observer actually.

@zsxwing
Copy link

zsxwing commented Jan 6, 2014

A better implementation of deleteAll:

 public Observable<Void> deleteAll(String path) {
        return getChildren(path).flatMap(child -> deleteAll(path + "/" + child))
               .lastOrDefault(null).flatMap(dummy -> delete(path));
    }

@zsxwing
Copy link

zsxwing commented Jan 6, 2014

I'm not sure if I understand curator correctly. In zookeeper, there is only one EventThread to handle the events in one zookeeper instance. I just glanced the codes in curator. Looks there is only one background thread to handle the background operation, too. That's perfect for RxJava. If not, there may be some concurrency issue.

@jongwook
Copy link
Author

Thanks for the suggestion!

I updated deleteAll() using flatMap as you suggested; The Subscription was not cancellable anyways since Curator does not provide such method.

Also I wanted to keep the return type of getChildren() to be Observable<List<String>> to conform with watchChildren() which needs to produce a chunked list of children each time a new watched message arrives. So it makes a bit involved implementation using merge(from(....

I didn't know that I shouldn't call onCompleted in unsubscribe. Thanks for the information!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment