Skip to content

Instantly share code, notes, and snippets.

@dowhiletrue
Created June 17, 2019 10:45
Show Gist options
  • Save dowhiletrue/3bc1137dda8fdc54378082aa56266c6a to your computer and use it in GitHub Desktop.
Save dowhiletrue/3bc1137dda8fdc54378082aa56266c6a to your computer and use it in GitHub Desktop.
Request/Response in HiveMQ Client
package test.mqtt;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.rx.FlowableWithSingle;
import io.reactivex.Flowable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertTrue;
class MqttClientTest {
private Mqtt5RxClient client;
private CountDownLatch latch;
@BeforeEach
public void setUp() throws URISyntaxException {
client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost(new URI("tcp://localhost").getHost())
.serverPort(1883)
.automaticReconnectWithDefaultConfig()
.buildRx();
client.toBlocking().connect();
latch = new CountDownLatch(2);
}
@Test
public void shouldSendTwoMessagesToStartTopicThenTimeoutWhileWaitingForRequestResponseToComplete() throws InterruptedException, ExecutionException {
subscribeToStartTopicWith(requestResponseOnTempTopicCallback());
// start sending the messages
produceMessages();
assertTrue(latch.await(20, TimeUnit.SECONDS));
}
private void subscribeToStartTopicWith(Consumer<Mqtt5Publish> callback) throws ExecutionException, InterruptedException {
// this part is implemented in camel consumer class
// subscribes with callback to start topic
client.toAsync().subscribeWith().topicFilter("start").callback(callback).send().get(); // wait for subscription to complete
}
private Consumer<Mqtt5Publish> requestResponseOnTempTopicCallback() {
// this is the camel producer (processor) part to send a message to temptopic and wait for the response
return c -> {
FlowableWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribeScenario = client.subscribeStreamWith().topicFilter("tempTopic").applySubscribe();
subscribeScenario
// .doOnSingle(subAck -> client.toAsync().publishWith().topic("requestTopic").send())
.firstOrError()
.timeout(5, TimeUnit.SECONDS)
.subscribe(publish -> {
// should not happen, since no publisher for tempTopic for this example implemented
}, throwable -> {
// handle error
System.out.println("Error while waiting on response occurred: " + throwable);
latch.countDown();
});
System.out.println("Block thread " + Thread.currentThread().getName() + " until subscribed to tempTopic"); // RxComputationThreadPool-#
// wait until tempTopic is subscribed to avoid PUBREC on start topic before message is published on tempTopic
waitForSubAck("tempTopic", subscribeScenario); // the test does not work unless this method is commented out
publishOnTempTopic();
}; // PUBREC for start topic will be sent here. Want to wait until the message is published on another topic
}
private void produceMessages() {
Flowable<Mqtt5Publish> messagesToPublish = Flowable.range(0, 2)
.map(i -> Mqtt5Publish.builder()
.topic("start")
.qos(MqttQos.EXACTLY_ONCE)
.payload(("test " + i).getBytes())
.build())
// Emit 1 message only every 100 milliseconds
.zipWith(Flowable.interval(100, TimeUnit.MILLISECONDS), (publish, i) -> publish);
client.publish(messagesToPublish)
.doOnNext(publishResult -> System.out.println(
"Publish acknowledged: " + new String(publishResult.getPublish().getPayloadAsBytes())))
.ignoreElements().subscribe();
}
private void waitForSubAck(String topicFilter, FlowableWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribeScenario) {
CompletableFuture<Mqtt5SubAck> mqtt5SubAckCompletableFuture = subscribeScenario
.subscribeSingleFuture(n -> System.out.println("Subscribed to " + n)
, e -> System.out.println("Error occurred while subscribing to " + topicFilter + " " + e));
try {
// why does this get never complete?
Mqtt5SubAck mqtt5SubAck = mqtt5SubAckCompletableFuture.get();
System.out.println("Subscribed to: " + topicFilter + " with " + mqtt5SubAck);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
private void publishOnTempTopic() {
// no need to implement this code to illustrate the problem
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment