Created
December 6, 2016 05:44
-
-
Save dluc/b5dd54b307b312b74ecb6b25ab4a865d to your computer and use it in GitHub Desktop.
Azure C2D, status of rejected messages [2 files]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// BugStatusIsUnknownAfterReject.java | |
import com.microsoft.azure.iot.service.sdk.*; | |
import java.nio.charset.StandardCharsets; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.time.Instant; | |
import java.util.Date; | |
import java.util.UUID; | |
public class BugStatusIsUnknownAfterReject { | |
static String hub = System.getenv("SDK_HUB_NAME"); | |
static String serviceKey = System.getenv("SDK_SERVICE_KEY"); | |
static String device = System.getenv("SDK_DEVICE_ID"); | |
static String deviceKey = System.getenv("SDK_DEVICE_KEY"); | |
static MessageReceiver receiver; | |
public static void main(String[] args) { | |
String messageId = UUID.randomUUID().toString(); | |
try { | |
startReceiver(); | |
send(messageId); | |
Thread.sleep(5000); | |
checkFeedback(messageId); | |
stopReceiver(); | |
} catch (Exception e) { | |
log("Sender exception: " + e); | |
System.exit(-1); | |
} | |
log("Done"); | |
} | |
static void send(String messageId) throws Exception { | |
Message msg = new Message("test".getBytes(StandardCharsets.UTF_8)); | |
msg.setTo(device); | |
msg.setDeliveryAcknowledgement(DeliveryAcknowledgement.Full); | |
msg.setExpiryTimeUtc(Date.from(Instant.now().plusSeconds(86400))); | |
msg.setMessageId(messageId); | |
String cs = "HostName=" + hub + ".azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=" + serviceKey; | |
log("Creating sender client"); | |
ServiceClient client = ServiceClient.createFromConnectionString(cs, IotHubServiceClientProtocol.AMQPS); | |
log("Open sender connection"); | |
client.open(); | |
log("Sending message, Content: " + new String(msg.getBytes()) + ", ID: " + msg.getMessageId()); | |
client.send(device, msg); | |
} | |
static void checkFeedback(String messageId) throws Exception { | |
String cs = "HostName=" + hub + ".azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=" + serviceKey; | |
log("Creating feedback receiver client"); | |
ServiceClient client = ServiceClient.createFromConnectionString(cs, IotHubServiceClientProtocol.AMQPS); | |
FeedbackReceiver feedbackReceiver = client.getFeedbackReceiver(device); | |
log("Open feedback receiver connection"); | |
feedbackReceiver.open(); | |
log("Checking the status of the message..."); | |
boolean found = false; | |
while (!found) { | |
log("Retrieving feedback batch..."); | |
FeedbackBatch batch = feedbackReceiver.receive(1000); | |
log("Found feedback batch, size " + batch.getRecords().size()); | |
for (FeedbackRecord record : batch.getRecords()) { | |
log("Batch record " + record.getOriginalMessageId()); | |
if (record.getOriginalMessageId().equals(messageId)) { | |
found = true; | |
log("Message " + messageId + " feedback found: description = " + record.getDescription() + "; status code = " + record.getStatusCode()); | |
} | |
} | |
} | |
log("Close feedback receiver connection"); | |
feedbackReceiver.close(); | |
client.close(); | |
} | |
static void startReceiver() { | |
receiver = new MessageReceiver(hub, device, deviceKey); | |
receiver.start(); | |
} | |
static void stopReceiver() throws Exception { | |
receiver.close(); | |
} | |
static void log(String text) { | |
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); | |
System.out.println("[" + dateFormat.format(new Date()) + "] " + text); | |
} | |
} | |
// MessageReceiver.java | |
import com.microsoft.azure.iothub.DeviceClient; | |
import com.microsoft.azure.iothub.IotHubClientProtocol; | |
import com.microsoft.azure.iothub.IotHubMessageResult; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
public class MessageReceiver extends Thread { | |
private String hub; | |
private String device; | |
private String deviceKey; | |
private DeviceClient client; | |
public MessageReceiver(String hub, String device, String deviceKey) { | |
this.hub = hub; | |
this.device = device; | |
this.deviceKey = deviceKey; | |
} | |
public class MessageCallback implements com.microsoft.azure.iothub.MessageCallback { | |
public IotHubMessageResult execute(com.microsoft.azure.iothub.Message msg, Object hideBug990) { | |
log("Message received: Content: " + new String(msg.getBytes()) + ", ID: " + msg.getProperty("messageId")); | |
log("Replying with REJECT"); | |
return IotHubMessageResult.REJECT; | |
} | |
} | |
public void run() { | |
try { | |
String cs = "HostName=" + this.hub + ".azure-devices.net;DeviceId=" + this.device + ";SharedAccessKey=" + this.deviceKey; | |
log("Creating message receiver client"); | |
this.client = new DeviceClient(cs, IotHubClientProtocol.AMQPS); | |
this.client.setMessageCallback(new MessageCallback(), null); | |
log("Opening message receiver connection"); | |
this.client.open(); | |
} catch (Exception e) { | |
log("Message receiver exception: " + e); | |
System.exit(-1); | |
} | |
} | |
public void close() throws Exception { | |
log("Closing message receiver"); | |
this.client.close(); | |
} | |
static void log(String text) { | |
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); | |
System.out.println("[" + dateFormat.format(new Date()) + "] " + text); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment