Skip to content

Instantly share code, notes, and snippets.

@daschl
Created February 24, 2014 10:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daschl/9185619 to your computer and use it in GitHub Desktop.
Save daschl/9185619 to your computer and use it in GitHub Desktop.
// FIFO of request events that have been written but not yet answered.
// EventRequestEncoder appends each outgoing event; EventResponseDecoder polls the head
// for every incoming response, so responses are matched to requests purely by order.
// NOTE(review): ArrayDeque is not synchronized - presumably both handlers only run on the
// channel's event loop thread; confirm before touching this queue from anywhere else.
private final Queue<Event<CouchbaseRequest>> queue = new ArrayDeque<Event<CouchbaseRequest>>();
/**
 * Inbound handler that completes the pending request for each decoded response.
 *
 * <p>For every {@link CouchbaseResponse} it takes the oldest event from {@code queue} and
 * hands the response to the {@code Deferred} stored as that event's reply-to. Nothing is
 * added to {@code out}, so the response is not passed further along the pipeline by this
 * handler.
 */
final class EventResponseDecoder extends MessageToMessageDecoder<CouchbaseResponse> {

    /**
     * Completes the oldest pending request with the given response.
     *
     * @param ctx the handler context (unused)
     * @param in  the response to deliver
     * @param out output list; intentionally left empty
     * @throws IllegalStateException if a response arrives while no request is pending
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, CouchbaseResponse in, List<Object> out) throws Exception {
        Event<CouchbaseRequest> event = queue.poll();
        if (event == null) {
            // poll() returns null on an empty queue; fail with a meaningful message instead
            // of a bare NullPointerException on the dereference below.
            throw new IllegalStateException("Received a response with no pending request: " + in);
        }
        // Raw cast kept on purpose: the reply-to is stored untyped and send() is the only
        // visible place that sets it, always to a Deferred.
        ((Deferred) event.getReplyTo()).accept(in);
    }
}
/**
 * Outbound handler that unwraps an {@code Event} into the request it carries.
 *
 * <p>The event itself is appended to {@code queue} so that the response decoder can later
 * find the reply-to belonging to this request; only the wrapped request is passed on to the
 * next outbound handler.
 */
final class EventRequestEncoder extends MessageToMessageEncoder<Event<CouchbaseRequest>> {

    /**
     * Records the event as pending and forwards its payload.
     *
     * @param ctx the handler context (unused)
     * @param msg the event wrapping the outgoing request
     * @param out receives the unwrapped request
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Event<CouchbaseRequest> msg, List<Object> out) throws Exception {
        // Register the event before emitting the payload so it is already pending
        // by the time the request travels further down the pipeline.
        queue.offer(msg);

        final CouchbaseRequest payload = msg.getData();
        out.add(payload);
    }
}
/**
 * Sends a request over the channel and returns a promise for its response.
 *
 * <p>If the current state is not {@code CONNECTED}, the promise is failed immediately with
 * an exception and nothing is written. Otherwise the request is wrapped in an
 * {@code Event} whose reply-to is the deferred behind the returned promise, and the event
 * is written to the channel. A failed write fails the promise with an exception that
 * carries the underlying write failure as its cause.
 *
 * @param request the request to send
 * @param <R>     the expected response type
 * @return a promise composed from the deferred that is attached to the request
 */
@Override
public <R extends CouchbaseResponse> Promise<R> send(final CouchbaseRequest request) {
    final Deferred<R,Promise<R>> deferred = Promises.defer(env.reactorEnv());

    if (state() != LifecycleState.CONNECTED) {
        deferred.accept(new Exception("Not connected :("));
        return deferred.compose();
    }

    Event<CouchbaseRequest> event = Event.wrap(request);
    event.setReplyTo(deferred);

    // NOTE(review): write() is used without an explicit flush here - presumably the channel
    // is flushed elsewhere; confirm, otherwise the write future may never complete.
    channel.write(event).addListener(new GenericFutureListener<Future<Void>>() {
        @Override
        public void operationComplete(final Future<Void> future) throws Exception {
            if (!future.isSuccess()) {
                // Chain the real failure so callers can see why the write did not succeed.
                // NOTE(review): if the encoder had already queued this event, it stays in
                // the pending queue after a failed write - confirm whether it must be removed.
                deferred.accept(new Exception("woops :(", future.cause()));
            }
        }
    });

    return deferred.compose();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment