Skip to content

Instantly share code, notes, and snippets.

@aschrijver
Last active December 20, 2015 11:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aschrijver/6125744 to your computer and use it in GitHub Desktop.
Save aschrijver/6125744 to your computer and use it in GitHub Desktop.
See also the discussion at: https://groups.google.com/forum/#!topic/vertx/R6yteLPghM8 In this gist, RxMessage has an implicit parameterless constructor. A generic version of RxEventBus instantiates it via reflection, using the message class that was passed to the RxEventBus constructor (and that matches the generic message type parameter). Tradeoffs are t…
package org.example.vertx.eventbus;
import org.vertx.java.core.buffer.Buffer;
import io.vertx.rxcore.java.eventbus.RxMessage;
/**
 * Example {@link RxMessage} specialisation whose body type is {@link Buffer}.
 *
 * <p>The class declares no constructor, so it has the implicit public
 * parameterless one.
 */
public class BsonMessage extends RxMessage<Buffer> {
// Do some interesting stuff here as well.
}
package io.vertx.rxcore.java.eventbus;
import io.vertx.rxcore.java.impl.VertxObservable;
import io.vertx.rxcore.java.impl.VertxSubscription;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;
/*
* Copyright 2013 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
/**
 * Rx-style facade over a Vert.x {@link EventBus} that delivers replies and
 * incoming messages as instances of a caller-chosen {@link RxMessage} subtype.
 *
 * <p>Message wrappers are created reflectively from the class passed to the
 * constructor, so {@code M} must provide an accessible parameterless
 * constructor.
 *
 * @param <M> the concrete message wrapper type emitted by the observables
 * @param <T> the body type of the wrapped core message
 */
public class RxEventBus<M extends RxMessage<T>, T> {

  private final EventBus eventBus;
  private final Class<M> messageType;

  /**
   * @param eventBus    the core event bus to delegate to
   * @param messageType the class used to instantiate message wrappers
   */
  public RxEventBus(EventBus eventBus, Class<M> messageType) {
    this.eventBus = eventBus;
    this.messageType = messageType;
  }

  /**
   * Creates a fresh, empty message wrapper via the parameterless constructor of
   * the configured message type.
   *
   * @return a new instance of {@code M}
   * @throws RuntimeException if the type cannot be instantiated reflectively
   *     (no parameterless constructor, not accessible, abstract, or the
   *     constructor itself threw); the reflective failure is kept as the cause
   */
  M createMessageInstance() {
    try {
      // Class.newInstance() is deprecated and leaks checked constructor exceptions
      // undeclared; going through the Constructor wraps them in
      // InvocationTargetException instead.
      return messageType.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(
          "Unable to instantiate message type " + messageType.getName()
              + "; an accessible parameterless constructor is required", e);
    }
  }

  /** Sends a {@code String} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, String msg) {
    return doSend(address, msg);
  }

  /** Sends a {@code JsonObject} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, JsonObject msg) {
    return doSend(address, msg);
  }

  /** Sends a {@code JsonArray} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, JsonArray msg) {
    return doSend(address, msg);
  }

  /** Sends a {@code Buffer} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, Buffer msg) {
    return doSend(address, msg);
  }

  /** Sends a {@code byte[]} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, byte[] msg) {
    return doSend(address, msg);
  }

  /** Sends an {@code Integer} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, Integer msg) {
    return doSend(address, msg);
  }

  /** Sends a {@code Long} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, Long msg) {
    return doSend(address, msg);
  }

  /** Sends a {@code Float} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, Float msg) {
    return doSend(address, msg);
  }

  /** Sends a {@code Double} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, Double msg) {
    return doSend(address, msg);
  }

  /** Sends a {@code Boolean} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, Boolean msg) {
    return doSend(address, msg);
  }

  /** Sends a {@code Short} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, Short msg) {
    return doSend(address, msg);
  }

  /** Sends a {@code Character} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, Character msg) {
    return doSend(address, msg);
  }

  /** Sends a {@code Byte} message; see {@link #doSend(String, Object)}. */
  public Observable<M> send(String address, Byte msg) {
    return doSend(address, msg);
  }

  /**
   * Registers a local handler on the given address.
   *
   * @param address the event bus address to listen on
   * @return an observable emitting one wrapper per received message
   */
  public Observable<M> registerLocalHandler(final String address) {
    return registerHandler(address, true);
  }

  /**
   * Registers a handler on the given address.
   *
   * @param address the event bus address to listen on
   * @return an observable emitting one wrapper per received message
   */
  public Observable<M> registerHandler(final String address) {
    return registerHandler(address, false);
  }

  /**
   * Sends {@code msg} to {@code address} and exposes the reply as an observable.
   *
   * <p>The message is sent as soon as this method is called, not when the
   * returned observable is subscribed to. When the reply arrives it is wrapped
   * in a new {@code M}, passed to the subscription and the subscription is then
   * completed.
   *
   * @param address the destination address
   * @param msg     the message payload
   * @return an observable of the reply
   */
  @SuppressWarnings("rawtypes")
  private Observable<M> doSend(String address, Object msg) {
    final VertxSubscription<M> sub = new VertxSubscription<>();
    Observable<M> obs = observableFor(sub);
    eventBus.send(address, msg, new Handler<Message>() {
      @SuppressWarnings("unchecked")
      @Override
      public void handle(Message reply) {
        M replyMessage = createMessageInstance();
        replyMessage.setCoreMessage(reply);
        sub.handleResult(replyMessage);
        sub.complete();
      }
    });
    return obs;
  }

  /**
   * Registers a (local or regular) handler on {@code address} and exposes the
   * received messages as an observable.
   *
   * <p>The handler is registered as soon as this method is called. The
   * subscription's unsubscribe hook unregisters the handler again.
   *
   * @param address the event bus address to listen on
   * @param local   {@code true} to use {@code registerLocalHandler},
   *                {@code false} to use {@code registerHandler}
   * @return an observable emitting one wrapper per received message
   */
  private Observable<M> registerHandler(final String address, boolean local) {
    final VertxSubscription<M> sub = new VertxSubscription<>();
    Observable<M> obs = observableFor(sub);
    final Handler<Message<T>> handler = new Handler<Message<T>>() {
      @Override
      public void handle(Message<T> reply) {
        M replyMessage = createMessageInstance();
        replyMessage.setCoreMessage(reply);
        sub.handleResult(replyMessage);
      }
    };
    if (local) {
      eventBus.registerLocalHandler(address, handler);
    } else {
      eventBus.registerHandler(address, handler);
    }
    sub.setOnUnsubscribe(new Runnable() {
      @Override
      public void run() {
        eventBus.unregisterHandler(address, handler);
      }
    });
    return obs;
  }

  /**
   * Builds the observable whose subscribe function attaches the subscribing
   * observer to {@code sub} and returns {@code sub} as the subscription.
   */
  private Observable<M> observableFor(final VertxSubscription<M> sub) {
    return new VertxObservable<M>(new Func1<Observer<M>, Subscription>() {
      @Override
      public Subscription call(Observer<M> replyObserver) {
        sub.setObserver(replyObserver);
        return sub;
      }
    });
  }
}
package io.vertx.rxcore.java.eventbus;
import io.vertx.rxcore.java.impl.VertxObservable;
import io.vertx.rxcore.java.impl.VertxSubscription;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;
/*
* Copyright 2013 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
/**
 * Rx-flavoured wrapper around a core event bus {@link Message}.
 *
 * <p>Instances are created empty; the wrapped message is supplied afterwards
 * through {@link #setCoreMessage(Message)}.
 *
 * @param <T> the body type of the wrapped message
 */
public class RxMessage<T> {

  // Not final: the wrapped message is injected after construction.
  private Message<T> coreMessage;

  /** Attaches the core message this wrapper delegates to. */
  void setCoreMessage(Message<T> coreMessage) {
    this.coreMessage = coreMessage;
  }

  /**
   * @return the body of the wrapped message
   */
  public T body() {
    return coreMessage.body();
  }

  /**
   * @return the reply address of the wrapped message (if any)
   */
  public String replyAddress() {
    return coreMessage.replyAddress();
  }

  /**
   * Replies to the wrapped message and exposes the reply-to-the-reply as an
   * observable. The answer is wrapped in a plain {@code RxMessage}, handed to
   * the subscription, and the subscription is then completed.
   *
   * @param message the reply payload
   * @return an observable of the answer to this reply
   */
  public Observable<RxMessage<T>> reply(Object message) {
    final VertxSubscription<RxMessage<T>> subscription = new VertxSubscription<>();

    final Func1<Observer<RxMessage<T>>, Subscription> onSubscribe =
        new Func1<Observer<RxMessage<T>>, Subscription>() {
          @Override
          public Subscription call(Observer<RxMessage<T>> observer) {
            subscription.setObserver(observer);
            return subscription;
          }
        };
    final Observable<RxMessage<T>> observable = new VertxObservable<>(onSubscribe);

    final Handler<Message<T>> onReply = new Handler<Message<T>>() {
      @Override
      public void handle(Message<T> answer) {
        RxMessage<T> wrapped = new RxMessage<>();
        wrapped.setCoreMessage(answer);
        subscription.handleResult(wrapped);
        subscription.complete();
      }
    };
    coreMessage.reply(message, onReply);

    return observable;
  }

  /** Replies with a {@code null} String payload. */
  public Observable<RxMessage<T>> reply() {
    return reply((String) null);
  }

  /** Replies with a {@code JsonObject}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(JsonObject message) {
    return reply((Object) message);
  }

  /** Replies with a {@code JsonArray}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(JsonArray message) {
    return reply((Object) message);
  }

  /** Replies with a {@code String}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(String message) {
    return reply((Object) message);
  }

  /** Replies with a {@code Buffer}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(Buffer message) {
    return reply((Object) message);
  }

  /** Replies with a {@code byte[]}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(byte[] message) {
    return reply((Object) message);
  }

  /** Replies with an {@code Integer}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(Integer message) {
    return reply((Object) message);
  }

  /** Replies with a {@code Long}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(Long message) {
    return reply((Object) message);
  }

  /** Replies with a {@code Short}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(Short message) {
    return reply((Object) message);
  }

  /** Replies with a {@code Character}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(Character message) {
    return reply((Object) message);
  }

  /** Replies with a {@code Boolean}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(Boolean message) {
    return reply((Object) message);
  }

  /** Replies with a {@code Float}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(Float message) {
    return reply((Object) message);
  }

  /** Replies with a {@code Double}; delegates to {@link #reply(Object)}. */
  public Observable<RxMessage<T>> reply(Double message) {
    return reply((Object) message);
  }

  /**
   * @return The underlying core message
   */
  public Message<T> getCoreMessage() {
    return coreMessage;
  }
}
package io.vertx.rxcore.test.integration.java;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.example.vertx.eventbus.BsonMessage;
import io.vertx.rxcore.java.eventbus.RxEventBus;
/**
 * Example verticle showing how to create an {@link RxEventBus} bound to the
 * {@link BsonMessage} wrapper type with {@link Buffer} bodies.
 */
public class TestClass extends TestVerticle {

  private EventBus eb;
  private RxEventBus<BsonMessage, Buffer> rxEventBus;

  /** Obtains the core event bus and wraps it in a typed {@code RxEventBus}. */
  @Override
  public void start() {
    final EventBus coreBus = vertx.eventBus();
    this.eb = coreBus;
    // Example of instantiating EventBus for BsonMessage type.
    this.rxEventBus = new RxEventBus<BsonMessage, Buffer>(coreBus, BsonMessage.class);
    // Other vertx stuff here...
  }
  // More stuff...
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment