Created
June 17, 2019 10:45
-
-
Save dowhiletrue/3bc1137dda8fdc54378082aa56266c6a to your computer and use it in GitHub Desktop.
Request/Response in HiveMQ Client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test.mqtt; | |
import com.hivemq.client.mqtt.datatypes.MqttQos; | |
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; | |
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient; | |
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; | |
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; | |
import com.hivemq.client.rx.FlowableWithSingle; | |
import io.reactivex.Flowable; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.api.Test; | |
import java.net.URI; | |
import java.net.URISyntaxException; | |
import java.util.UUID; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.Consumer; | |
import static org.junit.jupiter.api.Assertions.assertTrue; | |
class MqttClientTest { | |
private Mqtt5RxClient client; | |
private CountDownLatch latch; | |
@BeforeEach | |
public void setUp() throws URISyntaxException { | |
client = Mqtt5Client.builder() | |
.identifier(UUID.randomUUID().toString()) | |
.serverHost(new URI("tcp://localhost").getHost()) | |
.serverPort(1883) | |
.automaticReconnectWithDefaultConfig() | |
.buildRx(); | |
client.toBlocking().connect(); | |
latch = new CountDownLatch(2); | |
} | |
@Test | |
public void shouldSendTwoMessagesToStartTopicThenTimeoutWhileWaitingForRequestResponseToComplete() throws InterruptedException, ExecutionException { | |
subscribeToStartTopicWith(requestResponseOnTempTopicCallback()); | |
// start sending the messages | |
produceMessages(); | |
assertTrue(latch.await(20, TimeUnit.SECONDS)); | |
} | |
private void subscribeToStartTopicWith(Consumer<Mqtt5Publish> callback) throws ExecutionException, InterruptedException { | |
// this part is implemented in camel consumer class | |
// subscribes with callback to start topic | |
client.toAsync().subscribeWith().topicFilter("start").callback(callback).send().get(); // wait for subscription to complete | |
} | |
private Consumer<Mqtt5Publish> requestResponseOnTempTopicCallback() { | |
// this is the camel producer (processor) part to send a message to temptopic and wait for the response | |
return c -> { | |
FlowableWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribeScenario = client.subscribeStreamWith().topicFilter("tempTopic").applySubscribe(); | |
subscribeScenario | |
// .doOnSingle(subAck -> client.toAsync().publishWith().topic("requestTopic").send()) | |
.firstOrError() | |
.timeout(5, TimeUnit.SECONDS) | |
.subscribe(publish -> { | |
// should not happen, since no publisher for tempTopic for this example implemented | |
}, throwable -> { | |
// handle error | |
System.out.println("Error while waiting on response occurred: " + throwable); | |
latch.countDown(); | |
}); | |
System.out.println("Block thread " + Thread.currentThread().getName() + " until subscribed to tempTopic"); // RxComputationThreadPool-# | |
// wait until tempTopic is subscribed to avoid PUBREC on start topic before message is published on tempTopic | |
waitForSubAck("tempTopic", subscribeScenario); // the test does not work unless this method is commented out | |
publishOnTempTopic(); | |
}; // PUBREC for start topic will be sent here. Want to wait until the message is published on another topic | |
} | |
private void produceMessages() { | |
Flowable<Mqtt5Publish> messagesToPublish = Flowable.range(0, 2) | |
.map(i -> Mqtt5Publish.builder() | |
.topic("start") | |
.qos(MqttQos.EXACTLY_ONCE) | |
.payload(("test " + i).getBytes()) | |
.build()) | |
// Emit 1 message only every 100 milliseconds | |
.zipWith(Flowable.interval(100, TimeUnit.MILLISECONDS), (publish, i) -> publish); | |
client.publish(messagesToPublish) | |
.doOnNext(publishResult -> System.out.println( | |
"Publish acknowledged: " + new String(publishResult.getPublish().getPayloadAsBytes()))) | |
.ignoreElements().subscribe(); | |
} | |
private void waitForSubAck(String topicFilter, FlowableWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribeScenario) { | |
CompletableFuture<Mqtt5SubAck> mqtt5SubAckCompletableFuture = subscribeScenario | |
.subscribeSingleFuture(n -> System.out.println("Subscribed to " + n) | |
, e -> System.out.println("Error occurred while subscribing to " + topicFilter + " " + e)); | |
try { | |
// why does this get never complete? | |
Mqtt5SubAck mqtt5SubAck = mqtt5SubAckCompletableFuture.get(); | |
System.out.println("Subscribed to: " + topicFilter + " with " + mqtt5SubAck); | |
} catch (InterruptedException | ExecutionException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private void publishOnTempTopic() { | |
// no need to implement this code to illustrate the problem | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment