Skip to content

Instantly share code, notes, and snippets.

@cygairko
Created May 8, 2014 17:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cygairko/78422c072a3065a83607 to your computer and use it in GitHub Desktop.
Save cygairko/78422c072a3065a83607 to your computer and use it in GitHub Desktop.
/*
* Copyright (c) 2009, 2012 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
/**
* Modified with IntelliJ IDEA by
* User: cygairko
* Date: 02.05.2014
* Time: 01:00
*/
package de.devcube.distribution.mqtools;
import com.eclipsesource.json.JsonObject;
import de.devcube.distribution.help.Properties;
import de.devcube.distribution.objects.Inbound;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.locks.Lock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
public class MqttSubscriber implements Runnable, MqttCallback {
private static Logger logger = LogManager.getLogger(new Object() {}.getClass().getEnclosingClass().getSimpleName());
// Private instance variables
private static MqttClient client;
private static MqttConnectOptions conOpt;
private final Lock lock;
private final Inbound sharedInbound;
private String[] topicList;
public MqttSubscriber(Inbound inbound, String[] topicList) {
this.sharedInbound = inbound;
lock = inbound.getLock();
this.topicList = topicList;
// Construct the object that contains connection parameters
// such as cleanSession and LWT
conOpt = new MqttConnectOptions();
conOpt.setCleanSession(true);
// Set password and username if parameters are set in config file
String password;
String username;
if (!(password = Properties.getConfig("MQTT_PASSWORD")).isEmpty()) {
conOpt.setPassword(password.toCharArray());
}
if (!(username = Properties.getConfig("MQTT_USERNAME")).isEmpty()) {
conOpt.setUserName(username);
}
//This sample stores in a temporary directory... where messages temporarily
// stored until the message has been delivered to the server.
//..a real application ought to store them somewhere
// where they are not likely to get deleted or tampered with
String tmpDir = System.getProperty("java.io.tmpdir");
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
try {
client = new MqttClient(Properties.getConfig("MQTT_BROKER_URL"), Properties.getConfig("DEVICE_ID"), dataStore);
} catch (MqttException me) {
logger.error(me);
logger.error("Unable to set up client: " + me.toString());
}
client.setCallback(this);
}
@Override
public void run() {
try {
// Connect to the MQTT server
client.connect(conOpt);
logger.debug("Connected to \"" + client.getServerURI() + "\" with client ID \"" + client.getClientId() + "\"");
// Subscribe to the requested topic
// The QoS specified is the maximum level that messages will be sent to the client at.
// For instance if QoS 1 is specified, any messages originally published at QoS 2 will
// be downgraded to 1 when delivering to the client but messages published at 1 and 0
// will be received at the same level they were published at.
logger.debug("Subscribing to topics " + Arrays.toString(topicList));
int[] qos = new int[topicList.length];
Arrays.fill(qos, Integer.valueOf(Properties.getConfig("MQTT_QOS")));
client.subscribe(this.topicList, qos);
try {
while (!Thread.currentThread().isInterrupted()) {
Thread.sleep(500);
}
} catch (InterruptedException ie) {
client.disconnect();
logger.debug("Disconnected; Reason: " + ie);
Thread.currentThread().interrupt();
}
} catch (MqttException me) {
logger.error(me);
}
}
/****************************************************************/
/* Methods to implement the MqttCallback interface */
/****************************************************************/
/**
* @see MqttCallback#connectionLost(Throwable)
*/
public void connectionLost(Throwable cause) {
// Called when the connection to the server has been lost.
// An application may choose to implement reconnection
// logic at this point. This sample simply exits.
logger.debug("Connection lost! " + cause);
System.exit(1);
}
/**
* @see MqttCallback#deliveryComplete(IMqttDeliveryToken)
*/
public void deliveryComplete(IMqttDeliveryToken token) {
// Called when a message has been delivered to the
// server. The token passed in here is the same one
// that was passed to or returned from the original call to publish.
// This allows applications to perform asynchronous
// delivery without blocking until delivery completes.
//
// This sample demonstrates asynchronous deliver and
// uses the token.waitForCompletion() call in the main thread which
// blocks until the delivery has completed.
// Additionally the deliveryComplete method will be called if
// the callback is set on the client
//
// If the connection to the server breaks before delivery has completed
// delivery of a message will complete after the client has re-connected.
// The getPendingTokens method will provide tokens for any messages
// that are still to be delivered.
}
/**
* @see MqttCallback#messageArrived(String, MqttMessage)
*/
public void messageArrived(String topic, MqttMessage message) throws MqttException {
// Called when a message arrives from the server that matches any
// subscription made by the client
String incoming = new String(message.getPayload());
logger.debug("new message:\t" + topic + "\t" + incoming);
lock.lock();
sharedInbound.setInbound(topic, JsonObject.readFrom(incoming));
lock.unlock();
}
/****************************************************************/
/* End of MqttCallback methods */
/****************************************************************/
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment