Skip to content

Instantly share code, notes, and snippets.

@alexgtn
Last active September 7, 2017 15:06
Show Gist options
  • Save alexgtn/6f171403a778f33771f5307e03a3aabf to your computer and use it in GitHub Desktop.
Save alexgtn/6f171403a778f33771f5307e03a3aabf to your computer and use it in GitHub Desktop.
package io.vertx.mqtt.test.server;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.routing.Route;
import io.vertx.mqtt.routing.Router;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(VertxUnitRunner.class)
public class MqttServerRouterTest {
private static final Logger log = LoggerFactory.getLogger(MqttServerRouterTest.class);
protected static final String MQTT_SERVER_HOST = "localhost";
protected static final int MQTT_SERVER_PORT = 1883;
MqttEndpoint mqttEndpoint;
private Vertx vertx;
@Before
public void before(TestContext context) {
this.vertx = Vertx.vertx();
}
@After
public void after(TestContext context) {
this.vertx.close();
}
@Test
public void messageRouting(TestContext context) {
MqttServer mqttServer = MqttServer.create(
this.vertx,
new MqttServerOptions()
.setHost(MQTT_SERVER_HOST)
.setPort(MQTT_SERVER_PORT)
);
Router router = Router.router(vertx);
mqttServer.endpointHandler(endpoint -> {
endpoint
.publishHandler(router::accept)
.publishReleaseHandler(endpoint::publishComplete);
mqttEndpoint = endpoint;
endpoint.accept(true);
}).listen(ar -> {
if (ar.succeeded()) {
log.info("MQTT server listening on port " + ar.result().actualPort());
latchListen.countDown();
} else {
log.error("Error starting MQTT server", ar.cause());
}
});
Route testRoute = router.route().topic("/foo/+/bar");
testRoute.mqttMessageHandler(message -> {
System.out.println("Got a message on topic: " + message.topicName());
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
mqttEndpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
mqttEndpoint.publishRelease(message.messageId());
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment