
@jongwook
Last active January 2, 2016 02:19
Code snippet - wrapping Apache Curator's APIs with RxJava
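import static com.google.common.collect.Lists.transform;
import static java.nio.charset.StandardCharsets.UTF_8;
import static rx.Observable.from;
import static rx.Observable.merge;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import rx.Observable;
import rx.subscriptions.Subscriptions;

// The methods below belong to a class that holds a CuratorFramework field named `curator`.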
// Emits the node's data now and again after every change; completes when the node is deleted.
public Observable<String> watch(String path) {
    return Observable.create(observer -> {
        // Set on unsubscribe so that later watch events are ignored.
        AtomicBoolean unsubscribed = new AtomicBoolean(false);

        // Handles the result of each background getData() call.
        BackgroundCallback callback = (client, event) -> {
            switch (Code.get(event.getResultCode())) {
                case OK:
                    byte[] bytes = event.getData();
                    if (bytes != null) {
                        observer.onNext(new String(bytes, UTF_8));
                    }
                    break;
                case NONODE:
                    observer.onError(new NoNodeException(path));
                    break;
            }
        };

        // Reads the data in the background and registers the given one-shot watcher.
        Consumer<Watcher> watch = watcher -> {
            try {
                curator.getData().usingWatcher(watcher).inBackground(callback).forPath(path);
            } catch (Exception e) {
                observer.onError(e);
            }
        };

        watch.accept(new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (unsubscribed.get()) return;
                switch (event.getType()) {
                    case NodeCreated:
                    case NodeDataChanged:
                        // ZooKeeper watches fire once, so re-register and read again.
                        watch.accept(this);
                        break; // without this, every change fell through to onCompleted()
                    case NodeDeleted:
                        observer.onCompleted();
                }
            }
        });

        return Subscriptions.create(() -> unsubscribed.set(true));
    });
}
// Emits the full list of children now and again after every change; completes when the node is deleted.
public Observable<List<String>> watchChildren(String path) {
    return Observable.create(observer -> {
        // Set on unsubscribe so that later watch events are ignored.
        AtomicBoolean unsubscribed = new AtomicBoolean(false);

        // Handles the result of each background getChildren() call.
        BackgroundCallback callback = (client, event) -> {
            switch (Code.get(event.getResultCode())) {
                case OK:
                    List<String> children = event.getChildren();
                    observer.onNext(children != null ? children : Collections.emptyList());
                    break;
                case NONODE:
                    observer.onError(new NoNodeException(path));
                    break;
            }
        };

        // Lists the children in the background and registers the given one-shot watcher.
        Consumer<Watcher> watch = watcher -> {
            try {
                curator.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
            } catch (Exception e) {
                observer.onError(e);
            }
        };

        watch.accept(new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (unsubscribed.get()) return;
                switch (event.getType()) {
                    case NodeCreated:
                    case NodeChildrenChanged:
                        // ZooKeeper watches fire once, so re-register and list again.
                        watch.accept(this);
                        break;
                    case NodeDeleted:
                        observer.onCompleted();
                }
            }
        });

        return Subscriptions.create(() -> unsubscribed.set(true));
    });
}
// Emits the current list of children once, then completes.
public Observable<List<String>> getChildren(String path) {
    return Observable.create(observer -> {
        try {
            curator.getChildren().inBackground((client, event) -> {
                Code code = Code.get(event.getResultCode());
                switch (code) {
                    case OK:
                        observer.onNext(event.getChildren());
                        observer.onCompleted();
                        break;
                    default:
                        observer.onError(KeeperException.create(code));
                }
            }).forPath(path);
        } catch (Exception e) {
            observer.onError(e);
        }
        return Subscriptions.empty();
    });
}
// Deletes a single node; a node that is already missing counts as success.
public Observable<Void> delete(String path) {
    return Observable.create(observer -> {
        try {
            curator.delete().inBackground((client, event) -> {
                Code code = Code.get(event.getResultCode());
                switch (code) {
                    case OK:
                    case NONODE:
                        observer.onCompleted();
                        break;
                    default:
                        observer.onError(KeeperException.create(code));
                }
            }).forPath(path);
        } catch (Exception e) {
            observer.onError(e);
        }
        return Subscriptions.empty();
    });
}
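// Recursively deletes all children, then the node itself.
public Observable<Void> deleteAll(String path) {
    return getChildren(path).flatMap(children -> {
        List<Observable<Void>> requests = transform(children, child ->
            deleteAll(path + "/" + child)
        );
        // lastOrDefault(null) emits exactly one item once every child subtree is gone.
        return merge(from(requests)).lastOrDefault(null);
    }).flatMap(done -> delete(path));
}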
@zsxwing

zsxwing commented Jan 6, 2014

I'm not sure I understand Curator correctly. In ZooKeeper, a single EventThread handles all events for one ZooKeeper instance. I only glanced at the Curator code, but it looks like a single background thread handles the background operations there, too. That's perfect for RxJava. If that were not the case, there could be concurrency issues.
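
To make the single-thread point concrete, here is a minimal sketch in plain Java with no Curator or RxJava dependency. It only models the assumption stated above, that all callbacks are dispatched from one thread; the class `SerialDelivery` and its method are hypothetical names, and the executor is a stand-in, not Curator's actual implementation. It counts how many callbacks start while another is still running, which cannot happen when the pool has a single thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an event-dispatch thread (an assumption, not Curator code).
class SerialDelivery {
    // Runs `events` callbacks on a pool of `threads` workers and returns how
    // many callbacks began while another callback was still in progress.
    static int countOverlaps(int threads, int events) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger overlaps = new AtomicInteger();
        for (int i = 0; i < events; i++) {
            pool.execute(() -> {
                // More than one callback in flight means concurrent delivery.
                if (inFlight.incrementAndGet() > 1) {
                    overlaps.incrementAndGet();
                }
                inFlight.decrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return overlaps.get();
    }
}
```

With `threads` set to 1 the count is always zero, which is the property an observer relies on when it is not written to handle concurrent `onNext` calls. With more threads the count may or may not be zero on a given run.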

@jongwook
Author

Thanks for the suggestion!

I updated deleteAll() to use flatMap, as you suggested. The Subscription was not cancellable anyway, since Curator does not provide such a method.

I also wanted to keep the return type of getChildren() as Observable<List<String>>, to match watchChildren(), which needs to emit the whole list of children each time a new watch event arrives. That makes the implementation a bit involved, using merge(from(....
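
The ordering that deleteAll() depends on, every child subtree first and the node itself last, can be sketched without RxJava or Curator. This is only an illustration under stated assumptions: the in-memory map stands in for ZooKeeper, `TreeDelete` and `demo` are hypothetical names, and the siblings are visited sequentially here, whereas merge() in the gist gives no ordering guarantee between siblings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory tree standing in for ZooKeeper paths (an assumption).
class TreeDelete {
    // `tree` maps a path to the names of its children; paths absent from the
    // map have no children. Returns the paths in the order they were deleted.
    static List<String> deleteAll(Map<String, List<String>> tree, String path) {
        List<String> deleted = new ArrayList<>();
        deleteAll(tree, path, deleted);
        return deleted;
    }

    private static void deleteAll(Map<String, List<String>> tree, String path,
                                  List<String> deleted) {
        // One list of children per node, like the single item from getChildren().
        List<String> children = tree.getOrDefault(path, Collections.emptyList());
        for (String child : children) {
            deleteAll(tree, path + "/" + child, deleted);
        }
        // Only after every child subtree is gone is the node itself removed.
        tree.remove(path);
        deleted.add(path);
    }

    static List<String> demo() {
        Map<String, List<String>> tree = new HashMap<>();
        tree.put("/a", Arrays.asList("b", "c"));
        tree.put("/a/b", Arrays.asList("d"));
        return deleteAll(tree, "/a");
    }
}
```

Calling `TreeDelete.demo()` returns `[/a/b/d, /a/b, /a/c, /a]`: a parent never appears before any of its descendants.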

I didn't know that I shouldn't call onCompleted in unsubscribe. Thanks for the information!
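
As a reminder of what that looks like, here is a minimal plain-Java sketch of the unsubscribe guard used in watch() and watchChildren(). It is not RxJava's API: `GuardedSource`, `emit`, and `demo` are hypothetical names chosen for illustration. Unsubscribing only flips a flag, so later events are dropped and no completion signal is delivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical miniature of the unsubscribe guard (an assumption, not RxJava code).
class GuardedSource {
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    private final List<String> received = new ArrayList<>();

    // Called by the event source; silently ignored once unsubscribed.
    void emit(String value) {
        if (unsubscribed.get()) return;
        received.add(value);
    }

    // Only flips the flag: nothing is delivered to the listener here.
    void unsubscribe() {
        unsubscribed.set(true);
    }

    List<String> received() {
        return received;
    }

    static List<String> demo() {
        GuardedSource source = new GuardedSource();
        source.emit("a");
        source.emit("b");
        source.unsubscribe();
        source.emit("c"); // dropped, because the flag is already set
        return source.received();
    }
}
```

`GuardedSource.demo()` returns `[a, b]`; the value emitted after unsubscribing never reaches the list.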
