Last active
December 20, 2015 11:48
-
-
Save aschrijver/6125744 to your computer and use it in GitHub Desktop.
See also the discussion at: https://groups.google.com/forum/#!topic/vertx/R6yteLPghM8 — In this gist, RxMessage has an implicit parameterless constructor. A generic version of RxEventBus instantiates it via reflection, using the message class that was passed to the RxEventBus constructor (and that matches the generic type parameter).
Tradeoffs are t…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.example.vertx.eventbus; | |
import org.vertx.java.core.buffer.Buffer; | |
import io.vertx.rxcore.java.eventbus.RxMessage; | |
public class BsonMessage extends RxMessage<Buffer> { | |
// Do some interesting stuff here as well. | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.vertx.rxcore.java.eventbus; | |
import io.vertx.rxcore.java.impl.VertxObservable; | |
import io.vertx.rxcore.java.impl.VertxSubscription; | |
import org.vertx.java.core.Handler; | |
import org.vertx.java.core.buffer.Buffer; | |
import org.vertx.java.core.eventbus.EventBus; | |
import org.vertx.java.core.eventbus.Message; | |
import org.vertx.java.core.json.JsonArray; | |
import org.vertx.java.core.json.JsonObject; | |
import rx.Observable; | |
import rx.Observer; | |
import rx.Subscription; | |
import rx.util.functions.Func1; | |
/* | |
* Copyright 2013 Red Hat, Inc. | |
* | |
* Red Hat licenses this file to you under the Apache License, version 2.0 | |
* (the "License"); you may not use this file except in compliance with the | |
* License. You may obtain a copy of the License at: | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
* License for the specific language governing permissions and limitations | |
* under the License. | |
* | |
* @author <a href="http://tfox.org">Tim Fox</a> | |
*/ | |
public class RxEventBus<M extends RxMessage<T>, T> { | |
private final EventBus eventBus; | |
private final Class<M> messageType; | |
public RxEventBus(EventBus eventBus, Class<M> messageType) { | |
this.eventBus = eventBus; | |
this.messageType = messageType; | |
} | |
M createMessageInstance() { | |
try { | |
return messageType.newInstance(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public Observable<M> send(String address, String msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, JsonObject msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, JsonArray msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, Buffer msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, byte[] msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, Integer msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, Long msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, Float msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, Double msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, Boolean msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, Short msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, Character msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> send(String address, Byte msg) { | |
return doSend(address, msg); | |
} | |
public Observable<M> registerLocalHandler(final String address) { | |
return registerHandler(address, true); | |
} | |
public Observable<M> registerHandler(final String address) { | |
return registerHandler(address, false); | |
} | |
@SuppressWarnings("rawtypes") | |
private Observable<M> doSend(String address, Object msg) { | |
final VertxSubscription<M> sub = new VertxSubscription<>(); | |
Observable<M> obs = new VertxObservable<>(new Func1<Observer<M>, Subscription>() { | |
@Override | |
public Subscription call(Observer<M> replyObserver) { | |
sub.setObserver(replyObserver); | |
return sub; | |
} | |
}); | |
eventBus.send(address, msg, new Handler<Message>() { | |
@SuppressWarnings("unchecked") | |
@Override | |
public void handle(Message reply) { | |
M replyMessage = createMessageInstance(); | |
replyMessage.setCoreMessage(reply); | |
sub.handleResult(replyMessage); | |
sub.complete(); | |
} | |
}); | |
return obs; | |
} | |
private Observable<M> registerHandler(final String address, boolean local) { | |
final VertxSubscription<M> sub = new VertxSubscription<>(); | |
Observable<M> obs = new VertxObservable<>(new Func1<Observer<M>, Subscription>() { | |
@Override | |
public Subscription call(Observer<M> replyObserver) { | |
sub.setObserver(replyObserver); | |
return sub; | |
} | |
}); | |
final Handler<Message<T>> handler = new Handler<Message<T>>() { | |
@Override | |
public void handle(Message<T> reply) { | |
M replyMessage = createMessageInstance(); | |
replyMessage.setCoreMessage(reply); | |
sub.handleResult(replyMessage); | |
} | |
}; | |
if (local) { | |
eventBus.registerLocalHandler(address, handler); | |
} else { | |
eventBus.registerHandler(address, handler); | |
} | |
sub.setOnUnsubscribe(new Runnable() { | |
public void run() { | |
eventBus.unregisterHandler(address, handler); | |
} | |
}); | |
return obs; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.vertx.rxcore.java.eventbus; | |
import io.vertx.rxcore.java.impl.VertxObservable; | |
import io.vertx.rxcore.java.impl.VertxSubscription; | |
import org.vertx.java.core.Handler; | |
import org.vertx.java.core.buffer.Buffer; | |
import org.vertx.java.core.eventbus.Message; | |
import org.vertx.java.core.json.JsonArray; | |
import org.vertx.java.core.json.JsonObject; | |
import rx.Observable; | |
import rx.Observer; | |
import rx.Subscription; | |
import rx.util.functions.Func1; | |
/* | |
* Copyright 2013 Red Hat, Inc. | |
* | |
* Red Hat licenses this file to you under the Apache License, version 2.0 | |
* (the "License"); you may not use this file except in compliance with the | |
* License. You may obtain a copy of the License at: | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
* License for the specific language governing permissions and limitations | |
* under the License. | |
* | |
* @author <a href="http://tfox.org">Tim Fox</a> | |
*/ | |
public class RxMessage<T> { | |
// Note that coreMessage is no longer final. | |
private Message<T> coreMessage; | |
void setCoreMessage(Message<T> coreMessage) { | |
this.coreMessage = coreMessage; | |
} | |
/** | |
* The body of the message | |
*/ | |
public T body() { | |
return coreMessage.body(); | |
} | |
/** | |
* The reply address (if any) | |
*/ | |
public String replyAddress() { | |
return coreMessage.replyAddress(); | |
} | |
public Observable<RxMessage<T>> reply(Object message) { | |
final VertxSubscription<RxMessage<T>> sub = new VertxSubscription<>(); | |
Observable<RxMessage<T>> obs = new VertxObservable<>(new Func1<Observer<RxMessage<T>>, Subscription>() { | |
@Override | |
public Subscription call(Observer<RxMessage<T>> replyObserver) { | |
sub.setObserver(replyObserver); | |
return sub; | |
} | |
}); | |
coreMessage.reply(message, new Handler<Message<T>>() { | |
@Override | |
public void handle(Message<T> reply) { | |
RxMessage<T> replyMessage = new RxMessage<>(); | |
replyMessage.setCoreMessage(reply); | |
sub.handleResult(replyMessage); | |
sub.complete(); | |
} | |
}); | |
return obs; | |
} | |
public Observable<RxMessage<T>> reply() { | |
return reply((String) null); | |
} | |
public Observable<RxMessage<T>> reply(JsonObject message) { | |
return reply((Object) message); | |
} | |
public Observable<RxMessage<T>> reply(JsonArray message) { | |
return reply((Object) message); | |
} | |
public Observable<RxMessage<T>> reply(String message) { | |
return reply((Object) message); | |
} | |
public Observable<RxMessage<T>> reply(Buffer message) { | |
return reply((Object) message); | |
} | |
public Observable<RxMessage<T>> reply(byte[] message) { | |
return reply((Object) message); | |
} | |
public Observable<RxMessage<T>> reply(Integer message) { | |
return reply((Object) message); | |
} | |
public Observable<RxMessage<T>> reply(Long message) { | |
return reply((Object) message); | |
} | |
public Observable<RxMessage<T>> reply(Short message) { | |
return reply((Object) message); | |
} | |
public Observable<RxMessage<T>> reply(Character message) { | |
return reply((Object) message); | |
} | |
public Observable<RxMessage<T>> reply(Boolean message) { | |
return reply((Object) message); | |
} | |
public Observable<RxMessage<T>> reply(Float message) { | |
return reply((Object) message); | |
} | |
public Observable<RxMessage<T>> reply(Double message) { | |
return reply((Object) message); | |
} | |
/** | |
* @return The underlying core message | |
*/ | |
public Message<T> getCoreMessage() { | |
return coreMessage; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.vertx.rxcore.test.integration.java; | |
import org.vertx.java.core.buffer.Buffer; | |
import org.vertx.java.core.eventbus.EventBus; | |
import org.example.vertx.eventbus.BsonMessage; | |
import io.vertx.rxcore.java.eventbus.RxEventBus; | |
public class TestClass extends TestVerticle { | |
private EventBus eb; | |
private RxEventBus<BsonMessage, Buffer> rxEventBus; | |
@Override | |
public void start() { | |
eb = vertx.eventBus(); | |
// Example of instantiating EventBus for BsonMessage type. | |
rxEventBus = new RxEventBus<BsonMessage, Buffer>(eb, BsonMessage.class); | |
// Other vertx stuff here... | |
} | |
// More stuff... | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment