@jongwook
Last active January 2, 2016 02:19
Code snippet - wrapping Apache Curator's APIs with RxJava
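import static com.google.common.collect.Lists.transform;
import static java.nio.charset.StandardCharsets.UTF_8;
import static rx.Observable.from;
import static rx.Observable.merge;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import rx.Observable;
import rx.subscriptions.Subscriptions;

// The methods below assume an enclosing class with a started CuratorFramework field named `curator`.
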
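// Emits the node's data once, then again after every change; completes when the node is deleted.
public Observable<String> watch(String path) {
    return Observable.create(observer -> {
        AtomicBoolean unsubscribed = new AtomicBoolean(false);

        // Receives the result of each background getData() call.
        BackgroundCallback callback = (client, event) -> {
            switch (Code.get(event.getResultCode())) {
                case OK:
                    byte[] bytes = event.getData();
                    if (bytes != null) {
                        observer.onNext(new String(bytes, UTF_8));
                    }
                    break;
                case NONODE:
                    observer.onError(new NoNodeException(path));
                    break;
            }
        };

        // Registers the given watcher and fetches the current data in the background.
        Consumer<Watcher> watch = watcher -> {
            try {
                curator.getData().usingWatcher(watcher).inBackground(callback).forPath(path);
            } catch (Exception e) {
                observer.onError(e);
            }
        };

        watch.accept(new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (unsubscribed.get()) return;
                switch (event.getType()) {
                    case NodeCreated:
                    case NodeDataChanged:
                        // ZooKeeper watches fire only once, so register again to keep receiving updates.
                        watch.accept(this);
                        break; // without this break, every data change would also complete the stream
                    case NodeDeleted:
                        observer.onCompleted();
                        break;
                }
            }
        });

        // Unsubscribing only raises a flag; the pending watch itself is not cancelled.
        return Subscriptions.create(() -> unsubscribed.set(true));
    });
}
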
// Emits the full list of children once, then again after every change; completes when the node is deleted.
public Observable<List<String>> watchChildren(String path) {
    return Observable.create(observer -> {
        AtomicBoolean unsubscribed = new AtomicBoolean(false);

        // Receives the result of each background getChildren() call.
        BackgroundCallback callback = (client, event) -> {
            switch (Code.get(event.getResultCode())) {
                case OK:
                    List<String> children = event.getChildren();
                    observer.onNext(children != null ? children : Collections.<String>emptyList());
                    break;
                case NONODE:
                    observer.onError(new NoNodeException(path));
                    break;
            }
        };

        // Registers the given watcher and fetches the current children in the background.
        Consumer<Watcher> watch = watcher -> {
            try {
                curator.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
            } catch (Exception e) {
                observer.onError(e);
            }
        };

        watch.accept(new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (unsubscribed.get()) return;
                switch (event.getType()) {
                    case NodeCreated:
                    case NodeChildrenChanged:
                        // ZooKeeper watches fire only once, so register again to keep receiving updates.
                        watch.accept(this);
                        break;
                    case NodeDeleted:
                        observer.onCompleted();
                        break;
                }
            }
        });

        // Unsubscribing only raises a flag; the pending watch itself is not cancelled.
        return Subscriptions.create(() -> unsubscribed.set(true));
    });
}

// Emits the current list of children once, then completes; any non-OK result code becomes an error.
public Observable<List<String>> getChildren(String path) {
    return Observable.create(observer -> {
        try {
            curator.getChildren().inBackground((client, event) -> {
                Code code = Code.get(event.getResultCode());
                switch (code) {
                    case OK:
                        observer.onNext(event.getChildren());
                        observer.onCompleted();
                        break;
                    default:
                        observer.onError(KeeperException.create(code));
                }
            }).forPath(path);
        } catch (Exception e) {
            observer.onError(e);
        }
        // Nothing to cancel: the background request cannot be withdrawn once issued.
        return Subscriptions.empty();
    });
}

// Deletes a single node; emits no items and completes on success or if the node is already gone.
public Observable<Void> delete(String path) {
    return Observable.create(observer -> {
        try {
            curator.delete().inBackground((client, event) -> {
                Code code = Code.get(event.getResultCode());
                switch (code) {
                    case OK:
                    case NONODE:
                        observer.onCompleted();
                        break;
                    default:
                        observer.onError(KeeperException.create(code));
                }
            }).forPath(path);
        } catch (Exception e) {
            observer.onError(e);
        }
        return Subscriptions.empty();
    });
}

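// Recursively deletes all descendants of the node, then the node itself.
public Observable<Void> deleteAll(String path) {
    return getChildren(path).flatMap(children -> {
        List<Observable<Void>> requests = transform(children, child ->
            deleteAll(path + "/" + child)
        );
        // delete() emits no items, so lastOrDefault(null) supplies the single item
        // that triggers the final flatMap once every child subtree has completed.
        return merge(from(requests)).lastOrDefault(null);
    }).flatMap(done -> delete(path));
}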
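
The children-first ordering that deleteAll() relies on can be tried out without a ZooKeeper ensemble. The following is only a rough sketch under stated assumptions: it swaps RxJava for the JDK's CompletableFuture and swaps Curator for a hypothetical in-memory map (`tree`), so every name in it is illustrative and none of it is part of the gist. `thenCompose` plays the role of `flatMap`, and `allOf` plays the role of `merge(...).lastOrDefault(null)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class DeleteAllSketch {
    // Hypothetical stand-in for the ZooKeeper tree: path -> names of its children.
    static final Map<String, List<String>> tree = new ConcurrentHashMap<>();
    // Records the order in which paths were deleted.
    static final List<String> deleted = new CopyOnWriteArrayList<>();

    // Asynchronously looks up the children of a path (empty list for a leaf).
    static CompletableFuture<List<String>> getChildren(String path) {
        return CompletableFuture.supplyAsync(() -> {
            List<String> children = tree.getOrDefault(path, Collections.emptyList());
            return new ArrayList<String>(children);
        });
    }

    // Asynchronously deletes a single path and records it.
    static CompletableFuture<Void> delete(String path) {
        return CompletableFuture.runAsync(() -> {
            tree.remove(path);
            deleted.add(path);
        });
    }

    // Deletes every child subtree first, then the path itself.
    static CompletableFuture<Void> deleteAll(String path) {
        return getChildren(path)
            .thenCompose(children -> {
                CompletableFuture<?>[] requests = children.stream()
                    .map(child -> deleteAll(path + "/" + child))
                    .toArray(CompletableFuture[]::new);
                // Completes once all child subtrees are gone (immediately if there are none).
                return CompletableFuture.allOf(requests);
            })
            .thenCompose(done -> delete(path));
    }

    public static void main(String[] args) {
        tree.put("/app", Arrays.asList("a", "b"));
        tree.put("/app/a", Arrays.asList("x"));
        deleteAll("/app").join();
        // Sibling order may vary between runs, but "/app" is always deleted last.
        System.out.println(deleted);
    }
}
```

As in the RxJava version, sibling subtrees are deleted concurrently, and a parent is only removed after all of its descendants have been.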
@jongwook
Author

Thanks for the suggestion!

I updated deleteAll() to use flatMap, as you suggested. The Subscription was not cancellable anyway, since Curator does not provide a method for that.

I also wanted to keep the return type of getChildren() as Observable<List<String>>, to match watchChildren(), which needs to emit the children as one list each time a new watch event arrives. That makes the implementation a bit more involved, using merge(from(....

I didn't know that I shouldn't call onCompleted in unsubscribe. Thanks for the information!
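
For readers following along, here is a small sketch of that point: unsubscribing only stops further notifications and does not itself emit onCompleted. It is written under assumptions, using a hypothetical `Sink` interface and `Gated` wrapper rather than the actual RxJava types, so it illustrates the idea without claiming to mirror the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class UnsubscribeSketch {
    /** Hypothetical stand-in for an Rx observer; not the RxJava interface. */
    interface Sink<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Forwards notifications downstream until unsubscribe() is called. */
    static final class Gated<T> implements Sink<T> {
        private final Sink<T> downstream;
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        Gated(Sink<T> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void onNext(T value) {
            if (!unsubscribed.get()) downstream.onNext(value);
        }

        @Override
        public void onCompleted() {
            if (!unsubscribed.get()) downstream.onCompleted();
        }

        // Only flips the flag: unsubscribing is not a terminal event, so nothing is emitted here.
        void unsubscribe() {
            unsubscribed.set(true);
        }
    }

    /** Records every notification it receives, for inspection. */
    static final class Recorder implements Sink<String> {
        final List<String> events = new ArrayList<>();

        @Override
        public void onNext(String value) {
            events.add("next:" + value);
        }

        @Override
        public void onCompleted() {
            events.add("completed");
        }
    }

    static List<String> demo() {
        Recorder recorder = new Recorder();
        Gated<String> gated = new Gated<>(recorder);
        gated.onNext("v1");   // delivered
        gated.unsubscribe();  // no notification is sent
        gated.onNext("v2");   // dropped
        gated.onCompleted();  // dropped
        return recorder.events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next:v1]
    }
}
```

The same gate could also be checked inside the background callbacks of watch() and watchChildren(), so that results arriving after unsubscription are dropped as well.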
