Skip to content

Instantly share code, notes, and snippets.

@lburgazzoli
Created March 12, 2021 08:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lburgazzoli/1595583c133917f168eb08d2dbd2a64e to your computer and use it in GitHub Desktop.
Save lburgazzoli/1595583c133917f168eb08d2dbd2a64e to your computer and use it in GitHub Desktop.
private void watch(WebClient client, String token, String resourceVersion) {
String path = "/api/v1/namespaces/" + namespace + "/services";
JsonParser parser = JsonParser.newParser().objectValueMode()
.handler(event -> onChunk(event.objectValue()));
client.get(path)
.addQueryParam("watch", "true")
.addQueryParam("resourceVersion", resourceVersion)
.as(BodyCodec.pipe(new WriteStream<Buffer>() {
@Override
public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
return this;
}
@Override
public Future<Void> write(Buffer data) {
parser.write(data);
return Future.succeededFuture();
}
@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
parser.write(data);
if (handler != null) {
handler.handle(Future.succeededFuture());
}
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
try {
parser.end();
} catch (Exception e) {
handler.handle(Future.failedFuture(e));
return;
}
handler.handle(Future.succeededFuture());
}
@Override
public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
return this;
}
@Override
public boolean writeQueueFull() {
return false;
}
@Override
public WriteStream<Buffer> drainHandler(Handler<Void> handler) {
return this;
}
}))
.putHeader("Authorization", "Bearer " + token)
.send(ar -> {
if (ar.failed()) {
LOGGER.error("Unable to setup the watcher on the service list", ar.cause());
} else {
LOGGER.info("Watching services from namespace " + namespace);
}
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment