Skip to content

Instantly share code, notes, and snippets.

@dluc
Last active December 8, 2016 20:14
Show Gist options
  • Save dluc/974eae8bbfac9cb5682c9447e487bce5 to your computer and use it in GitHub Desktop.
Save dluc/974eae8bbfac9cb5682c9447e487bce5 to your computer and use it in GitHub Desktop.
Azure IoT Hub: issues processing queued cloud-to-device (C2D) messages
// BugQueuedMessagesProcessing.java
import com.microsoft.azure.iot.service.sdk.*;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.UUID;
/**
 * Console scenario that sends cloud-to-device (C2D) messages to one device and
 * starts/stops a {@link MessageReceiver} around them.
 *
 * <p>Sequence run by {@link #main(String[])}: send five messages before the receiver is
 * started, start the receiver, wait 10 s, send two more messages, wait 60 s, stop the
 * receiver, wait 5 s, start a new receiver, wait 60 s, stop it.
 *
 * <p>Configuration is read from the environment variables {@code SDK_HUB_NAME},
 * {@code SDK_SERVICE_KEY}, {@code SDK_DEVICE_ID} and {@code SDK_DEVICE_KEY}.
 */
public class BugQueuedMessagesProcessing
{
    /** Number of messages sent before the receiver is started for the first time. */
    private static final int INITIAL_MESSAGE_COUNT = 5;

    static String hub = System.getenv("SDK_HUB_NAME");
    static String serviceKey = System.getenv("SDK_SERVICE_KEY");
    static String device = System.getenv("SDK_DEVICE_ID");
    static String deviceKey = System.getenv("SDK_DEVICE_KEY");
    static MessageReceiver receiver;

    /**
     * Runs the send/receive scenario described on the class.
     *
     * <p>Any exception is logged and terminates the JVM with exit code -1.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        try
        {
            // Fail fast with a readable message instead of building connection
            // strings that contain the literal text "null".
            requireSetting("SDK_HUB_NAME", hub);
            requireSetting("SDK_SERVICE_KEY", serviceKey);
            requireSetting("SDK_DEVICE_ID", device);
            requireSetting("SDK_DEVICE_KEY", deviceKey);

            // These messages are sent before this program starts any receiver.
            for (int i = 0; i < INITIAL_MESSAGE_COUNT; i++)
            {
                send();
            }

            startReceiver();
            Thread.sleep(10 * 1000);

            // Two more messages while the receiver started above is running.
            send();
            send();
            Thread.sleep(60 * 1000);
            stopReceiver();

            // Second receiver session after a short pause.
            Thread.sleep(5 * 1000);
            startReceiver();
            Thread.sleep(60 * 1000);
            stopReceiver();
        } catch (Exception e)
        {
            log("Sender exception: " + e);
            System.exit(-1);
        }
        log("Done");
    }

    /**
     * Verifies that a configuration value is present.
     *
     * @param name  name of the environment variable the value was read from
     * @param value the value to check
     * @throws IllegalStateException if {@code value} is null or empty
     */
    private static void requireSetting(String name, String value)
    {
        if (value == null || value.isEmpty())
        {
            throw new IllegalStateException("Missing configuration: environment variable " + name + " is not set");
        }
    }

    /**
     * Sends one message with the payload {@code "test"} to the configured device.
     *
     * <p>The message gets a random UUID as its ID, an expiry time one hour from now and
     * delivery acknowledgement {@code Full}. A new {@code ServiceClient} is created and
     * opened for every call and is closed again before the method returns, whether or not
     * sending succeeded.
     *
     * @throws Exception if creating, opening, sending through or closing the client fails;
     *                   when both sending and closing fail, the send failure is thrown and
     *                   the close failure is attached to it as a suppressed exception
     */
    static void send() throws Exception
    {
        Message msg = new Message("test".getBytes(StandardCharsets.UTF_8));
        msg.setTo(device);
        msg.setDeliveryAcknowledgement(DeliveryAcknowledgement.Full);
        msg.setExpiryTimeUtc(Date.from(Instant.now().plusSeconds(3600)));
        msg.setMessageId(UUID.randomUUID().toString());

        // The shared access policy name is fixed to "service".
        String cs = "HostName=" + hub + ".azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=" + serviceKey;

        log("Starting ServiceClient");
        ServiceClient client = ServiceClient.createFromConnectionString(cs, IotHubServiceClientProtocol.AMQPS);
        client.open();

        Exception sendFailure = null;
        try
        {
            log("Sending message, ID: " + msg.getMessageId() + ", Expiration: " + msg.getExpiryTimeUtc());
            client.send(device, msg);
        }
        catch (Exception e)
        {
            sendFailure = e;
            throw e;
        }
        finally
        {
            // Always release the opened client, even when send() failed.
            log("Stopping ServiceClient");
            try
            {
                client.close();
            }
            catch (Exception closeFailure)
            {
                if (sendFailure == null)
                {
                    throw closeFailure;
                }
                // Keep the send failure as the primary error.
                sendFailure.addSuppressed(closeFailure);
            }
        }
    }

    /** Creates a new {@link MessageReceiver} for the configured device and starts its thread. */
    static void startReceiver()
    {
        receiver = new MessageReceiver(hub, device, deviceKey);
        receiver.start();
    }

    /**
     * Closes the receiver created by the last {@link #startReceiver()} call.
     *
     * @throws Exception if closing the receiver fails
     */
    static void stopReceiver() throws Exception
    {
        receiver.close();
    }

    /**
     * Prints {@code text} to standard output, prefixed with the current time as
     * {@code [HH:mm:ss.SSS]}.
     *
     * @param text the message to print
     */
    static void log(String text)
    {
        DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        System.out.println("[" + dateFormat.format(new Date()) + "] " + text);
    }
}
// MessageReceiver.java
import com.microsoft.azure.iothub.DeviceClient;
import com.microsoft.azure.iothub.IotHubClientProtocol;
import com.microsoft.azure.iothub.IotHubMessageResult;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MessageReceiver extends Thread
{
private String hub;
private String device;
private String deviceKey;
private DeviceClient client;
public MessageReceiver(String hub, String device, String deviceKey)
{
this.hub = hub;
this.device = device;
this.deviceKey = deviceKey;
}
public class MessageCallback implements com.microsoft.azure.iothub.MessageCallback
{
public IotHubMessageResult execute(com.microsoft.azure.iothub.Message msg, Object context)
{
log("Message received: ID: " + msg.getProperty("messageId"));
log("Changing the status of the message to COMPLETE");
return IotHubMessageResult.COMPLETE;
}
}
public void run()
{
try
{
String cs = "HostName=" + this.hub + ".azure-devices.net;DeviceId=" + this.device + ";SharedAccessKey=" + this.deviceKey;
log("Starting DeviceClient");
this.client = new DeviceClient(cs, IotHubClientProtocol.AMQPS);
this.client.setMessageCallback(new MessageCallback(), null);
this.client.open();
} catch (Exception e)
{
log("Message receiver exception: " + e);
System.exit(-1);
}
}
public void close() throws Exception
{
log("Stopping DeviceClient");
this.client.close();
}
static void log(String text)
{
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
System.out.println("[" + dateFormat.format(new Date()) + "] " + text);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment