-
-
Save cygairko/78422c072a3065a83607 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2009, 2012 IBM Corp. | |
* | |
* All rights reserved. This program and the accompanying materials | |
* are made available under the terms of the Eclipse Public License v1.0 | |
* which accompanies this distribution, and is available at | |
* http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: | |
* Dave Locke - initial API and implementation and/or initial documentation | |
*/ | |
/** | |
* Modified with IntelliJ IDEA by | |
* User: cygairko | |
* Date: 02.05.2014 | |
* Time: 01:00 | |
*/ | |
package de.devcube.distribution.mqtools; | |
import com.eclipsesource.json.JsonObject; | |
import de.devcube.distribution.help.Properties; | |
import de.devcube.distribution.objects.Inbound; | |
import org.apache.logging.log4j.LogManager; | |
import org.apache.logging.log4j.Logger; | |
import org.eclipse.paho.client.mqttv3.*; | |
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; | |
import java.util.Arrays; | |
import java.util.concurrent.locks.Lock; | |
public class MqttSubscriber implements Runnable, MqttCallback { | |
private static Logger logger = LogManager.getLogger(new Object() {}.getClass().getEnclosingClass().getSimpleName()); | |
// Private instance variables | |
private static MqttClient client; | |
private static MqttConnectOptions conOpt; | |
private final Lock lock; | |
private final Inbound sharedInbound; | |
private String[] topicList; | |
public MqttSubscriber(Inbound inbound, String[] topicList) { | |
this.sharedInbound = inbound; | |
lock = inbound.getLock(); | |
this.topicList = topicList; | |
// Construct the object that contains connection parameters | |
// such as cleanSession and LWT | |
conOpt = new MqttConnectOptions(); | |
conOpt.setCleanSession(true); | |
// Set password and username if parameters are set in config file | |
String password; | |
String username; | |
if (!(password = Properties.getConfig("MQTT_PASSWORD")).isEmpty()) { | |
conOpt.setPassword(password.toCharArray()); | |
} | |
if (!(username = Properties.getConfig("MQTT_USERNAME")).isEmpty()) { | |
conOpt.setUserName(username); | |
} | |
//This sample stores in a temporary directory... where messages temporarily | |
// stored until the message has been delivered to the server. | |
//..a real application ought to store them somewhere | |
// where they are not likely to get deleted or tampered with | |
String tmpDir = System.getProperty("java.io.tmpdir"); | |
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir); | |
try { | |
client = new MqttClient(Properties.getConfig("MQTT_BROKER_URL"), Properties.getConfig("DEVICE_ID"), dataStore); | |
} catch (MqttException me) { | |
logger.error(me); | |
logger.error("Unable to set up client: " + me.toString()); | |
} | |
client.setCallback(this); | |
} | |
@Override | |
public void run() { | |
try { | |
// Connect to the MQTT server | |
client.connect(conOpt); | |
logger.debug("Connected to \"" + client.getServerURI() + "\" with client ID \"" + client.getClientId() + "\""); | |
// Subscribe to the requested topic | |
// The QoS specified is the maximum level that messages will be sent to the client at. | |
// For instance if QoS 1 is specified, any messages originally published at QoS 2 will | |
// be downgraded to 1 when delivering to the client but messages published at 1 and 0 | |
// will be received at the same level they were published at. | |
logger.debug("Subscribing to topics " + Arrays.toString(topicList)); | |
int[] qos = new int[topicList.length]; | |
Arrays.fill(qos, Integer.valueOf(Properties.getConfig("MQTT_QOS"))); | |
client.subscribe(this.topicList, qos); | |
try { | |
while (!Thread.currentThread().isInterrupted()) { | |
Thread.sleep(500); | |
} | |
} catch (InterruptedException ie) { | |
client.disconnect(); | |
logger.debug("Disconnected; Reason: " + ie); | |
Thread.currentThread().interrupt(); | |
} | |
} catch (MqttException me) { | |
logger.error(me); | |
} | |
} | |
/****************************************************************/ | |
/* Methods to implement the MqttCallback interface */ | |
/****************************************************************/ | |
/** | |
* @see MqttCallback#connectionLost(Throwable) | |
*/ | |
public void connectionLost(Throwable cause) { | |
// Called when the connection to the server has been lost. | |
// An application may choose to implement reconnection | |
// logic at this point. This sample simply exits. | |
logger.debug("Connection lost! " + cause); | |
System.exit(1); | |
} | |
/** | |
* @see MqttCallback#deliveryComplete(IMqttDeliveryToken) | |
*/ | |
public void deliveryComplete(IMqttDeliveryToken token) { | |
// Called when a message has been delivered to the | |
// server. The token passed in here is the same one | |
// that was passed to or returned from the original call to publish. | |
// This allows applications to perform asynchronous | |
// delivery without blocking until delivery completes. | |
// | |
// This sample demonstrates asynchronous deliver and | |
// uses the token.waitForCompletion() call in the main thread which | |
// blocks until the delivery has completed. | |
// Additionally the deliveryComplete method will be called if | |
// the callback is set on the client | |
// | |
// If the connection to the server breaks before delivery has completed | |
// delivery of a message will complete after the client has re-connected. | |
// The getPendingTokens method will provide tokens for any messages | |
// that are still to be delivered. | |
} | |
/** | |
* @see MqttCallback#messageArrived(String, MqttMessage) | |
*/ | |
public void messageArrived(String topic, MqttMessage message) throws MqttException { | |
// Called when a message arrives from the server that matches any | |
// subscription made by the client | |
String incoming = new String(message.getPayload()); | |
logger.debug("new message:\t" + topic + "\t" + incoming); | |
lock.lock(); | |
sharedInbound.setInbound(topic, JsonObject.readFrom(incoming)); | |
lock.unlock(); | |
} | |
/****************************************************************/ | |
/* End of MqttCallback methods */ | |
/****************************************************************/ | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment