Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example MQTT Messaging in Java
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
public class SimpleMqttClient implements MqttCallback {
MqttClient myClient;
MqttConnectOptions connOpt;
static final String BROKER_URL = "tcp://q.m2m.io:1883";
static final String M2MIO_DOMAIN = "<Insert m2m.io domain here>";
static final String M2MIO_STUFF = "things";
static final String M2MIO_THING = "<Unique device ID>";
static final String M2MIO_USERNAME = "<m2m.io username>";
static final String M2MIO_PASSWORD_MD5 = "<m2m.io password (MD5 sum of password)>";
// the following two flags control whether this example is a publisher, a subscriber or both
static final Boolean subscriber = true;
static final Boolean publisher = true;
/**
*
* connectionLost
* This callback is invoked upon losing the MQTT connection.
*
*/
@Override
public void connectionLost(Throwable t) {
System.out.println("Connection lost!");
// code to reconnect to the broker would go here if desired
}
/**
*
* deliveryComplete
* This callback is invoked when a message published by this client
* is successfully received by the broker.
*
*/
@Override
public void deliveryComplete(MqttDeliveryToken token) {
//System.out.println("Pub complete" + new String(token.getMessage().getPayload()));
}
/**
*
* messageArrived
* This callback is invoked when a message is received on a subscribed topic.
*
*/
@Override
public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception {
System.out.println("-------------------------------------------------");
System.out.println("| Topic:" + topic.getName());
System.out.println("| Message: " + new String(message.getPayload()));
System.out.println("-------------------------------------------------");
}
/**
*
* MAIN
*
*/
public static void main(String[] args) {
SimpleMqttClient smc = new SimpleMqttClient();
smc.runClient();
}
/**
*
* runClient
* The main functionality of this simple example.
* Create a MQTT client, connect to broker, pub/sub, disconnect.
*
*/
public void runClient() {
// setup MQTT Client
String clientID = M2MIO_THING;
connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
connOpt.setUserName(M2MIO_USERNAME);
connOpt.setPassword(M2MIO_PASSWORD_MD5.toCharArray());
// Connect to Broker
try {
myClient = new MqttClient(BROKER_URL, clientID);
myClient.setCallback(this);
myClient.connect(connOpt);
} catch (MqttException e) {
e.printStackTrace();
System.exit(-1);
}
System.out.println("Connected to " + BROKER_URL);
// setup topic
// topics on m2m.io are in the form <domain>/<stuff>/<thing>
String myTopic = M2MIO_DOMAIN + "/" + M2MIO_STUFF + "/" + M2MIO_THING;
MqttTopic topic = myClient.getTopic(myTopic);
// subscribe to topic if subscriber
if (subscriber) {
try {
int subQoS = 0;
myClient.subscribe(myTopic, subQoS);
} catch (Exception e) {
e.printStackTrace();
}
}
// publish messages if publisher
if (publisher) {
for (int i=1; i<=10; i++) {
String pubMsg = "{\"pubmsg\":" + i + "}";
int pubQoS = 0;
MqttMessage message = new MqttMessage(pubMsg.getBytes());
message.setQos(pubQoS);
message.setRetained(false);
// Publish the message
System.out.println("Publishing to topic \"" + topic + "\" qos " + pubQoS);
MqttDeliveryToken token = null;
try {
// publish message to broker
token = topic.publish(message);
// Wait until the message has been delivered to the broker
token.waitForCompletion();
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// disconnect
try {
// wait to ensure subscribed messages are delivered
if (subscriber) {
Thread.sleep(5000);
}
myClient.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
@nawazish-github

This comment has been minimized.

Copy link

commented Mar 14, 2015

at a high level, what is the mqtt programming model in java. please elaborate for both pub and sub facets.

@kevinpauli

This comment has been minimized.

Copy link

commented Jul 15, 2015

How to keep the subscriber alive indefinitely?

@vjtime

This comment has been minimized.

Copy link

commented Mar 3, 2016

dude! i have implemented this program with several brokers and it does not ask for username password...........and sometimes it gives error as bad username and password.......................i have tried figuring out for 3-4 days but i could not..and there is no expected reference available on internet..............so can you explain me in detail.......if possible..........please.

some references say , to edit mosquitto.config file for mosquitto broker...........but how.......and what about the rest of brokers?
and yes. more, when i connect to some brokers such as HIVEMQ, then they keep on disconnecting.......or exception comes.........i cant figure out what to do? please need help.....

@code0xDA

This comment has been minimized.

Copy link

commented Mar 4, 2016

Hi vjtime,

The concept of "user" is different among different brokers.
For mosquitto, you need 1) in the configuration file, add property use password file, 2) in the password file, add username and password pair(the password is hashed in this file).
For other advanced mqtt broker, you can enable LDAP to provide username/password authentication. but basically, it's the same thing.

I hope it helps.

@jamesmccartney

This comment has been minimized.

Copy link

commented Mar 18, 2016

This is a very helpful example. I did have to change the first parameter type in the deliveryComplete method to IMqttDeliveryToken instead of MqttDeliveryToken and add the IMqttDeliveryToken import at the top to get this example to run against my mosquitto broker...I am using the nightly build of Mqtt so that could be the issue or something else that myself a java newbie overlooked ;)

@malik0786

This comment has been minimized.

Copy link

commented May 4, 2016

Hi,
I am new to java,when running this code its showing error Unable to connect to server (32103).Please help.
Thanks

@PreyeaRegmi

This comment has been minimized.

Copy link

commented Mar 29, 2017

Hello there,
Is "myClient.connect(connOpt);" a blocking call or not ?

@hervino14

This comment has been minimized.

Copy link

commented Dec 19, 2017

Hello,
How to send PUBACK when message arrived ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.