服务端,采用 Mosquitto 来转发分发消息。java
客户端本身写。服务器
服务端 启动 mosquitto (底下的命令是我本身放到环境变量里面的,经过alias 运行mosquitto) IshallbeThatIshallbe:~ iamthat$ mosquitto_start 1427629906: mosquitto version 1.3.5 (build date 2014-10-27 15:12:32+0000) starting 1427629906: Config loaded from /usr/local/Cellar/mosquitto/1.3.5/etc/mosquitto/mosquitto.conf. IshallbeThatIshallbe:~ iamthat$ 1427629906: Opening ipv4 listen socket on port 1883. 1427629906: Opening ipv6 listen socket on port 1883.
客户端代码以下: 记住导入三个ibm 的jar 包。网络
package mqtthelloworld; import com.ibm.mqtt.MqttClient; import com.ibm.mqtt.MqttException; import com.ibm.mqtt.MqttNotConnectedException; import com.ibm.mqtt.MqttPersistenceException; public class Client1 { private final static String CONNECTION_STRING = "tcp://127.0.0.1:1883"; private final static boolean CLEAN_START = true; private final static short KEEP_ALIVE = 30;//低耗网络,可是又须要及时获取数据,心跳30s private final static String CLIENT_ID = "client1"; private final static String[] SUBSCRIBE_TOPICS = { "gabriel" }; private final static int[] QOS_VALUES = {01}; private static final String[] PUBLISH_TOPICS = {"gabriel"}; private static final String publish_topic="gabriel"; ////////////////// private MqttClient mqttClient; public Client1(){ try { mqttClient = new MqttClient(CONNECTION_STRING); SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler(); mqttClient.registerSimpleHandler(simpleCallbackHandler);//注册接收消息方法 mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE); mqttClient.subscribe(SUBSCRIBE_TOPICS, QOS_VALUES);//订阅接主题 /** * 完成订阅后,能够增长心跳,保持网络通畅,也能够发布本身的消息 */ } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void publishMessage(String message) throws MqttNotConnectedException, MqttPersistenceException, IllegalArgumentException, MqttException{ mqttClient.publish(publish_topic, message.getBytes(), QOS_VALUES[0], true); } public static void main(String[] args) throws MqttNotConnectedException, MqttPersistenceException, IllegalArgumentException, MqttException { Client1 client1=new Client1(); for(int i=0;i<10;i++){ client1.publishMessage("testing Client1 and this is "+i+"th message published"); } } }
package mqtthelloworld; import com.ibm.mqtt.MqttSimpleCallback; public class SimpleCallbackHandler implements MqttSimpleCallback { /** * 当客户机和broker意外断开时触发 * 能够再此处理从新订阅 */ @Override public void connectionLost() throws Exception { // TODO Auto-generated method stub System.out.println("客户机和broker已经断开"); } /** * 客户端订阅消息后,该方法负责回调接收处理消息 */ @Override public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception { // TODO Auto-generated method stub System.out.println("订阅主题: " + topicName); System.out.println("消息数据: " + new String(payload)); System.out.println("消息级别(0,1,2): " + Qos); System.out.println("是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): " + retained); } }
再终端开一个客户端:socket
IshallbeThatIshallbe:bin iamthat$ mosquitto_sub -v -t 'gabriel'
运行结果:tcp
运行Client1 结果以下:ide
终端显示ui
IshallbeThatIshallbe:bin iamthat$ mosquitto_sub -v -t 'gabriel' 1427630077: New connection from ::1 on port 1883. 1427630077: New client connected from ::1 as mosqsub/418-IshallbeTha (c1, k60). 1427630094: New connection from 127.0.0.1 on port 1883. 1427630094: New client connected from 127.0.0.1 as client1 (c1, k30). gabriel testing Client1 and this is 0th message published gabriel testing Client1 and this is 1th message published gabriel testing Client1 and this is 2th message published gabriel testing Client1 and this is 3th message published gabriel testing Client1 and this is 4th message published gabriel testing Client1 and this is 5th message published gabriel testing Client1 and this is 6th message published gabriel testing Client1 and this is 7th message published gabriel testing Client1 and this is 8th message published gabriel testing Client1 and this is 9th message published
控制台显示:this
订阅主题: gabriel 消息数据: testing Client1 and this is 0th message published 消息级别(0,1,2): 1 是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false 订阅主题: gabriel 消息数据: testing Client1 and this is 1th message published 消息级别(0,1,2): 1 是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false 订阅主题: gabriel 消息数据: testing Client1 and this is 2th message published 消息级别(0,1,2): 1 是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false 订阅主题: gabriel
再开一个终端,看看订阅者如何与发布者沟通:spa
IshallbeThatIshallbe:bin iamthat$ mosquitto_pub -t 'gabriel' -m 'connecting to publisher' IshallbeThatIshallbe:bin iamthat$
以前的订阅者终端:代理
IshallbeThatIshallbe:bin iamthat$ mosquitto_sub -v -t 'gabriel' 1427630077: New connection from ::1 on port 1883. 1427630077: New client connected from ::1 as mosqsub/418-IshallbeTha (c1, k60). 1427630094: New connection from 127.0.0.1 on port 1883. 1427630094: New client connected from 127.0.0.1 as client1 (c1, k30). gabriel testing Client1 and this is 0th message published gabriel testing Client1 and this is 1th message published gabriel testing Client1 and this is 2th message published gabriel testing Client1 and this is 3th message published gabriel testing Client1 and this is 4th message published gabriel testing Client1 and this is 5th message published gabriel testing Client1 and this is 6th message published gabriel testing Client1 and this is 7th message published gabriel testing Client1 and this is 8th message published gabriel testing Client1 and this is 9th message published 1427631029: New connection from ::1 on port 1883. 1427631029: New client connected from ::1 as mosqpub/468-IshallbeTha (c1, k60). gabriel connecting to publisher
控制台显示:
订阅主题: gabriel
消息数据: testing Client1 and this is 0th message published
消息级别(0,1,2): 1
是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 1th message published
消息级别(0,1,2): 1
是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 2th message published
消息级别(0,1,2): 1
是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 3th message published
消息级别(0,1,2): 1
是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 4th message published
消息级别(0,1,2): 1
是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 5th message published
消息级别(0,1,2): 1
是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 6th message published
消息级别(0,1,2): 1
是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 7th message published
消息级别(0,1,2): 1
是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 8th message published
消息级别(0,1,2): 1
是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 9th message published
消息级别(0,1,2): 1
是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel 消息数据: connecting to publisher 消息级别(0,1,2): 0 是不是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
能够看出订阅者成功与发布者联系交互。
我的对MQTT的理解是:
就三种角色: 订阅者,服务代理,发布者。
客户端订阅者订阅主题---》 服务代理broker-----> 反馈发布者消息给订阅者(消息的传递经过主题)
发布者建立主题发布消息---》服务端broker ---》反馈给订阅者(消息传递经过主题)
订阅者发送消息---》服务端broker---> 发布者(消息传递经过主题)