Skip to content

Instantly share code, notes, and snippets.

@dluc
Last active May 2, 2019 01:42
Show Gist options
  • Save dluc/e24a4eca2faa50e0fc3f66fbf91776d4 to your computer and use it in GitHub Desktop.
Save dluc/e24a4eca2faa50e0fc3f66fbf91776d4 to your computer and use it in GitHub Desktop.
Azure IoT SDK message status is set to SUCCESS when in the queue
// BugStatusIsCompletedBeforeCompletion.java
import com.microsoft.azure.iot.service.sdk.*;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.UUID;
public class BugStatusIsCompletedBeforeCompletion
{
static String hub = System.getenv("SDK_HUB_NAME");
static String serviceKey = System.getenv("SDK_SERVICE_KEY");
static String device = System.getenv("SDK_DEVICE_ID");
static String deviceKey = System.getenv("SDK_DEVICE_KEY");
static MessageReceiver receiver;
public static void main(String[] args)
{
String messageId1 = UUID.randomUUID().toString();
String messageId2 = UUID.randomUUID().toString();
try
{
// Exercise 1
send(messageId1);
checkFeedback(messageId1);
// Exercise 2
startReceiver();
send(messageId2);
Thread.sleep(5000);
checkFeedback(messageId2);
stopReceiver();
}
catch (Exception e)
{
log("Sender exception: " + e);
System.exit(-1);
}
log("Done");
}
static void send(String messageId) throws Exception
{
Message msg = new Message("test".getBytes(StandardCharsets.UTF_8));
msg.setTo(device);
msg.setDeliveryAcknowledgement(DeliveryAcknowledgement.Full);
msg.setExpiryTimeUtc(Date.from(Instant.now().plusSeconds(86400)));
msg.setMessageId(messageId);
String cs = "HostName=" + hub + ".azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=" + serviceKey;
log("Creating sender client");
ServiceClient client = ServiceClient.createFromConnectionString(cs, IotHubServiceClientProtocol.AMQPS);
log("Open sender connection");
client.open();
log("Sending message, Content: " + new String(msg.getBytes()) + ", ID: " + msg.getMessageId());
client.send(device, msg);
}
static void checkFeedback(String messageId) throws Exception
{
String cs = "HostName=" + hub + ".azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=" + serviceKey;
log("Creating feedback receiver client");
ServiceClient client = ServiceClient.createFromConnectionString(cs, IotHubServiceClientProtocol.AMQPS);
FeedbackReceiver feedbackReceiver = client.getFeedbackReceiver(device);
log("Open feedback receiver connection");
feedbackReceiver.open();
log("Checking the status of the message...");
boolean found = false;
while (!found)
{
log("Retrieving feedback batch...");
FeedbackBatch batch = feedbackReceiver.receive(10000);
log("Found feedback batch, size " + batch.getRecords().size());
for (FeedbackRecord record : batch.getRecords())
{
log("Batch record " + record.getOriginalMessageId());
if (record.getOriginalMessageId().equals(messageId))
{
found = true;
log("Message " + messageId + " feedback found: description = " + record.getDescription() + "; status code = " + record.getStatusCode());
}
}
}
log("Close feedback receiver connection");
feedbackReceiver.close();
client.close();
}
static void startReceiver()
{
receiver = new MessageReceiver(hub, device, deviceKey, true);
receiver.start();
}
static void stopReceiver() throws Exception
{
receiver.close();
}
static void log(String text)
{
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
System.out.println("[" + dateFormat.format(new Date()) + "] " + text);
}
}
// MessageReceiver.java
import com.microsoft.azure.iothub.DeviceClient;
import com.microsoft.azure.iothub.IotHubClientProtocol;
import com.microsoft.azure.iothub.IotHubMessageResult;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MessageReceiver extends Thread
{
private final boolean hideBug990;
private String hub;
private String device;
private String deviceKey;
private DeviceClient client;
public MessageReceiver(String hub, String device, String deviceKey, boolean hideBug990)
{
this.hub = hub;
this.device = device;
this.deviceKey = deviceKey;
this.hideBug990 = hideBug990;
}
public class MessageCallback implements com.microsoft.azure.iothub.MessageCallback
{
public IotHubMessageResult execute(com.microsoft.azure.iothub.Message msg, Object hideBug990)
{
if ((boolean) hideBug990)
log("Message received: Content: " + new String(msg.getBytes()) + ", ID: " + msg.getProperty("messageId"));
else
log("Message received: Content: " + new String(msg.getBytes()) + ", ID: " + msg.getMessageId() + ", msg prop: " + msg.getProperty("messageId"));
log("Changing the status of the message to COMPLETE");
return IotHubMessageResult.COMPLETE;
}
}
public void run()
{
try
{
String cs = "HostName=" + this.hub + ".azure-devices.net;DeviceId=" + this.device + ";SharedAccessKey=" + this.deviceKey;
log("Creating message receiver client");
this.client = new DeviceClient(cs, IotHubClientProtocol.AMQPS);
this.client.setMessageCallback(new MessageCallback(), this.hideBug990);
log("Opening message receiver connection");
this.client.open();
}
catch (Exception e)
{
log("Message receiver exception: " + e);
System.exit(-1);
}
}
public void close() throws Exception
{
log("Closing message receiver");
this.client.close();
}
static void log(String text)
{
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
System.out.println("[" + dateFormat.format(new Date()) + "] " + text);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment