Skip to content

Instantly share code, notes, and snippets.

@NiteshKant
Last active August 29, 2015 14:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NiteshKant/2cc70fa28a999ace8455 to your computer and use it in GitHub Desktop.
Save NiteshKant/2cc70fa28a999ace8455 to your computer and use it in GitHub Desktop.
Observable<WebSocketConnection> source = HttpClient.newClient()
.createGet("/ws")
.requestWebSocketUpgrade()
.doOnNext(resp -> logger.info(resp.toString()))
.map(WebSocketResponse::getWebSocketConnection)
.nest()
.lift(new OperatorCacheSingleWebsocketConnection());
Observable.interval(1, TimeUnit.SECONDS)
.map(aTick -> Observable.<WebSocketFrame>just(new MessageFrame(MessageType.Message,
msgIdGenerator.incrementAndGet())))
.flatMap(data -> source.flatMap(conn -> conn.write(data)))
.cast(MessageFrame.class)
.mergeWith(source.flatMap((webSocketConnection) -> webSocketConnection.getInput(false))
.filter(AcceptOnlyBinaryFramesFilter.INSTANCE)
.cast(BinaryWebSocketFrame.class)
.map(frm -> new MessageFrame(frm.content())))
.retryWhen(errStream -> errStream.flatMap(err -> {
if (err instanceof IOException) {
return Observable.timer(1, TimeUnit.SECONDS);
}
return Observable.error(err);
}))
.take(10)
.toBlocking()
.forEach(msgId -> logger.info("Received acknowledgment for message id => " + msgId.toString()));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment