Skip to content

Instantly share code, notes, and snippets.

@dluc
Created December 6, 2016 05:44
Show Gist options
  • Save dluc/b5dd54b307b312b74ecb6b25ab4a865d to your computer and use it in GitHub Desktop.
Save dluc/b5dd54b307b312b74ecb6b25ab4a865d to your computer and use it in GitHub Desktop.
Azure C2D, status of rejected messages [2 files]
// BugStatusIsUnknownAfterReject.java
import com.microsoft.azure.iot.service.sdk.*;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.UUID;
/**
 * Repro harness: sends one cloud-to-device message with full delivery
 * acknowledgement, lets the device-side {@code MessageReceiver} handle it,
 * then polls the hub's feedback endpoint and logs the description and status
 * code reported for that message.
 *
 * <p>Configuration is read from the environment variables
 * {@code SDK_HUB_NAME}, {@code SDK_SERVICE_KEY}, {@code SDK_DEVICE_ID} and
 * {@code SDK_DEVICE_KEY}.
 */
public class BugStatusIsUnknownAfterReject {
    static String hub = System.getenv("SDK_HUB_NAME");
    static String serviceKey = System.getenv("SDK_SERVICE_KEY");
    static String device = System.getenv("SDK_DEVICE_ID");
    static String deviceKey = System.getenv("SDK_DEVICE_KEY");
    static MessageReceiver receiver;

    /** Pause between sending the message and polling for its feedback, in milliseconds. */
    private static final long WAIT_BEFORE_FEEDBACK_MS = 5000;

    /** Timeout passed to each {@code FeedbackReceiver.receive} call, in milliseconds. */
    private static final long FEEDBACK_RECEIVE_TIMEOUT_MS = 1000;

    /**
     * Runs the scenario end to end. Any exception is logged and terminates
     * the JVM with exit code -1.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String messageId = UUID.randomUUID().toString();
        try {
            startReceiver();
            send(messageId);
            Thread.sleep(WAIT_BEFORE_FEEDBACK_MS);
            checkFeedback(messageId);
            stopReceiver();
        } catch (Exception e) {
            log("Sender exception: " + e);
            System.exit(-1);
        }
        log("Done");
    }

    /**
     * Sends a single "test" message to the configured device, requesting
     * full delivery acknowledgement and a 24 hour expiry.
     *
     * @param messageId the ID to stamp on the outgoing message
     * @throws Exception if creating, opening, sending on, or closing the service client fails
     */
    static void send(String messageId) throws Exception {
        Message msg = new Message("test".getBytes(StandardCharsets.UTF_8));
        msg.setTo(device);
        msg.setDeliveryAcknowledgement(DeliveryAcknowledgement.Full);
        msg.setExpiryTimeUtc(Date.from(Instant.now().plusSeconds(86400)));
        msg.setMessageId(messageId);

        String cs = serviceConnectionString();
        log("Creating sender client");
        ServiceClient client = ServiceClient.createFromConnectionString(cs, IotHubServiceClientProtocol.AMQPS);
        log("Open sender connection");
        client.open();
        try {
            // Decode with the same charset the payload was encoded with.
            log("Sending message, Content: " + new String(msg.getBytes(), StandardCharsets.UTF_8)
                    + ", ID: " + msg.getMessageId());
            client.send(device, msg);
        } finally {
            // The sender connection was previously left open for the life of the process.
            log("Close sender connection");
            client.close();
        }
    }

    /**
     * Polls the feedback endpoint until a record whose original message ID
     * equals {@code messageId} is seen, logging every record encountered.
     * The loop has no upper bound: it keeps polling until the record appears
     * or an exception is thrown.
     *
     * @param messageId the ID of the message whose feedback is awaited
     * @throws Exception if the feedback receiver cannot be opened, read, or closed
     */
    static void checkFeedback(String messageId) throws Exception {
        String cs = serviceConnectionString();
        log("Creating feedback receiver client");
        ServiceClient client = ServiceClient.createFromConnectionString(cs, IotHubServiceClientProtocol.AMQPS);
        try {
            FeedbackReceiver feedbackReceiver = client.getFeedbackReceiver(device);
            log("Open feedback receiver connection");
            feedbackReceiver.open();
            try {
                log("Checking the status of the message...");
                boolean found = false;
                while (!found) {
                    log("Retrieving feedback batch...");
                    FeedbackBatch batch = feedbackReceiver.receive(FEEDBACK_RECEIVE_TIMEOUT_MS);
                    // NOTE(review): receive() is expected to return null when nothing
                    // arrives within the timeout - confirm against the SDK version in use.
                    if (batch == null) {
                        log("No feedback batch received within timeout, retrying");
                        continue;
                    }
                    log("Found feedback batch, size " + batch.getRecords().size());
                    for (FeedbackRecord record : batch.getRecords()) {
                        log("Batch record " + record.getOriginalMessageId());
                        // messageId is never null here, so call equals on it to
                        // tolerate records without an original message ID.
                        if (messageId.equals(record.getOriginalMessageId())) {
                            found = true;
                            log("Message " + messageId + " feedback found: description = "
                                    + record.getDescription() + "; status code = " + record.getStatusCode());
                        }
                    }
                }
            } finally {
                log("Close feedback receiver connection");
                feedbackReceiver.close();
            }
        } finally {
            client.close();
        }
    }

    /** Creates the device-side receiver and starts it on its own thread. */
    static void startReceiver() {
        receiver = new MessageReceiver(hub, device, deviceKey);
        receiver.start();
    }

    /**
     * Closes the device-side receiver.
     *
     * @throws Exception if closing the receiver fails
     */
    static void stopReceiver() throws Exception {
        receiver.close();
    }

    /**
     * Prints {@code text} to standard output prefixed with the current time
     * formatted as {@code HH:mm:ss.SSS}.
     *
     * @param text the message to print
     */
    static void log(String text) {
        // A fresh formatter per call: SimpleDateFormat must not be shared across threads.
        DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        System.out.println("[" + dateFormat.format(new Date()) + "] " + text);
    }

    /** Builds the service-side connection string from the hub name and service key. */
    private static String serviceConnectionString() {
        return "HostName=" + hub + ".azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=" + serviceKey;
    }
}
// MessageReceiver.java
import com.microsoft.azure.iothub.DeviceClient;
import com.microsoft.azure.iothub.IotHubClientProtocol;
import com.microsoft.azure.iothub.IotHubMessageResult;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MessageReceiver extends Thread {
private String hub;
private String device;
private String deviceKey;
private DeviceClient client;
public MessageReceiver(String hub, String device, String deviceKey) {
this.hub = hub;
this.device = device;
this.deviceKey = deviceKey;
}
public class MessageCallback implements com.microsoft.azure.iothub.MessageCallback {
public IotHubMessageResult execute(com.microsoft.azure.iothub.Message msg, Object hideBug990) {
log("Message received: Content: " + new String(msg.getBytes()) + ", ID: " + msg.getProperty("messageId"));
log("Replying with REJECT");
return IotHubMessageResult.REJECT;
}
}
public void run() {
try {
String cs = "HostName=" + this.hub + ".azure-devices.net;DeviceId=" + this.device + ";SharedAccessKey=" + this.deviceKey;
log("Creating message receiver client");
this.client = new DeviceClient(cs, IotHubClientProtocol.AMQPS);
this.client.setMessageCallback(new MessageCallback(), null);
log("Opening message receiver connection");
this.client.open();
} catch (Exception e) {
log("Message receiver exception: " + e);
System.exit(-1);
}
}
public void close() throws Exception {
log("Closing message receiver");
this.client.close();
}
static void log(String text) {
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
System.out.println("[" + dateFormat.format(new Date()) + "] " + text);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment