MQTT实现消息推送

使用MQTT接收消息时,需实现MqttSimpleCallback接口及其publishArrived方法,并且必须注册接收消息的回调:

mqttClient.registerSimpleHandler(simpleCallbackHandler);// 注册接收消息方法
并订阅主题:
mqttClient.subscribe(TOPICS, QOS_VALUES);// 订阅接主题
服务端:

package com.gmcc.kuchuan.business;  
  
import org.apache.commons.logging.Log;  
import org.apache.commons.logging.LogFactory;  
  
import com.ibm.mqtt.MqttClient;  
import com.ibm.mqtt.MqttException;  
import com.ibm.mqtt.MqttSimpleCallback;  
  
/**
 * Sends and receives MQTT messages through one broker connection.
 *
 * <p>The connection is opened lazily by {@link #sendMessage(String, String)}.
 * On connect the class registers a {@link SimpleCallbackHandler} for inbound
 * messages and subscribes to every topic in {@code TOPICS}.
 *
 * @author Join
 */
public class MqttBroker {
    private final static Log logger = LogFactory.getLog(MqttBroker.class);
    // Connection parameters
    private final static String CONNECTION_STRING = "tcp://localhost:9901";
    private final static boolean CLEAN_START = true;
    // Low-power network, but data is still needed promptly: keep-alive of 30.
    private final static short KEEP_ALIVE = 30;
    // Client identifier presented to the broker
    private final static String CLIENT_ID = "master";
    // QoS level for each entry of TOPICS, in the same order
    private final static int[] QOS_VALUES = { 0, 0, 2, 0 };
    private final static String[] TOPICS = { "Test/TestTopics/Topic1",
            "Test/TestTopics/Topic2", "Test/TestTopics/Topic3",
            "client/keepalive" };
    private static MqttBroker instance = new MqttBroker();

    private MqttClient mqttClient;

    /**
     * Returns the shared instance.
     *
     * @return the singleton {@code MqttBroker}
     */
    public static MqttBroker getInstance() {
        return instance;
    }

    /**
     * (Re)connects to the broker, registers the receive callback, subscribes
     * to {@code TOPICS} and publishes a retained "keepalive" message.
     *
     * @throws MqttException if connecting, subscribing or publishing fails
     */
    private void connect() throws MqttException {
        logger.info("connect to mqtt broker.");
        mqttClient = new MqttClient(CONNECTION_STRING);
        logger.info("***********register Simple Handler***********");
        SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
        // Register the handler that receives inbound messages
        mqttClient.registerSimpleHandler(simpleCallbackHandler);
        mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
        logger.info("***********subscribe receiver topics***********");
        mqttClient.subscribe(TOPICS, QOS_VALUES);

        logger.info("***********CLIENT_ID:" + CLIENT_ID);
        // After subscribing, publish a retained keep-alive message.
        // NOTE(review): this publishes to "keepalive" while the subscription
        // above uses "client/keepalive" - confirm which topic is intended.
        mqttClient.publish("keepalive", "keepalive".getBytes(), QOS_VALUES[0],
                true);
    }

    /**
     * Publishes {@code message} to the topic {@code "GMCC/client/" + clientId}
     * with QoS 0 and no retain flag, connecting first if necessary.
     *
     * <p>Failures are logged together with the exception and are not
     * propagated to the caller.
     *
     * @param clientId identifier of the target client; used as topic suffix
     * @param message  text payload to publish
     */
    public void sendMessage(String clientId, String message) {
        try {
            if (mqttClient == null || !mqttClient.isConnected()) {
                connect();
            }

            logger.info("send message to " + clientId + ", message is "
                    + message);
            mqttClient.publish("GMCC/client/" + clientId, message.getBytes(),
                    0, false);
        } catch (MqttException e) {
            // Log the exception itself: getCause() alone is frequently null
            // and drops both the message and the stack trace.
            logger.error("Failed to send message to " + clientId, e);
        }
    }

    /**
     * Simple callback that handles messages received on the subscribed
     * topics by printing them to standard output.
     *
     * @author Join
     */
    class SimpleCallbackHandler implements MqttSimpleCallback {

        /**
         * Invoked when the connection to the broker is lost unexpectedly.
         * Re-subscription could be handled here; currently it only prints a
         * notice.
         */
        @Override
        public void connectionLost() throws Exception {
            System.out.println("客户机和broker已经断开");
        }

        /**
         * Invoked for every message that arrives on a subscribed topic;
         * prints the topic, payload, QoS and retained flag.
         */
        @Override
        public void publishArrived(String topicName, byte[] payload, int Qos,
                boolean retained) throws Exception {
            System.out.println("订阅主题: " + topicName);
            System.out.println("消息数据: " + new String(payload));
            System.out.println("消息级别(0,1,2): " + Qos);
            System.out.println("是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): "
                    + retained);
        }

    }

    public static void main(String[] args) {
        // Use the singleton rather than creating a second broker object.
        getInstance().sendMessage("client", "message");
    }
}
Android客户端(核心代码见MQTTConnection内部类):
import java.io.ByteArrayInputStream;  
import java.io.File;  
import java.io.IOException;  
import java.io.InputStream;  
import java.io.RandomAccessFile;  
import java.net.HttpURLConnection;  
import java.net.URL;  
import java.util.ArrayList;  
import java.util.Timer;  
import java.util.TimerTask;  
  
import android.app.AlarmManager;  
import android.app.Notification;  
import android.app.NotificationManager;  
import android.app.PendingIntent;  
import android.app.Service;  
import android.content.BroadcastReceiver;  
import android.content.Context;  
import android.content.Intent;  
import android.content.IntentFilter;  
import android.content.SharedPreferences;  
import android.database.Cursor;  
import android.net.ConnectivityManager;  
import android.net.NetworkInfo;  
import android.os.Binder;  
import android.os.Bundle;  
import android.os.IBinder;  
import android.provider.ContactsContract;  
import android.util.Log;  

import com.ibm.mqtt.MqttClient;  
import com.ibm.mqtt.MqttException;  
import com.ibm.mqtt.MqttPersistence;  
import com.ibm.mqtt.MqttPersistenceException;  
import com.ibm.mqtt.MqttSimpleCallback;  
  
/*  
 * PushService that does all of the work. 
 * Most of the logic is borrowed from KeepAliveService. 
 * http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219 
 */  
public class PushService extends Service {  
    // Binder instance returned from onBind().
    private MyBinder mBinder = new MyBinder();
    // Log tag; also used as the SharedPreferences file name in onCreate().
    public static final String TAG = "PushService";

    // The IP address where the MQTT broker is running.
    private static final String MQTT_HOST = "120.197.230.53"; // "209.124.50.174"
    // The port at which the broker is running.
    private static int MQTT_BROKER_PORT_NUM = 9901;
    // Let's not use the MQTT persistence.
    private static MqttPersistence MQTT_PERSISTENCE = null;
    // We don't need to remember any state between the connections, so we use a
    // clean start.
    private static boolean MQTT_CLEAN_START = true;
    // Let's set the internal keep alive for MQTT to 15 mins. I haven't tested
    // this value much. It could probably be increased.
    private static short MQTT_KEEP_ALIVE = 60 * 15;
    // Set quality of service to 0 (at most once delivery), since we don't want
    // push notifications to arrive more than once. However, this means that
    // some messages might get lost (delivery is not guaranteed).
    private static int[] MQTT_QUALITIES_OF_SERVICE = { 0 };
    private static int MQTT_QUALITY_OF_SERVICE = 0;
    // The broker should not retain any messages.
    private static boolean MQTT_RETAINED_PUBLISH = false;

    // MQTT client ID, which is given to the broker. In this example, it is
    // also used as the topic header and as the prefix of the intent actions
    // below.
    // You can use this to run push notifications for multiple apps with one
    // MQTT broker.
    public static String MQTT_CLIENT_ID = "client";

    // These are the actions for the service (names are descriptive enough)
    public static final String ACTION_START = MQTT_CLIENT_ID + ".START";
    private static final String ACTION_STOP = MQTT_CLIENT_ID + ".STOP";
    private static final String ACTION_KEEPALIVE = MQTT_CLIENT_ID
            + ".KEEP_ALIVE";
    private static final String ACTION_RECONNECT = MQTT_CLIENT_ID
            + ".RECONNECT";

    // Connection log for the push service. Good for debugging.
    private ConnectionLog mLog;

    // Connectivity manager used to determine when the phone loses connection
    private ConnectivityManager mConnMan;
    // Notification manager used to display arrived push notifications
    private NotificationManager mNotifMan;

    // Whether or not the service has been started.
    private boolean mStarted;

    // This is the application level keep-alive interval (28 minutes, in
    // milliseconds) that is used by the AlarmManager to keep the connection
    // active, even when the device goes to sleep.
    private static final long KEEP_ALIVE_INTERVAL = 1000 * 60 * 28;

    // Retry intervals (milliseconds), used when the connection is lost.
    private static final long INITIAL_RETRY_INTERVAL = 1000 * 10;
    private static final long MAXIMUM_RETRY_INTERVAL = 1000 * 60 * 30;

    // Preferences instance
    private SharedPreferences mPrefs;
    // We store in the preferences whether or not the service has been started
    public static final String PREF_STARTED = "isStarted";
    // We also store the deviceID (target)
    public static final String PREF_DEVICE_ID = "deviceID";
    // We store the last retry interval
    public static final String PREF_RETRY = "retryInterval";

    // Notification title
    public static String NOTIF_TITLE = "client";
    // Notification id passed to NotificationManager.notify()
    private static final int NOTIF_CONNECTED = 0;

    // This is the instance of an MQTT connection.
    private MQTTConnection mConnection;
    // Timestamp (ms) set in onCreate() and again when a connection is made.
    private long mStartTime;
    // Whether notifications should be shown
    boolean mShowFlag = true;
    // Context most recently passed to actionStart()
    public static Context ctx;
    // Whether the timer task should send the heartbeat to the server
    private boolean mRunFlag = true;
    // Timer that runs the periodic heartbeat task scheduled in start()
    Timer mTimer = new Timer();
  
    /**
     * Starts {@link PushService} with {@code ACTION_START} and remembers the
     * supplied context in the static {@code ctx} field.
     *
     * @param ctx context used to start the service
     */
    public static void actionStart(Context ctx) {
        final Intent startIntent = new Intent(ctx, PushService.class);
        startIntent.setAction(ACTION_START);

        ctx.startService(startIntent);
        PushService.ctx = ctx;
    }
  
    /**
     * Delivers {@code ACTION_STOP} to {@link PushService} via
     * {@code startService}.
     *
     * @param ctx context used to deliver the intent
     */
    public static void actionStop(Context ctx) {
        final Intent stopIntent = new Intent(ctx, PushService.class);
        stopIntent.setAction(ACTION_STOP);

        ctx.startService(stopIntent);
    }
  
    /**
     * Delivers {@code ACTION_KEEPALIVE} to {@link PushService} so that it
     * sends a keep-alive message.
     *
     * @param ctx context used to deliver the intent
     */
    public static void actionPing(Context ctx) {
        final Intent pingIntent = new Intent(ctx, PushService.class);
        pingIntent.setAction(ACTION_KEEPALIVE);

        ctx.startService(pingIntent);
    }
  
    /**
     * Initialises the service: records the start time, opens the connection
     * log, looks up the preferences and system services, and restarts the
     * connection if the preferences say the service was running before.
     */
    @Override
    public void onCreate() {
        super.onCreate();

        log("Creating service");
        mStartTime = System.currentTimeMillis();

        // The connection log is optional; a failure to open it is only logged.
        try {
            mLog = new ConnectionLog();
            Log.i(TAG, "Opened log at " + mLog.getPath());
        } catch (IOException openFailure) {
            Log.e(TAG, "Failed to open log", openFailure);
        }

        // Preferences, connectivity manager and notification manager
        mPrefs = getSharedPreferences(TAG, MODE_PRIVATE);
        mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
        mNotifMan = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);

        // If the process was reaped by the system, onCreate is the only call
        // we get; restore the persisted "started" state here if necessary.
        handleCrashedService();
    }
  
    /**
     * Performs the clean-up needed when the service was destroyed by the
     * system and then restarted: if the preferences record that it was
     * started, the keep-alive alarms are cancelled and a fresh start is done.
     */
    private void handleCrashedService() {
        if (!wasStarted()) {
            return;
        }

        log("Handling crashed service...");
        // Cancel any keep-alive alarm left over from the previous run
        stopKeepAlives();
        // Do a clean start
        start();
    }
  
    /**
     * Tears the service down: stops the connection if it is running, cancels
     * the heartbeat timer and closes the connection log.
     */
    @Override
    public void onDestroy() {
        log("Service destroyed (started=" + mStarted + ")");

        // Stop the service if it has been started
        if (mStarted) {
            stop();
        }

        // Without this the timer thread outlives the service and keeps
        // running the heartbeat task scheduled in start().
        mTimer.cancel();

        try {
            if (mLog != null) {
                mLog.close();
            }
        } catch (IOException e) {
            // The file log is being closed, so report through logcat only.
            Log.w(TAG, "Failed to close connection log", e);
        }

        super.onDestroy();
    }
  
    /**
     * Dispatches on the action of the delivered intent: stop, start,
     * keep-alive or reconnect. Intents that are {@code null} or carry no
     * (or an unknown) action are ignored.
     *
     * @param intent  the intent passed to {@code startService}; may be null
     * @param startId start request identifier supplied by the framework
     */
    @Override
    public void onStart(Intent intent, int startId) {
        super.onStart(intent, startId);
        log("Service started with intent=" + intent);
        if (intent == null) {
            return;
        }

        // Compare constant-first: getAction() returns null for an intent
        // without an action, which previously caused a NullPointerException.
        final String action = intent.getAction();
        if (ACTION_STOP.equals(action)) {
            stop();
            stopSelf();
        } else if (ACTION_START.equals(action)) {
            start();
        } else if (ACTION_KEEPALIVE.equals(action)) {
            keepAlive();
        } else if (ACTION_RECONNECT.equals(action)) {
            if (isNetworkAvailable()) {
                reconnectIfNecessary();
            }
        }
    }
  
    /** Binder handed out by {@code onBind}; exposes the enclosing service. */
    public class MyBinder extends Binder {
        /** Returns the enclosing {@code PushService} instance. */
        public PushService getService() {
            return PushService.this;
        }
    }
  
    /** Returns the single {@code mBinder} instance for every bind request. */
    @Override
    public IBinder onBind(Intent intent) {
        return mBinder;
    }
  
    // Log helper: logs the message with no associated throwable by delegating
    // to log(String, Throwable).
    private void log(String message) {
        log(message, null);
    }
  
    /**
     * Writes {@code message} to logcat (error level when a throwable is
     * given, info level otherwise) and, if the connection log is open,
     * appends the message to it as well.
     *
     * @param message text to log
     * @param e       throwable to attach to the logcat entry, or null
     */
    private void log(String message, Throwable e) {
        if (e == null) {
            Log.i(TAG, message);
        } else {
            Log.e(TAG, message, e);
        }

        if (mLog == null) {
            return;
        }
        try {
            mLog.println(message);
        } catch (IOException ignored) {
            // Best effort: a failing file log must not disturb the caller.
        }
    }
  
    /**
     * Reads from the preferences whether the service has been started.
     *
     * @return the stored {@code PREF_STARTED} flag, {@code false} if absent
     */
    private boolean wasStarted() {
        final boolean persistedStarted = mPrefs.getBoolean(PREF_STARTED, false);
        return persistedStarted;
    }
  
    /**
     * Persists the started flag in the preferences and mirrors it in
     * {@code mStarted}.
     *
     * @param started new started state
     */
    private void setStarted(boolean started) {
        final SharedPreferences.Editor editor = mPrefs.edit();
        editor.putBoolean(PREF_STARTED, started);
        editor.commit();

        mStarted = started;
    }
  
    /**
     * Starts the service if it is not already running: connects to the
     * broker, schedules the once-a-minute HTTP heartbeat and registers the
     * connectivity receiver.
     */
    private synchronized void start() {
        log("Starting service...");

        // Nothing to do when the service is already running.
        if (mStarted) {
            Log.w(TAG, "Attempt to start connection that is already active");
            return;
        }

        // Establish an MQTT connection
        connect();

        // Heartbeat task: posts the keep-alive URL while mRunFlag is set and
        // the network is available.
        mRunFlag = true;
        final TimerTask heartbeat = new TimerTask() {
            @Override
            public void run() {
                if (!mRunFlag) {
                    return;
                }
                System.out.println("run");
                try {
                    if (isNetworkAvailable()) {
                        SharedPreferences clientPrefs = getSharedPreferences(
                                "client", 0);
                        String mobileNumber = clientPrefs.getString(
                                "MOBILE_NUM", "");
                        HttpUtil.post(Constants.KEEPALIVE + "&mobile="
                                + mobileNumber + "&online_flag=1");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        // Run immediately, then once every minute
        mTimer.schedule(heartbeat, 0, 60 * 1000);

        // Register a connectivity listener
        final IntentFilter connectivityFilter = new IntentFilter(
                ConnectivityManager.CONNECTIVITY_ACTION);
        registerReceiver(mConnectivityChanged, connectivityFilter);
    }
  
    /**
     * Stops the service if it is running: persists the stopped state, halts
     * the HTTP heartbeat, unregisters the connectivity receiver, cancels any
     * pending reconnect and tears down the MQTT connection.
     */
    private synchronized void stop() {
        // Do nothing if the service is not running.
        if (!mStarted) {
            Log.w(TAG, "Attempt to stop connection not active.");
            return;
        }

        // Save stopped state in the preferences
        setStarted(false);

        // Stop the heartbeat scheduled by start(); otherwise a stopped
        // service keeps posting online_flag=1 every minute. A cancelled Timer
        // cannot be reused, so a fresh one is created for a later start().
        mRunFlag = false;
        mTimer.cancel();
        mTimer = new Timer();

        // Remove the connectivity receiver
        unregisterReceiver(mConnectivityChanged);
        // Any existing reconnect timers should be removed, since we are
        // explicitly stopping the service.
        cancelReconnect();

        // Destroy the MQTT connection if there is one
        if (mConnection != null) {
            mConnection.disconnect();
            mConnection = null;
        }
    }
  
    /**
     * Opens the MQTT connection for the topic
     * {@code "GMCC/client/" + deviceID} and marks the service as started.
     *
     * <p>No connection is attempted when no device ID is stored in the
     * preferences. If the connection attempt fails and the network is
     * available, a reconnect is scheduled.
     */
    private synchronized void connect() {
        log("Connecting...");

        // Fetch the device ID from the preferences.
        final String storedDeviceId = mPrefs.getString(PREF_DEVICE_ID, null);
        if (storedDeviceId == null) {
            // Without this check the service would subscribe to the
            // meaningless topic "GMCC/client/null".
            log("Device ID not found.");
        } else {
            final String deviceTopic = "GMCC/client/" + storedDeviceId;
            // NOTE(review): the MQTTConnection constructor connects to the
            // broker on the calling thread - confirm this is never the main
            // thread.
            try {
                mConnection = new MQTTConnection(MQTT_HOST, deviceTopic);
            } catch (MqttException e) {
                // Schedule a reconnect if we failed to connect
                log("MqttException: "
                        + (e.getMessage() != null ? e.getMessage() : "NULL"), e);
                if (isNetworkAvailable()) {
                    scheduleReconnect(mStartTime);
                }
            }
        }

        // Kept unconditional, as before, so later connectivity changes can
        // retry through reconnectIfNecessary().
        setStarted(true);
        // Allow the once-a-minute heartbeat task to send again
        mRunFlag = true;
    }
  
    /**
     * Sends an MQTT keep-alive if the service is started and connected. When
     * sending fails, the connection is dropped and any pending reconnect is
     * cancelled.
     */
    private synchronized void keepAlive() {
        final boolean canSend = mStarted && mConnection != null;
        if (!canSend) {
            return;
        }

        try {
            mConnection.sendKeepAlive();
        } catch (MqttException e) {
            final String reason = e.getMessage() != null ? e.getMessage()
                    : "NULL";
            log("MqttException: " + reason, e);

            mConnection.disconnect();
            mConnection = null;
            cancelReconnect();
        }
    }
  
    /**
     * Schedules the application-level keep-alive: a repeating wake-up alarm
     * that delivers {@code ACTION_KEEPALIVE} to this service every
     * {@code KEEP_ALIVE_INTERVAL} milliseconds.
     */
    private void startKeepAlives() {
        final Intent keepAliveIntent = new Intent(this, PushService.class);
        keepAliveIntent.setAction(ACTION_KEEPALIVE);
        final PendingIntent operation = PendingIntent.getService(this, 0,
                keepAliveIntent, 0);

        final long firstTrigger = System.currentTimeMillis()
                + KEEP_ALIVE_INTERVAL;
        final AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
        alarms.setRepeating(AlarmManager.RTC_WAKEUP, firstTrigger,
                KEEP_ALIVE_INTERVAL, operation);
    }
  
    /**
     * Cancels the repeating keep-alive alarm by cancelling a pending intent
     * built the same way as in {@code startKeepAlives()}.
     */
    private void stopKeepAlives() {
        final Intent keepAliveIntent = new Intent(this, PushService.class);
        keepAliveIntent.setAction(ACTION_KEEPALIVE);
        final PendingIntent operation = PendingIntent.getService(this, 0,
                keepAliveIntent, 0);

        final AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
        alarms.cancel(operation);
    }
  
    /**
     * Schedules a reconnect attempt through the alarm manager.
     *
     * <p>If less time has elapsed since {@code startTime} than the stored
     * retry interval, the interval is quadrupled (capped at
     * {@code MAXIMUM_RETRY_INTERVAL}); otherwise it is reset to
     * {@code INITIAL_RETRY_INTERVAL}. The chosen interval is stored in the
     * preferences.
     *
     * @param startTime time in milliseconds used as the reference point
     */
    public void scheduleReconnect(long startTime) {
        final long previousInterval = mPrefs.getLong(PREF_RETRY,
                INITIAL_RETRY_INTERVAL);
        final long now = System.currentTimeMillis();
        final long elapsed = now - startTime;

        // Back off while failures come quickly; otherwise start over.
        final long nextInterval = (elapsed < previousInterval)
                ? Math.min(previousInterval * 4, MAXIMUM_RETRY_INTERVAL)
                : INITIAL_RETRY_INTERVAL;

        log("Rescheduling connection in " + nextInterval + "ms.");

        // Remember the interval for the next call
        final SharedPreferences.Editor editor = mPrefs.edit();
        editor.putLong(PREF_RETRY, nextInterval);
        editor.commit();

        // One-shot wake-up alarm that delivers ACTION_RECONNECT
        final Intent reconnectIntent = new Intent(this, PushService.class);
        reconnectIntent.setAction(ACTION_RECONNECT);
        final PendingIntent operation = PendingIntent.getService(this, 0,
                reconnectIntent, 0);
        final AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
        alarms.set(AlarmManager.RTC_WAKEUP, now + nextInterval, operation);
    }
  
    /**
     * Cancels a scheduled reconnect by cancelling a pending intent built the
     * same way as in {@code scheduleReconnect(long)}.
     */
    public void cancelReconnect() {
        final Intent reconnectIntent = new Intent(PushService.this,
                PushService.class);
        reconnectIntent.setAction(ACTION_RECONNECT);
        final PendingIntent operation = PendingIntent.getService(
                PushService.this, 0, reconnectIntent, 0);

        final AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
        alarms.cancel(operation);
    }
  
    /**
     * Reconnects when the service is marked as started but currently has no
     * MQTT connection; logs both values first.
     */
    private synchronized void reconnectIfNecessary() {
        log("mStarted" + mStarted);
        log("mConnection" + mConnection);

        final boolean needsConnection = mStarted && mConnection == null;
        if (!needsConnection) {
            return;
        }
        log("Reconnecting...");
        connect();
    }
  
    /**
     * Receiver that listens for network changes and updates the MQTT
     * connection accordingly: reconnects when connectivity is reported, and
     * otherwise drops an existing connection and cancels pending reconnects.
     */
    private BroadcastReceiver mConnectivityChanged = new BroadcastReceiver() {
        @Override
        public void onReceive(Context context, final Intent intent) {
            // Network info attached to the connectivity broadcast
            final NetworkInfo networkInfo = (NetworkInfo) intent
                    .getParcelableExtra(ConnectivityManager.EXTRA_NETWORK_INFO);
            final boolean hasConnectivity = networkInfo != null
                    && networkInfo.isConnected();

            log("Connectivity changed: connected=" + hasConnectivity);

            if (hasConnectivity) {
                reconnectIfNecessary();
                return;
            }
            if (mConnection == null) {
                return;
            }

            // No connectivity: make sure the MQTT connection is destroyed.
            log("cancelReconnect");
            mConnection.disconnect();
            mConnection = null;
            log("cancelReconnect" + mConnection);
            cancelReconnect();
        }
    };
  
    /**
     * Displays the top-bar notification. Tapping it opens
     * {@code MainActivity} with the request and {@code currentTab = "1"} as
     * extras.
     *
     * @param text    notification body text
     * @param request request placed in the launch intent's extras
     */
    private void showNotification(String text, Request request) {
        final Notification notification = new Notification();
        notification.flags |= Notification.FLAG_SHOW_LIGHTS
                | Notification.FLAG_AUTO_CANCEL;
        notification.defaults = Notification.DEFAULT_ALL;
        notification.icon = R.drawable.ico;
        notification.when = System.currentTimeMillis();

        // Extras handed to the activity
        final Bundle extras = new Bundle();
        extras.putSerializable("request", request);
        extras.putString("currentTab", "1");

        // Launcher-style intent that opens the parent activity
        final Intent launchIntent = new Intent();
        launchIntent.putExtras(extras);
        launchIntent.setClass(this, MainActivity.class);
        launchIntent.setAction(Intent.ACTION_MAIN);
        launchIntent.addCategory(Intent.CATEGORY_LAUNCHER);
        launchIntent.setFlags(Intent.FLAG_ACTIVITY_NEW_TASK
                | Intent.FLAG_ACTIVITY_RESET_TASK_IF_NEEDED);
        final PendingIntent contentIntent = PendingIntent.getActivity(this, 0,
                launchIntent, 0);

        // Change the name of the notification here
        notification.setLatestEventInfo(this, NOTIF_TITLE, text, contentIntent);
        mNotifMan.notify(NOTIF_CONNECTED, notification);
    }
  
    /**
     * Checks whether we are online.
     *
     * @return {@code true} if there is an active network and it reports
     *         being connected
     */
    private boolean isNetworkAvailable() {
        final NetworkInfo activeNetwork = mConnMan.getActiveNetworkInfo();
        return activeNetwork != null && activeNetwork.isConnected();
    }
  
    // This inner class is a wrapper on top of MQTT client.  
    private class MQTTConnection implements MqttSimpleCallback {  
        IMqttClient mqttClient = null;  
  
        // Creates a new connection given the broker address and initial topic  
        public MQTTConnection(String brokerHostName, String initTopic)  
                throws MqttException {  
            // Create connection spec  
            String mqttConnSpec = "tcp://" + brokerHostName + "@"  
                    + MQTT_BROKER_PORT_NUM;  
            // Create the client and connect  
            mqttClient = MqttClient.createMqttClient(mqttConnSpec,  
                    MQTT_PERSISTENCE);  
            String clientID = MQTT_CLIENT_ID + "/"  
                    + mPrefs.getString(PREF_DEVICE_ID, "");  
            Log.d(TAG, "mqttConnSpec:" + mqttConnSpec + "  clientID:"  
                    + clientID);  
            mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);  
  
            // register this client app has being able to receive messages  
            mqttClient.registerSimpleHandler(this);  
  
            // Subscribe to an initial topic, which is combination of client ID  
            // and device ID.  
            // initTopic = MQTT_CLIENT_ID + "/" + initTopic;  
            subscribeToTopic(initTopic);  
  
            log("Connection established to " + brokerHostName + " on topic "  
                    + initTopic);  
  
            // Save start time  
            mStartTime = System.currentTimeMillis();  
            // Star the keep-alives  
            startKeepAlives();  
        }  
  
        // Disconnect  
        public void disconnect() {  
            // try {  
            stopKeepAlives();  
            log("stopKeepAlives");  
            Thread t = new Thread() {  
                public void run() {  
                    try {  
                        mqttClient.disconnect();  
                        log("mqttClient.disconnect();");  
                    } catch (MqttPersistenceException e) {  
                        log("MqttException"  
                                + (e.getMessage() != null ? e.getMessage()  
                                        : " NULL"), e);  
                    }  
                };  
            };  
            t.start();  
            // } catch (MqttPersistenceException e) {  
            // log("MqttException"  
            // + (e.getMessage() != null ? e.getMessage() : " NULL"),  
            // e);  
            // }  
        }  
  
        /* 
         * Send a request to the message broker to be sent messages published 
         * with the specified topic name. Wildcards are allowed. 
         */  
        private void subscribeToTopic(String topicName) throws MqttException {  
  
            if ((mqttClient == null) || (mqttClient.isConnected() == false)) {  
                // quick sanity check - don't try and subscribe if we don't have  
                // a connection  
                log("Connection error" + "No connection");  
            } else {  
                String[] topics = { topicName };  
                mqttClient.subscribe(topics, MQTT_QUALITIES_OF_SERVICE);  
            }  
        }  
  
        /* 
         * Sends a message to the message broker, requesting that it be 
         * published to the specified topic. 
         */  
        private void publishToTopic(String topicName, String message)  
                throws MqttException {  
            if ((mqttClient == null) || (mqttClient.isConnected() == false)) {  
                // quick sanity check - don't try and publish if we don't have  
                // a connection  
                log("No connection to public to");  
            } else {  
                mqttClient.publish(topicName, message.getBytes(),  
                        MQTT_QUALITY_OF_SERVICE, MQTT_RETAINED_PUBLISH);  
            }  
        }  
  
        /* 
         * Called if the application loses it's connection to the message 
         * broker. 
         */  
        public void connectionLost() throws Exception {  
            log("Loss of connection" + "connection downed");  
            stopKeepAlives();  
            // 取消定时发送心跳  
            mRunFlag = false;  
            // 向服务器发送请求,更改在线状态  
            // SharedPreferences pref = getSharedPreferences("client",0);  
            // String MOBILE_NUM=pref.getString("MOBILE_NUM", "");  
            // HttpUtil.post(Constants.KEEPALIVE + "&mobile="  
            // + MOBILE_NUM+"&online_flag=0");  
            // null itself  
            mConnection = null;  
            if (isNetworkAvailable() == true) {  
                reconnectIfNecessary();  
            }  
        }  
  
        /* 
         * Called when we receive a message from the message broker. 
         */  
        public void publishArrived(String topicName, byte[] payload, int qos,  
                boolean retained) throws MqttException {  
            // Show a notification  
            // synchronized (lock) {  
            String s = new String(payload);  
            Request request = null;  
            try {// 解析服务端推送过来的消息  
                request = XmlPaserTool.getMessage(new ByteArrayInputStream(s  
                        .getBytes()));  
                // request=Constants.request;  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
            final Request mRequest = request;  
            DownloadInfo down = new DownloadInfo(mRequest);  
            down.setDownLoad(down);  
            downloadInfos.add(down);  
            sendUpdateBroast();  
            down.start();  
            showNotification("您有一条新的消息!", mRequest);  
            Log.d(PushService.TAG, s);  
            Log.d(PushService.TAG, mRequest.getMessageId());  
            // 再向服务端推送消息  
            new AdvancedCallbackHandler().sendMessage(MQTT_CLIENT_ID  
                    + "/keepalive", "***********send message**********");  
        }  
  
        public void sendKeepAlive() throws MqttException {  
            log("Sending keep alive");  
            // publish to a keep-alive topic  
            publishToTopic(MQTT_CLIENT_ID + "/keepalive",  
                    mPrefs.getString(PREF_DEVICE_ID, ""));  
        }  
    }  
  
    /**
     * Helper that opens its own MQTT client to publish messages to the
     * {@code MQTT_CLIENT_ID + "/keepalive"} topic.
     */
    class AdvancedCallbackHandler {
        IMqttClient mqttClient = null;
        // QoS levels; only the first entry is used, by connect()
        public final int[] QOS_VALUES = { 0, 0, 2, 0 };

        /**
         * (Re)connects to the broker and publishes a retained "keepalive"
         * message.
         *
         * @throws MqttException if connecting or publishing fails
         */
        private void connect() throws MqttException {
            String mqttConnSpec = "tcp://" + MQTT_HOST + "@"
                    + MQTT_BROKER_PORT_NUM;
            // Create the client and connect
            mqttClient = MqttClient.createMqttClient(mqttConnSpec,
                    MQTT_PERSISTENCE);
            // NOTE(review): same client ID as MQTTConnection uses - confirm
            // the broker tolerates two sessions with one ID.
            String clientID = MQTT_CLIENT_ID + "/"
                    + mPrefs.getString(PREF_DEVICE_ID, "");
            mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);
            Log.d(TAG, "链接服务器,推送消息");
            Log.d(TAG, "**mqttConnSpec:" + mqttConnSpec + "  clientID:"
                    + clientID);
            Log.d(TAG, MQTT_CLIENT_ID + "/keepalive");
            // Publish a retained keep-alive message
            mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
                    "keepalive".getBytes(), QOS_VALUES[0], true);
        }

        /**
         * Publishes {@code message} with QoS 0 and no retain flag,
         * connecting first if necessary. Failures are logged with the
         * exception and not propagated.
         *
         * <p>The message is always published to
         * {@code MQTT_CLIENT_ID + "/keepalive"}; {@code clientId} is only
         * written to the log.
         *
         * @param clientId label of the target, used for logging only
         * @param message  text payload to publish
         */
        public void sendMessage(String clientId, String message) {
            try {
                if (mqttClient == null || !mqttClient.isConnected()) {
                    connect();
                }

                Log.d(TAG, "send message to " + clientId + ", message is "
                        + message);
                mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
                        message.getBytes(), 0, false);
            } catch (MqttException e) {
                // Log the exception itself: getCause() alone is usually null
                // and hides both the message and the stack trace.
                Log.e(TAG, "Failed to send message to " + clientId, e);
            }
        }
    }

    /**
     * Looks up the contact display name for a phone number.
     *
     * @param phone_number number to look up, matched exactly against the
     *                     stored phone number
     * @return the display name of the first matching contact; the number
     *         itself when no contact matches (or the number is null); the
     *         empty string when the query returns no cursor
     */
    public String getPeople(String phone_number) {
        if (phone_number == null) {
            // A null bind argument is rejected by the query; nothing can
            // match, so behave like "no contact found".
            return phone_number;
        }

        String[] projection = { ContactsContract.PhoneLookup.DISPLAY_NAME,
                ContactsContract.CommonDataKinds.Phone.NUMBER };
        Log.d(TAG, "getPeople ---------");
        // Bind the number as a selection argument rather than concatenating
        // it into the WHERE clause, so quotes in the input cannot break or
        // alter the query.
        Cursor cursor = this.getContentResolver().query(
                ContactsContract.CommonDataKinds.Phone.CONTENT_URI,
                projection, // Which columns to return.
                ContactsContract.CommonDataKinds.Phone.NUMBER + " = ?", // WHERE clause.
                new String[] { phone_number }, // WHERE clause value substitution
                null); // Sort order.

        if (cursor == null) {
            Log.d(TAG, "getPeople null");
            return "";
        }
        try {
            Log.d(TAG, "getPeople cursor.getCount() = " + cursor.getCount());
            if (cursor.getCount() > 0) {
                cursor.moveToPosition(0);

                // Read the contact's display name
                int nameFieldColumnIndex = cursor
                        .getColumnIndex(ContactsContract.PhoneLookup.DISPLAY_NAME);
                String name = cursor.getString(nameFieldColumnIndex);
                Log.i("Contacts", "" + name + " .... " + nameFieldColumnIndex);
                System.out.println("联系人姓名:" + name);
                return name;
            }
            return phone_number;
        } finally {
            // The cursor was previously leaked on every call.
            cursor.close();
        }
    }
  
    /**
     * Sends a broadcast whose action is {@code "update"}; the download loop
     * fires it after every chunk and whenever a download entry is removed.
     */
    public void sendUpdateBroast() {
        sendBroadcast(new Intent("update"));
    }
  
    /**
     * Sends a broadcast whose action is {@code "updateFinishList"}; fired after
     * a received message has been stored through the DAO.
     */
    public void sendUpdateFinishBroast() {
        sendBroadcast(new Intent("updateFinishList"));
    }
  
    /**
     * Background thread that processes one received {@link Request}: it builds
     * the {@link MessageBean} describing the message, downloads the attached
     * file for message types that carry one, and finally stores the bean through
     * {@link MessageDetailDao}.
     */
    public class DownloadInfo extends Thread {
        /** Cleared by {@link #stopthread()} to ask a running download to stop. */
        boolean runflag = true;
        /** The request this thread is processing. */
        Request mRequest;
        /** Download progress in percent, updated after every chunk written. */
        public float progress;
        /** Message record built in {@link #run()}; {@code null} until then. */
        public MessageBean bean = null;
        /**
         * Entry removed from {@code PushService.downloadInfos} when this thread
         * finishes (presumably this very instance; set via {@link #setDownLoad}).
         */
        DownloadInfo download = null;
        MessageDetailDao dao = new MessageDetailDao(
                PushService.this.getApplicationContext());

        /** Requests cancellation; the download loop stops at its next chunk. */
        public synchronized void stopthread() {
            runflag = false;
        }

        /** @return {@code false} once {@link #stopthread()} has been called */
        public synchronized boolean getrunflag() {
            return runflag;
        }

        DownloadInfo(Request mRequest) {
            this.mRequest = mRequest;
        }

        /**
         * @param download the list entry to remove from
         *                 {@code PushService.downloadInfos} when processing ends
         */
        public void setDownLoad(DownloadInfo download) {
            this.download = download;
        }

        /**
         * Builds the message bean, downloads the attachment when the message
         * type is neither CARD nor SMS, then persists the bean. On cancellation
         * nothing is persisted; on any exception the entry is removed from the
         * download list and the error is logged.
         */
        @Override
        public void run() {
            try {
                File dir = new File(Constants.DOWNLOAD_PATH);
                if (!dir.exists()) {
                    dir.mkdirs();
                }
                String filePath = Constants.DOWNLOAD_PATH
                        + mRequest.getMessageId() + "." + mRequest.getExt();
                populateBean(filePath);

                String command = mRequest.getCommand();
                boolean hasAttachment = !command.equals(Request.TYPE_CARD)
                        && !command.equals(Request.TYPE_SMS);
                if (hasAttachment && !downloadFile(filePath)) {
                    // Cancelled: drop the entry and refresh the UI, but do not
                    // store the message.
                    PushService.downloadInfos.remove(download);
                    sendUpdateBroast();
                    return;
                }

                PushService.downloadInfos.remove(download);
                sendUpdateBroast();
                // Store the message/file information in the database.
                dao.create(bean);
                sendUpdateFinishBroast();
            } catch (Exception e) {
                PushService.downloadInfos.remove(download);
                sendUpdateBroast();
                Log.e(TAG, "failed to process message "
                        + mRequest.getMessageId(), e);
            }
        }

        /** Creates {@link #bean} and fills it from the request. */
        private void populateBean(String filePath) {
            bean = new MessageBean();
            bean.setPath(filePath);
            bean.setStatus(0);
            bean.setDate(mRequest.getTimestamp());
            bean.setLayoutID(R.layout.list_say_he_item);
            bean.setPhotoID(R.drawable.receive_ico);
            bean.setMessage_key(mRequest.getMessageId());
            bean.setPhone_number(mRequest.getReceiver());
            bean.setAction(1);
            String name = getPeople(mRequest.getSender());
            bean.setName(name);

            String command = mRequest.getCommand();
            bean.setFileType(Integer.parseInt(command));
            if (command.equals(Request.TYPE_MUSIC)) {
                bean.setMsgIco(R.drawable.music_temp);
                bean.setText(name + "给你发送了音乐");
                mRequest.setBody(Base64.encodeToString(bean.getText()
                        .getBytes(), Base64.DEFAULT));
            } else if (command.equals(Request.TYPE_CARD)) {
                bean.setMsgIco(R.drawable.card_temp);
                bean.setText(new String(Base64.decode(mRequest.getBody(),
                        Base64.DEFAULT)));
                mRequest.setBody(Base64.encodeToString(bean.getText()
                        .getBytes(), Base64.DEFAULT));
            } else if (command.equals(Request.TYPE_LBS)) {
                bean.setMsgIco(R.drawable.address_temp);
                bean.setText(new String(Base64.decode(mRequest.getBody(),
                        Base64.DEFAULT)));
                mRequest.setBody(Base64.encodeToString(bean.getText()
                        .getBytes(), Base64.DEFAULT));
            } else if (command.equals(Request.TYPE_PHOTO)) {
                bean.setText(name + "向你发送了照片");
                bean.setMsgIco(-1);
            } else if (command.equals(Request.TYPE_PIC)) {
                bean.setText(name + "向你发送了图片");
                bean.setMsgIco(-1);
            } else if (command.equals(Request.TYPE_SMS)) {
                bean.setFileType(0);
            }
        }

        /**
         * Streams the attachment into {@code filePath}, broadcasting progress
         * after each chunk.
         *
         * @return {@code true} when the stream was read to its end;
         *         {@code false} when the download was stopped early, in which
         *         case an incomplete file is deleted
         * @throws java.io.IOException on any network or file error
         */
        private boolean downloadFile(String filePath) throws java.io.IOException {
            URL url = new URL(Constants.FILE_DOWNLOAD_URL
                    + mRequest.getMessageId());
            HttpURLConnection hurlconn = (HttpURLConnection) url
                    .openConnection();
            InputStream instream = null;
            RandomAccessFile rasf = null;
            long fileSize = -1;
            long number = 0; // bytes written so far
            boolean completed = true;
            try {
                hurlconn.setConnectTimeout(5000); // connect timeout: 5s
                hurlconn.setRequestMethod("GET");
                hurlconn.connect();
                fileSize = hurlconn.getContentLength();
                instream = hurlconn.getInputStream();
                rasf = new RandomAccessFile(filePath, "rwd");

                byte[] buffer = new byte[1024];
                int len;
                while ((len = instream.read(buffer)) != -1) {
                    boolean keepGoing = getrunflag() && progress < 100;
                    if (!keepGoing) {
                        completed = false;
                        break;
                    }
                    // Writes are sequential from offset 0, so no seek is needed.
                    rasf.write(buffer, 0, len);
                    number += len;
                    if (fileSize > 0) {
                        // Only meaningful when the server reported a length;
                        // otherwise the value would be negative or infinite.
                        progress = (((float) number) / fileSize) * 100;
                    }
                    // Broadcast so the progress bar can be updated.
                    sendUpdateBroast();
                }
            } finally {
                // Release file handle, stream and connection on every path.
                closeQuietly(rasf);
                closeQuietly(instream);
                hurlconn.disconnect();
            }

            if (!completed && number != fileSize) {
                // Download was stopped: remove the partially written file
                // (the handle is already closed at this point).
                File file = new File(filePath);
                if (file.exists()) {
                    file.delete();
                }
            }
            return completed;
        }

        /** Closes {@code resource} if non-null, logging instead of throwing. */
        private void closeQuietly(java.io.Closeable resource) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (java.io.IOException e) {
                Log.w(TAG, "failed to close download resource", e);
            }
        }
    }
  
    // Shared list of download threads; each DownloadInfo removes its own entry when it ends.
    // NOTE(review): a plain ArrayList mutated from worker threads with no visible
    // synchronization - confirm how callers guard access.
    public static ArrayList<DownloadInfo> downloadInfos = new ArrayList<DownloadInfo>();  
  
    /**
     * @return the shared static download list itself (not a copy)
     */
    public ArrayList<DownloadInfo> getDownloadInfos() {
        return downloadInfos;
    }
  
    /**
     * Replaces the shared static download list.
     *
     * @param downloadInfos the list to install; stored by reference, not copied
     */
    public void setDownloadInfos(ArrayList<DownloadInfo> downloadInfos) {  
        // Qualified with the class name because the parameter shadows the static field.
        PushService.downloadInfos = downloadInfos;  
    }  
}