Skip to content

Instantly share code, notes, and snippets.

@Bingchean
Created January 23, 2014 07:13
Show Gist options
  • Select an option

  • Save Bingchean/8574281 to your computer and use it in GitHub Desktop.

Select an option

Save Bingchean/8574281 to your computer and use it in GitHub Desktop.
Mqtt 实现android推送
package com.example.TestPushMsgMos;
import android.app.*;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.graphics.Color;
import android.net.ConnectivityManager;
import android.os.Binder;
import android.os.IBinder;
import android.os.PowerManager;
import android.provider.Settings;
import android.util.Log;
import com.ibm.mqtt.*;
import java.lang.ref.WeakReference;
import java.util.Calendar;
import java.util.Date;
import java.util.Enumeration;
import java.util.Hashtable;
/**
* Created by Bingchean on 14-1-23.
*/
public class MQTTService extends Service implements MqttSimpleCallback {
public static final String APP_ID = "com.example.testpushmsgmos";
public static final String MQTT_MSG_RECEIVED_INTENT = "com.example.testpushmsgmos.MSGRECVD";
public static final String MQTT_MSG_RECEIVED_TOPIC = "com.example.testpushmsgmos.MSGRECVD_TOPIC";
public static final String MQTT_MSG_RECEIVED_MSG = "com.example.testpushmsgmos.MSGRECVD_MSGBODY";
public static final String MQTT_STATUS_INTENT = "com.example.testpushmsgmos.STATUS";
public static final String MQTT_STATUS_MSG = "com.example.testpushmsgmos.STATUS_MSG";
public static final String MQTT_PING_ACTION = "com.example.testpushmsgmos.PING";
public static final int MQTT_NOTIFICATION_ONGOING = 1;
public static final int MQTT_NOTIFICATION_UPDATE = 2;
public static final int MAX_MQTT_CLIENTID_LENGTH = 22;
private MQTTConnectionStatus connectionStatus = MQTTConnectionStatus.INITIAL;
private String brokerHostName = "";
private String topicName = "";
private int brokerPortNumber = 1883;
private MqttPersistence usePersistence = null;
private boolean cleanStart = false;
private int[] qualitiesOfService = {0};
private short keepAliveSeconds = 20 * 60;
private String mqttClientId = null;
private IMqttClient mqttClient = null;
private NetworkConnectionIntentReceiver netConnReceiver;
private BackgroundDataChangeIntentReceiver dataEnabledReceiver;
private PingSender pingSender;
private String TAG = "MQTTService.java";
public MQTTService() {}
public enum MQTTConnectionStatus
{
INITIAL,
CONNECTING,
CONNECTED,
NOTCONNECTED_WAITINGFORINTERNET,
NOTCONNECTED_USERDISCONNECT,
NOTCONNECTED_DATADISABLED,
NOTCONNECTED_UNKNOWNREASON
}
@Override
public void onCreate()
{
super.onCreate();
connectionStatus = MQTTConnectionStatus.INITIAL;
mBinder = new LocalBinder<MQTTService>(this);
brokerHostName = "192.168.1.206"; //这里改成你自己的服务器IP地址
topicName = "tokudu"; //这里改成你想要subscribe的topic
Log.i(TAG, "onCreate");
dataEnabledReceiver = new BackgroundDataChangeIntentReceiver();
registerReceiver(dataEnabledReceiver, new IntentFilter(ConnectivityManager.ACTION_BACKGROUND_DATA_SETTING_CHANGED));
defineConnectionToBroker(brokerHostName);
}
public static void actionStart(Context ctx) {
Intent i = new Intent(ctx, MQTTService.class);
ctx.startService(i);
}
@Override
public void onStart(final Intent intent, final int startId)
{
Log.i(TAG, "onStart");
new Thread(new Runnable() {
@Override
public void run() {
handleStart(intent, startId);
}
}, "MQTTservice").start();
}
@Override
public int onStartCommand(final Intent intent, int flags, final int startId)
{
Log.i(TAG, "onStartCommand");
new Thread(new Runnable() {
@Override
public void run() {
handleStart(intent, startId);
}
}, "MQTTservice").start();
return START_STICKY;
}
synchronized void handleStart(Intent intent, int startId)
{
Log.i(TAG, "handleStart");
if (mqttClient == null)
{
Log.i(TAG, "mqttClient === null");
stopSelf();
return;
}
Log.i(TAG, "mqttClient != null");
ConnectivityManager cm = (ConnectivityManager)getSystemService(CONNECTIVITY_SERVICE);
if (cm.getBackgroundDataSetting() == false)
{
connectionStatus = MQTTConnectionStatus.NOTCONNECTED_DATADISABLED;
broadcastServiceStatus("Not connected - background data disabled");
return;
}
Log.i(TAG, "getBackgroundDataSetting == true");
rebroadcastStatus();
rebroadcastReceivedMessages();
Log.i(TAG, "after rebroadcastReceivedMessages");
if (isAlreadyConnected() == false)
{
Log.i(TAG, "isAlreadyConnected() == false");
connectionStatus = MQTTConnectionStatus.CONNECTING;
NotificationManager nm = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);
Notification notification = new Notification(R.drawable.froyo,"MQTT",System.currentTimeMillis());
notification.flags |= Notification.FLAG_ONGOING_EVENT;
notification.flags |= Notification.FLAG_NO_CLEAR;
Intent notificationIntent = new Intent(this, NotifyActivity.class);
PendingIntent contentIntent = PendingIntent.getActivity(this, 0,
notificationIntent,
PendingIntent.FLAG_UPDATE_CURRENT);
notification.setLatestEventInfo(this, "MQTT", "MQTT Service is running", contentIntent);
nm.notify(MQTT_NOTIFICATION_ONGOING, notification);
if (isOnline())
{
if (connectToBroker())
{
subscribeToTopic(topicName);
}
}
else
{
connectionStatus = MQTTConnectionStatus.NOTCONNECTED_WAITINGFORINTERNET;
broadcastServiceStatus("Waiting for network connection");
}
}
if (netConnReceiver == null)
{
netConnReceiver = new NetworkConnectionIntentReceiver();
registerReceiver(netConnReceiver,new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
}
if (pingSender == null)
{
pingSender = new PingSender();
registerReceiver(pingSender, new IntentFilter(MQTT_PING_ACTION));
}
}
@Override
public void onDestroy()
{
super.onDestroy();
disconnectFromBroker();
broadcastServiceStatus("Disconnected");
if (dataEnabledReceiver != null)
{
unregisterReceiver(dataEnabledReceiver);
dataEnabledReceiver = null;
}
if (mBinder != null) {
mBinder.close();
mBinder = null;
}
}
private void broadcastServiceStatus(String statusDescription)
{
Intent broadcastIntent = new Intent();
broadcastIntent.setAction(MQTT_STATUS_INTENT);
broadcastIntent.putExtra(MQTT_STATUS_MSG, statusDescription);
sendBroadcast(broadcastIntent);
}
private void broadcastReceivedMessage(String topic, String message)
{
Intent broadcastIntent = new Intent();
broadcastIntent.setAction(MQTT_MSG_RECEIVED_INTENT);
broadcastIntent.putExtra(MQTT_MSG_RECEIVED_TOPIC, topic);
broadcastIntent.putExtra(MQTT_MSG_RECEIVED_MSG, message);
sendBroadcast(broadcastIntent);
}
private void notifyUser(String alert, String title, String body)
{
NotificationManager nm = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);
Notification notification = new Notification(R.drawable.froyo, alert,System.currentTimeMillis());
notification.defaults |= Notification.DEFAULT_LIGHTS;
notification.defaults |= Notification.DEFAULT_SOUND;
notification.defaults |= Notification.DEFAULT_VIBRATE;
notification.flags |= Notification.FLAG_AUTO_CANCEL;
notification.ledARGB = Color.MAGENTA;
Intent notificationIntent = new Intent(this, NotifyActivity.class);
PendingIntent contentIntent = PendingIntent.getActivity(this, 0,
notificationIntent,
PendingIntent.FLAG_UPDATE_CURRENT);
notification.setLatestEventInfo(this, title, body, contentIntent);
nm.notify(MQTT_NOTIFICATION_UPDATE, notification);
}
private LocalBinder<MQTTService> mBinder;
@Override
public IBinder onBind(Intent intent)
{
return mBinder;
}
public class LocalBinder<S> extends Binder
{
private WeakReference<S> mService;
public LocalBinder(S service)
{
mService = new WeakReference<S>(service);
}
public S getService()
{
return mService.get();
}
public void close()
{
mService = null;
}
}
public MQTTConnectionStatus getConnectionStatus()
{
return connectionStatus;
}
public void rebroadcastStatus()
{
String status = "";
switch (connectionStatus)
{
case INITIAL:
status = "Please wait";
break;
case CONNECTING:
status = "Connecting...";
break;
case CONNECTED:
status = "Connected";
break;
case NOTCONNECTED_UNKNOWNREASON:
status = "Not connected - waiting for network connection";
break;
case NOTCONNECTED_USERDISCONNECT:
status = "Disconnected";
break;
case NOTCONNECTED_DATADISABLED:
status = "Not connected - background data disabled";
break;
case NOTCONNECTED_WAITINGFORINTERNET:
status = "Unable to connect";
break;
}
broadcastServiceStatus(status);
}
public void disconnect()
{
disconnectFromBroker();
connectionStatus = MQTTConnectionStatus.NOTCONNECTED_USERDISCONNECT;
broadcastServiceStatus("Disconnected");
}
public void connectionLost() throws Exception
{
PowerManager pm = (PowerManager) getSystemService(POWER_SERVICE);
PowerManager.WakeLock wl = pm.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MQTT");
wl.acquire();
if (isOnline() == false)
{
connectionStatus = MQTTConnectionStatus.NOTCONNECTED_WAITINGFORINTERNET;
broadcastServiceStatus("Connection lost - no network connection");
notifyUser("Connection lost - no network connection",
"MQTT", "Connection lost - no network connection");
}
else
{
connectionStatus = MQTTConnectionStatus.NOTCONNECTED_UNKNOWNREASON;
broadcastServiceStatus("Connection lost - reconnecting...");
if (connectToBroker()) {
subscribeToTopic(topicName);
}
}
wl.release();
}
public void publishArrived(String topic, byte[] payloadbytes, int qos, boolean retained)
{
PowerManager pm = (PowerManager) getSystemService(POWER_SERVICE);
PowerManager.WakeLock wl = pm.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MQTT");
wl.acquire();
String messageBody = new String(payloadbytes);
if (addReceivedMessageToStore(topic, messageBody))
{
broadcastReceivedMessage(topic, messageBody);
notifyUser("New data received", topic, messageBody);
}
scheduleNextPing();
wl.release();
}
private void defineConnectionToBroker(String brokerHostName)
{
String mqttConnSpec = "tcp://" + brokerHostName + "@" + brokerPortNumber;
Log.d(TAG, mqttConnSpec);
try
{
mqttClient = MqttClient.createMqttClient(mqttConnSpec, usePersistence);
Log.d(TAG, "after createMqttClient:" + mqttConnSpec);
mqttClient.registerSimpleHandler(this);
Log.d(TAG, "after registerSimpleHandler");
}
catch (MqttException e)
{
Log.i(TAG, "MqttException:" + e.getMessage());
mqttClient = null;
connectionStatus = MQTTConnectionStatus.NOTCONNECTED_UNKNOWNREASON;
broadcastServiceStatus("Invalid connection parameters");
notifyUser("Unable to connect", "MQTT", "Unable to connect");
}
}
private boolean connectToBroker()
{
Log.w(TAG, "try to connect broker:clientid:" + generateClientId());
try
{
mqttClient.connect(generateClientId(), cleanStart, keepAliveSeconds);
Log.w(TAG, "after connect");
broadcastServiceStatus("Connected");
connectionStatus = MQTTConnectionStatus.CONNECTED;
scheduleNextPing();
return true;
}
catch (MqttException e)
{
Log.w(TAG, "connect exception:" + e.getMessage());
connectionStatus = MQTTConnectionStatus.NOTCONNECTED_UNKNOWNREASON;
broadcastServiceStatus("Unable to connect");
notifyUser("Unable to connect", "MQTT", "Unable to connect - will retry later");
scheduleNextPing();
return false;
}
}
private void subscribeToTopic(String topicName)
{
boolean subscribed = false;
if (isAlreadyConnected() == false)
{
Log.e("mqtt", "Unable to subscribe as we are not connected");
}
else
{
try
{
String[] topics = { topicName };
mqttClient.subscribe(topics, qualitiesOfService);
subscribed = true;
}
catch (MqttNotConnectedException e)
{
Log.e("mqtt", "subscribe failed - MQTT not connected", e);
}
catch (IllegalArgumentException e)
{
Log.e("mqtt", "subscribe failed - illegal argument", e);
}
catch (MqttException e)
{
Log.e("mqtt", "subscribe failed - MQTT exception", e);
}
}
if (subscribed == false)
{
broadcastServiceStatus("Unable to subscribe");
notifyUser("Unable to subscribe", "MQTT", "Unable to subscribe");
}
}
private void disconnectFromBroker()
{
try
{
if (netConnReceiver != null)
{
unregisterReceiver(netConnReceiver);
netConnReceiver = null;
}
if (pingSender != null)
{
unregisterReceiver(pingSender);
pingSender = null;
}
}
catch (Exception eee)
{
Log.e("mqtt", "unregister failed", eee);
}
try
{
if (mqttClient != null)
{
mqttClient.disconnect();
}
}
catch (MqttPersistenceException e)
{
Log.e("mqtt", "disconnect failed - persistence exception", e);
}
finally
{
mqttClient = null;
}
NotificationManager nm = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);
nm.cancelAll();
}
private boolean isAlreadyConnected()
{
return ((mqttClient != null) && (mqttClient.isConnected() == true));
}
private class BackgroundDataChangeIntentReceiver extends BroadcastReceiver
{
@Override
public void onReceive(Context ctx, Intent intent)
{
PowerManager pm = (PowerManager) getSystemService(POWER_SERVICE);
PowerManager.WakeLock wl = pm.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MQTT");
wl.acquire();
ConnectivityManager cm = (ConnectivityManager)getSystemService(CONNECTIVITY_SERVICE);
if (cm.getBackgroundDataSetting())
{
defineConnectionToBroker(brokerHostName);
handleStart(intent, 0);
}
else
{
connectionStatus = MQTTConnectionStatus.NOTCONNECTED_DATADISABLED;
broadcastServiceStatus("Not connected - background data disabled");
disconnectFromBroker();
}
wl.release();
}
}
private class NetworkConnectionIntentReceiver extends BroadcastReceiver
{
@Override
public void onReceive(Context ctx, Intent intent)
{
PowerManager pm = (PowerManager) getSystemService(POWER_SERVICE);
PowerManager.WakeLock wl = pm.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MQTT");
wl.acquire();
if (isOnline())
{
if (connectToBroker())
{
subscribeToTopic(topicName);
}
}
wl.release();
}
}
private void scheduleNextPing()
{
PendingIntent pendingIntent = PendingIntent.getBroadcast(this, 0,
new Intent(MQTT_PING_ACTION),
PendingIntent.FLAG_UPDATE_CURRENT);
Calendar wakeUpTime = Calendar.getInstance();
wakeUpTime.add(Calendar.SECOND, keepAliveSeconds);
AlarmManager aMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
aMgr.set(AlarmManager.RTC_WAKEUP,
wakeUpTime.getTimeInMillis(),
pendingIntent);
}
public class PingSender extends BroadcastReceiver
{
@Override
public void onReceive(Context context, Intent intent)
{
try
{
mqttClient.ping();
}
catch (MqttException e)
{
Log.e("mqtt", "ping failed - MQTT exception", e);
try {
mqttClient.disconnect();
}
catch (MqttPersistenceException e1) {
Log.e("mqtt", "disconnect failed - persistence exception", e1);
}
if (connectToBroker()) {
subscribeToTopic(topicName);
}
}
scheduleNextPing();
}
}
private Hashtable<String, String> dataCache = new Hashtable<String, String>();
private boolean addReceivedMessageToStore(String key, String value)
{
String previousValue = null;
if (value.length() == 0)
{
previousValue = dataCache.remove(key);
}
else
{
previousValue = dataCache.put(key, value);
}
return ((previousValue == null) || (previousValue.equals(value) == false));
}
public void rebroadcastReceivedMessages()
{
Enumeration<String> e = dataCache.keys();
while(e.hasMoreElements())
{
String nextKey = e.nextElement();
String nextValue = dataCache.get(nextKey);
broadcastReceivedMessage(nextKey, nextValue);
}
}
private String generateClientId()
{
if (mqttClientId == null)
{
String timestamp = "" + (new Date()).getTime();
String android_id = Settings.System.getString(getContentResolver(),Settings.Secure.ANDROID_ID);
mqttClientId = timestamp + android_id;
if (mqttClientId.length() > MAX_MQTT_CLIENTID_LENGTH) {
mqttClientId = mqttClientId.substring(0, MAX_MQTT_CLIENTID_LENGTH);
}
}
return mqttClientId;
}
private boolean isOnline()
{
ConnectivityManager cm = (ConnectivityManager)getSystemService(CONNECTIVITY_SERVICE);
if(cm.getActiveNetworkInfo() != null &&
cm.getActiveNetworkInfo().isAvailable() &&
cm.getActiveNetworkInfo().isConnected())
{
return true;
}
return false;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment