Last active
December 8, 2016 20:14
-
-
Save dluc/974eae8bbfac9cb5682c9447e487bce5 to your computer and use it in GitHub Desktop.
Azure IoT Hub: issues processing queued cloud-to-device (C2D) messages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// BugQueuedMessagesProcessing.java | |
import com.microsoft.azure.iot.service.sdk.*; | |
import java.nio.charset.StandardCharsets; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.time.Instant; | |
import java.util.Date; | |
import java.util.UUID; | |
public class BugQueuedMessagesProcessing | |
{ | |
static String hub = System.getenv("SDK_HUB_NAME"); | |
static String serviceKey = System.getenv("SDK_SERVICE_KEY"); | |
static String device = System.getenv("SDK_DEVICE_ID"); | |
static String deviceKey = System.getenv("SDK_DEVICE_KEY"); | |
static MessageReceiver receiver; | |
public static void main(String[] args) | |
{ | |
try | |
{ | |
send(); | |
send(); | |
send(); | |
send(); | |
send(); | |
startReceiver(); | |
Thread.sleep(10 * 1000); | |
send(); | |
send(); | |
Thread.sleep(60 * 1000); | |
stopReceiver(); | |
Thread.sleep(5 * 1000); | |
startReceiver(); | |
Thread.sleep(60 * 1000); | |
stopReceiver(); | |
} catch (Exception e) | |
{ | |
log("Sender exception: " + e); | |
System.exit(-1); | |
} | |
log("Done"); | |
} | |
static void send() throws Exception | |
{ | |
Message msg = new Message("test".getBytes(StandardCharsets.UTF_8)); | |
msg.setTo(device); | |
msg.setDeliveryAcknowledgement(DeliveryAcknowledgement.Full); | |
msg.setExpiryTimeUtc(Date.from(Instant.now().plusSeconds(3600))); | |
msg.setMessageId(UUID.randomUUID().toString()); | |
String cs = "HostName=" + hub + ".azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=" + serviceKey; | |
log("Starting ServiceClient"); | |
ServiceClient client = ServiceClient.createFromConnectionString(cs, IotHubServiceClientProtocol.AMQPS); | |
client.open(); | |
log("Sending message, ID: " + msg.getMessageId() + ", Expiration: " + msg.getExpiryTimeUtc()); | |
client.send(device, msg); | |
log("Stopping ServiceClient"); | |
client.close(); | |
} | |
static void startReceiver() | |
{ | |
receiver = new MessageReceiver(hub, device, deviceKey); | |
receiver.start(); | |
} | |
static void stopReceiver() throws Exception | |
{ | |
receiver.close(); | |
} | |
static void log(String text) | |
{ | |
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); | |
System.out.println("[" + dateFormat.format(new Date()) + "] " + text); | |
} | |
} | |
// MessageReceiver.java | |
import com.microsoft.azure.iothub.DeviceClient; | |
import com.microsoft.azure.iothub.IotHubClientProtocol; | |
import com.microsoft.azure.iothub.IotHubMessageResult; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
public class MessageReceiver extends Thread | |
{ | |
private String hub; | |
private String device; | |
private String deviceKey; | |
private DeviceClient client; | |
public MessageReceiver(String hub, String device, String deviceKey) | |
{ | |
this.hub = hub; | |
this.device = device; | |
this.deviceKey = deviceKey; | |
} | |
public class MessageCallback implements com.microsoft.azure.iothub.MessageCallback | |
{ | |
public IotHubMessageResult execute(com.microsoft.azure.iothub.Message msg, Object context) | |
{ | |
log("Message received: ID: " + msg.getProperty("messageId")); | |
log("Changing the status of the message to COMPLETE"); | |
return IotHubMessageResult.COMPLETE; | |
} | |
} | |
public void run() | |
{ | |
try | |
{ | |
String cs = "HostName=" + this.hub + ".azure-devices.net;DeviceId=" + this.device + ";SharedAccessKey=" + this.deviceKey; | |
log("Starting DeviceClient"); | |
this.client = new DeviceClient(cs, IotHubClientProtocol.AMQPS); | |
this.client.setMessageCallback(new MessageCallback(), null); | |
this.client.open(); | |
} catch (Exception e) | |
{ | |
log("Message receiver exception: " + e); | |
System.exit(-1); | |
} | |
} | |
public void close() throws Exception | |
{ | |
log("Stopping DeviceClient"); | |
this.client.close(); | |
} | |
static void log(String text) | |
{ | |
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); | |
System.out.println("[" + dateFormat.format(new Date()) + "] " + text); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment