Last active
May 2, 2019 01:42
-
-
Save dluc/e24a4eca2faa50e0fc3f66fbf91776d4 to your computer and use it in GitHub Desktop.
Azure IoT SDK message status is set to SUCCESS when in the queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// BugStatusIsCompletedBeforeCompletion.java
import com.microsoft.azure.iot.service.sdk.*; | |
import java.nio.charset.StandardCharsets; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.time.Instant; | |
import java.util.Date; | |
import java.util.UUID; | |
public class BugStatusIsCompletedBeforeCompletion | |
{ | |
static String hub = System.getenv("SDK_HUB_NAME"); | |
static String serviceKey = System.getenv("SDK_SERVICE_KEY"); | |
static String device = System.getenv("SDK_DEVICE_ID"); | |
static String deviceKey = System.getenv("SDK_DEVICE_KEY"); | |
static MessageReceiver receiver; | |
public static void main(String[] args) | |
{ | |
String messageId1 = UUID.randomUUID().toString(); | |
String messageId2 = UUID.randomUUID().toString(); | |
try | |
{ | |
// Exercise 1 | |
send(messageId1); | |
checkFeedback(messageId1); | |
// Exercise 2 | |
startReceiver(); | |
send(messageId2); | |
Thread.sleep(5000); | |
checkFeedback(messageId2); | |
stopReceiver(); | |
} | |
catch (Exception e) | |
{ | |
log("Sender exception: " + e); | |
System.exit(-1); | |
} | |
log("Done"); | |
} | |
static void send(String messageId) throws Exception | |
{ | |
Message msg = new Message("test".getBytes(StandardCharsets.UTF_8)); | |
msg.setTo(device); | |
msg.setDeliveryAcknowledgement(DeliveryAcknowledgement.Full); | |
msg.setExpiryTimeUtc(Date.from(Instant.now().plusSeconds(86400))); | |
msg.setMessageId(messageId); | |
String cs = "HostName=" + hub + ".azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=" + serviceKey; | |
log("Creating sender client"); | |
ServiceClient client = ServiceClient.createFromConnectionString(cs, IotHubServiceClientProtocol.AMQPS); | |
log("Open sender connection"); | |
client.open(); | |
log("Sending message, Content: " + new String(msg.getBytes()) + ", ID: " + msg.getMessageId()); | |
client.send(device, msg); | |
} | |
static void checkFeedback(String messageId) throws Exception | |
{ | |
String cs = "HostName=" + hub + ".azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=" + serviceKey; | |
log("Creating feedback receiver client"); | |
ServiceClient client = ServiceClient.createFromConnectionString(cs, IotHubServiceClientProtocol.AMQPS); | |
FeedbackReceiver feedbackReceiver = client.getFeedbackReceiver(device); | |
log("Open feedback receiver connection"); | |
feedbackReceiver.open(); | |
log("Checking the status of the message..."); | |
boolean found = false; | |
while (!found) | |
{ | |
log("Retrieving feedback batch..."); | |
FeedbackBatch batch = feedbackReceiver.receive(10000); | |
log("Found feedback batch, size " + batch.getRecords().size()); | |
for (FeedbackRecord record : batch.getRecords()) | |
{ | |
log("Batch record " + record.getOriginalMessageId()); | |
if (record.getOriginalMessageId().equals(messageId)) | |
{ | |
found = true; | |
log("Message " + messageId + " feedback found: description = " + record.getDescription() + "; status code = " + record.getStatusCode()); | |
} | |
} | |
} | |
log("Close feedback receiver connection"); | |
feedbackReceiver.close(); | |
client.close(); | |
} | |
static void startReceiver() | |
{ | |
receiver = new MessageReceiver(hub, device, deviceKey, true); | |
receiver.start(); | |
} | |
static void stopReceiver() throws Exception | |
{ | |
receiver.close(); | |
} | |
static void log(String text) | |
{ | |
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); | |
System.out.println("[" + dateFormat.format(new Date()) + "] " + text); | |
} | |
} | |
// MessageReceiver.java
import com.microsoft.azure.iothub.DeviceClient; | |
import com.microsoft.azure.iothub.IotHubClientProtocol; | |
import com.microsoft.azure.iothub.IotHubMessageResult; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
public class MessageReceiver extends Thread | |
{ | |
private final boolean hideBug990; | |
private String hub; | |
private String device; | |
private String deviceKey; | |
private DeviceClient client; | |
public MessageReceiver(String hub, String device, String deviceKey, boolean hideBug990) | |
{ | |
this.hub = hub; | |
this.device = device; | |
this.deviceKey = deviceKey; | |
this.hideBug990 = hideBug990; | |
} | |
public class MessageCallback implements com.microsoft.azure.iothub.MessageCallback | |
{ | |
public IotHubMessageResult execute(com.microsoft.azure.iothub.Message msg, Object hideBug990) | |
{ | |
if ((boolean) hideBug990) | |
log("Message received: Content: " + new String(msg.getBytes()) + ", ID: " + msg.getProperty("messageId")); | |
else | |
log("Message received: Content: " + new String(msg.getBytes()) + ", ID: " + msg.getMessageId() + ", msg prop: " + msg.getProperty("messageId")); | |
log("Changing the status of the message to COMPLETE"); | |
return IotHubMessageResult.COMPLETE; | |
} | |
} | |
public void run() | |
{ | |
try | |
{ | |
String cs = "HostName=" + this.hub + ".azure-devices.net;DeviceId=" + this.device + ";SharedAccessKey=" + this.deviceKey; | |
log("Creating message receiver client"); | |
this.client = new DeviceClient(cs, IotHubClientProtocol.AMQPS); | |
this.client.setMessageCallback(new MessageCallback(), this.hideBug990); | |
log("Opening message receiver connection"); | |
this.client.open(); | |
} | |
catch (Exception e) | |
{ | |
log("Message receiver exception: " + e); | |
System.exit(-1); | |
} | |
} | |
public void close() throws Exception | |
{ | |
log("Closing message receiver"); | |
this.client.close(); | |
} | |
static void log(String text) | |
{ | |
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); | |
System.out.println("[" + dateFormat.format(new Date()) + "] " + text); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment