MqttAndroidClient暴露了connect()方法用于连接代理服务器:
@Override public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException { IMqttToken token = new MqttTokenAndroid(this, userContext, callback); ... if (mqttService == null) { // First time - must bind to the service 首次启动服务 Intent serviceStartIntent = new Intent(); serviceStartIntent.setClassName(myContext, SERVICE_NAME); Object service = myContext.startService(serviceStartIntent); if (service == null) {//若是服务启动失败则回调链接失败 IMqttActionListener listener = token.getActionCallback(); if (listener != null) { listener.onFailure(token, new RuntimeException("cannot start service " + SERVICE_NAME)); } } //若是服务启动成功,则绑定service生命周期 // We bind with BIND_SERVICE_FLAG (0), leaving us the manage the lifecycle // until the last time it is stopped by a call to stopService() myContext.bindService(serviceStartIntent, serviceConnection, Context.BIND_AUTO_CREATE); if (!receiverRegistered) registerReceiver(this); } else { pool.execute(new Runnable() { @Override public void run() { doConnect();//链接broker //Register receiver to show shoulder tap. if (!receiverRegistered) registerReceiver(MqttAndroidClient.this); } }); } return token; }
查看方法代码,connect()中会先检查是否已启动MqttService,确定服务已启动才执行doConnect()。doConnect()中其实也就是调用:
// Hand the connect request to the bound MqttService; the third argument is passed as null here.
mqttService.connect(clientHandle, connectOptions, null, activityToken);
再往下看,定位到MqttConnection.class中的connect()方法:
public void connect(MqttConnectOptions options, String invocationContext, String activityToken) { connectOptions = options; reconnectActivityToken = activityToken; //根据cleanSession清除历史消息 if (options != null) { cleanSession = options.isCleanSession(); } if (connectOptions.isCleanSession()) { // if it's a clean session,discard old data service.messageStore.clearArrivedMessages(clientHandle); } service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}"); final Bundle resultBundle = new Bundle(); resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, activityToken); resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, invocationContext); resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.CONNECT_ACTION); try { if (persistence == null) { // ask Android where we can put files File myDir = service.getExternalFilesDir(TAG); if (myDir == null) { // No external storage, use internal storage instead. myDir = service.getDir(TAG, Context.MODE_PRIVATE); if (myDir == null) { //Shouldn't happen. resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, "Error! 
No external and internal storage available"); resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, new MqttPersistenceException()); service.callbackToActivity(clientHandle, Status.ERROR, resultBundle); return; } } // use that to setup MQTT client persistence storage persistence = new MqttDefaultFilePersistence(myDir.getAbsolutePath()); } IMqttActionListener listener = new MqttConnectionListener(resultBundle) { @Override public void onSuccess(IMqttToken asyncActionToken) { doAfterConnectSuccess(resultBundle); service.traceDebug(TAG, "connect success!"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, exception.getLocalizedMessage()); resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, exception); service.traceError(TAG, "connect fail, call connect to reconnect.reason:" + exception.getMessage()); doAfterConnectFail(resultBundle); } }; if (myClient != null) {//若是已经建立过MqttAsyncClient,也即以前就调用过本connect()方法 if (isConnecting) {//上次调用的connect()还在链接中,不作处理,等待connect()结果 service.traceDebug(TAG, "myClient != null and the client is connecting. 
Connect return directly."); service.traceDebug(TAG, "Connect return:isConnecting:" + isConnecting + ".disconnected:" + disconnected); } else if (!disconnected) {//当前已处于长链接,提示链接成功 service.traceDebug(TAG, "myClient != null and the client is connected and notify!"); doAfterConnectSuccess(resultBundle); } else {//以前的链接未成功或者已掉线,从新尝试链接 service.traceDebug(TAG, "myClient != null and the client is not connected"); service.traceDebug(TAG, "Do Real connect!"); setConnectingState(true); myClient.connect(connectOptions, invocationContext, listener); } } else {// if myClient is null, then create a new connection 链接不曾创建或已被销毁,新建链接 alarmPingSender = new AlarmPingSender(service);//用于发送心跳包 myClient = new MqttAsyncClient(serverURI, clientId, persistence, alarmPingSender); myClient.setCallback(this); service.traceDebug(TAG, "Do Real connect!"); setConnectingState(true); myClient.connect(connectOptions, invocationContext, listener); } } catch (Exception e) { service.traceError(TAG, "Exception occurred attempting to connect: " + e.getMessage()); setConnectingState(false); handleException(resultBundle, e); } }
查看以上代码,我在关键行都添加了注释。另外需要注意到其中有两个比较重要的对象resultBundle和persistence,persistence用于将connection信息持久化,而resultBundle我会在后面分析,它最终会被用于发送广播触发我们connect、publish、subscribe等的回调监听。继续深入到MqttAsyncClient.connect():
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException, MqttSecurityException { ... final boolean automaticReconnect = options.isAutomaticReconnect(); comms.setNetworkModules(createNetworkModules(serverURI, options)); comms.setReconnectCallback(new MqttCallbackExtended() { public void messageArrived(String topic, MqttMessage message) throws Exception { } public void deliveryComplete(IMqttDeliveryToken token) { } public void connectComplete(boolean reconnect, String serverURI) { } public void connectionLost(Throwable cause) { if(automaticReconnect){ // Automatic reconnect is set so make sure comms is in resting state comms.setRestingState(true); reconnecting = true; startReconnectCycle(); } } }); // Insert our own callback to iterate through the URIs till the connect succeeds MqttToken userToken = new MqttToken(getClientId()); ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting); userToken.setActionCallback(connectActionListener); userToken.setUserContext(this); // If we are using the MqttCallbackExtended, set it on the connectActionListener if(this.mqttCallback instanceof MqttCallbackExtended){ connectActionListener.setMqttCallbackExtended((MqttCallbackExtended)this.mqttCallback); } comms.setNetworkModuleIndex(0); connectActionListener.connect(); return userToken; }
代码比较多,我省略了一部分,主要关注comms.setReconnectCallback()中的自动重连逻辑即可。自动重连的实现看下面的attemptReconnect()方法:重连失败会继续重连直到连接成功,不过重连的间隔时间会随着重连次数增加,最大到128s:
private void attemptReconnect(){ final String methodName = "attemptReconnect"; //@Trace 500=Attempting to reconnect client: {0} try { connect(this.connOpts, this.userContext,new IMqttActionListener() { public void onSuccess(IMqttToken asyncActionToken) { //@Trace 501=Automatic Reconnect Successful: {0} comms.setRestingState(false); stopReconnectCycle(); } public void onFailure(IMqttToken asyncActionToken, Throwable exception) { //@Trace 502=Automatic Reconnect failed, rescheduling: {0} if(reconnectDelay < 128000){//reconnectDelay初始值为1000,每次重连失败时*2 reconnectDelay = reconnectDelay * 2; } rescheduleReconnectCycle(reconnectDelay); } }); } catch (MqttSecurityException ex) { //@TRACE 804=exception } catch (MqttException ex) { //@TRACE 804=exception } }
好了,看完重连逻辑我们再回到前面的connect()方法,MqttAsyncClient.connect()会进入ClientComms.connect():
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException { final String methodName = "connect"; synchronized (conLock) { if (isDisconnected() && !closePending) { ... tokenStore.open(); ConnectBG conbg = new ConnectBG(this, token, connect); conbg.start();//经由ConnectBG而后执行ClientComms.internalSend()方法 }else { ... } } }
经由ConnectBG然后执行ClientComms.internalSend()方法,并最终进入ClientState.send()方法:
public void send(MqttWireMessage message, MqttToken token) throws MqttException { final String methodName = "send"; 。。。 if (message instanceof MqttPublish) { synchronized (queueLock) { if (actualInFlight >= this.maxInflight) { //@TRACE 613= sending {0} msgs at max inflight window log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)}); throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT); } MqttMessage innerMessage = ((MqttPublish) message).getMessage(); //@TRACE 628=pending publish key={0} qos={1} message={2} log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message}); switch(innerMessage.getQos()) { case 2: outboundQoS2.put(new Integer(message.getMessageId()), message); persistence.put(getSendPersistenceKey(message), (MqttPublish) message); break; case 1: outboundQoS1.put(new Integer(message.getMessageId()), message); persistence.put(getSendPersistenceKey(message), (MqttPublish) message); break; } tokenStore.saveToken(token, message); pendingMessages.addElement(message); queueLock.notifyAll(); } } else { //@TRACE 615=pending send key={0} message {1} log.fine(CLASS_NAME,methodName,"615", new Object[]{new Integer(message.getMessageId()), message}); if (message instanceof MqttConnect) { synchronized (queueLock) { // Add the connect action at the head of the pending queue ensuring it jumps // ahead of any of other pending actions. 
tokenStore.saveToken(token, message); pendingFlows.insertElementAt(message,0); queueLock.notifyAll(); } } else { if (message instanceof MqttPingReq) { this.pingCommand = message; } else if (message instanceof MqttPubRel) { outboundQoS2.put(new Integer(message.getMessageId()), message); persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message); } else if (message instanceof MqttPubComp) { persistence.remove(getReceivedPersistenceKey(message)); } synchronized (queueLock) { if ( !(message instanceof MqttAck )) { tokenStore.saveToken(token, message); } pendingFlows.addElement(message); queueLock.notifyAll(); } } } }
现在我们再回头聊一聊刚才说的resultBundle,取其中一处引用:
private void doAfterConnectSuccess(final Bundle resultBundle) { //since the device's cpu can go to sleep, acquire a wakelock and drop it later. acquireWakeLock(); service.callbackToActivity(clientHandle, Status.OK, resultBundle); deliverBacklog(); setConnectingState(false); disconnected = false; releaseWakeLock(); }
连接成功后会调用MqttService.callbackToActivity(),resultBundle就作为其中一个参数被传入,接下来我们看看这个方法的实现:
/** * 全部消息都经此方法发出 * pass data back to the Activity, by building a suitable Intent object and * broadcasting it * * @param clientHandle source of the data * @param status OK or Error * @param dataBundle the data to be passed */ void callbackToActivity(String clientHandle, Status status, Bundle dataBundle) { // Don't call traceDebug, as it will try to callbackToActivity leading // to recursion. Intent callbackIntent = new Intent(MqttServiceConstants.CALLBACK_TO_ACTIVITY); if (clientHandle != null) { callbackIntent.putExtra(MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle); } callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status); if (dataBundle != null) { callbackIntent.putExtras(dataBundle); } LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent); }
callbackToActivity()方法用于发送本地广播,广播中携带resultBundle,其实包括publish、subscribe等行为不论成功失败都会调用此方法,发出一个指示行为类型及状态的本地广播。那么发出的广播又是在哪里被处理的呢?请往下看。MqttAndroidClient类继承自BroadcastReceiver,查看其onReceive()方法:
/**
 * Dispatches a received broadcast to the handler matching its callback action. Broadcasts
 * without extras, or whose client handle differs from this client's handle, are ignored.
 *
 * @param context the context the receiver is running in (unused here)
 * @param intent  the broadcast; its extras carry the client handle and the action
 */
@Override
public void onReceive(Context context, Intent intent) {
    Bundle data = intent.getExtras();
    if (data == null) {
        // Without extras there is no client handle to match, so there is nothing to dispatch.
        return;
    }

    String handleFromIntent = data.getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);
    if ((handleFromIntent == null) || (!handleFromIntent.equals(clientHandle))) {
        // The broadcast is addressed to a different client.
        return;
    }

    String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);

    if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
        connectAction(data);
    } else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
        connectExtendedAction(data);
    } else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
        messageArrivedAction(data);
    } else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
        subscribeAction(data);
    } else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
        unSubscribeAction(data);
    } else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
        sendAction(data);
    } else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
        messageDeliveredAction(data);
    } else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION.equals(action)) {
        connectionLostAction(data);
    } else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
        disconnected(data);
    } else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
        traceAction(data);
    } else {
        // Unknown or missing action.
        mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
    }
}
没错,data.getString(MqttServiceConstants.CALLBACK_ACTION)获取的就是我们前面存放在resultBundle中的action,然后根据action去回调callback的对应方法。这里的callback就是我们建立连接时传入MqttAndroidClient的MqttCallback对象。如果需要监听action为MqttServiceConstants.CONNECT_EXTENDED_ACTION的行为,则要求我们传入的callback必须为MqttCallbackExtended的实现,MqttCallbackExtended是MqttCallback的子类。