MQTT协议之 Apache Apollo服务

1、说明html

  MQTTIBM开发的一个即时通信协议,有可能成为物联网的重要组成部分。该协议支持全部平台,几乎能够把全部联网物品和外部链接起来,被用来当作传感器和致动器(好比经过Twitter让房屋联网)的通讯协议。java

  Apache Apollo是一个代理服务器,其是在ActiveMQ基础上发展而来的,能够支持STOMP, AMQP, MQTT, Openwire, SSL, and WebSockets 等多种协议。apache

  总结来讲MQTT只是一种消息推送的协议目前(2016/1/13)V3.1版本,而Apache Apollo是更具这种协议而开发的一款服务性的服务程序,被用来进行消息推送。一样的服务还有Mosquitto,介绍地址:http://blog.csdn.net/xukai871105/article/details/39252653windows

2、原理浏览器

    Apache Apollo说白了其实很简单,就是在服务器端建立一个惟一订阅号,发送者能够向这个订阅号中发东西,而后接受者(即订阅了这个订阅号的人)都会收到这个订阅号发出来的消息。以此来完成消息的推送。服务器实际上是一个消息中转站。服务器

3、下载安装Apollo(windows)网络

    1.下载地址:http://activemq.apache.org/apollo/session

      

      

  2.安装eclipse

  apollo中间件实际上是免安装的,咱们只须要下载apache-apollo-1.7.1-windows-distro.zip,而后解压到某个文件夹就能够了。在这里我解压到D:\apache-apollo-1.7.1。解压开的路径以下:tcp

 

    

 3.建立属于本身的apollo域

  下载下来的是官方的apollo,咱们须要本身生成本身的apollo,这样作的好处就是咱们能够根据本身的需求修改一些配置文件,建立过程以下: 

  1 建立一个D:\myApollo文件夹。

  2 进入命令输入模式,进入到刚建立的文件下下:cd D:\myApollo

  3 由于接到的目录关系因此可能有些改变,目录为(解压路径\bin\apollo create myapollo) 例子:D:\apache-apollo-1.7.1\bin\apollo create myapollo

  4 建立成功后,在D:\myApollo会有一个myapollo子文件夹,里面内容以下:

   

  其中 bin为启动目录 etc为配置目录。

  4.启动myapollo

  1 cd D:\myApollo\myapollo\bin  (命名模式下进入到本身生成的apollo下的bin目录)

  2 apollo-broker run  (输入启动命令)

  输入后效果以下:

  

  这里咱们能够看到端口配置信息

 5.访问控制台 

  在浏览器输入http://127.0.0.1:61680/,就是上面黑窗口最后一行,打开以下页面

  

  而后输入默认用户名/密码:(admin/password),用户名密码能够在etc/users.properties中找到,点击登录,而后进入控制页面,能够看到myapollo,固然目前里面是没有链接,没有消息的,由于咱们尚未创建链接,发送消息。

  

  至此,咱们的apollo中间件就能够正常使用了。

  6.修改配置

  在咱们的配置目录下(D:\myApollo\myapollo\etc)

  

  apollo.xml 为网络配置信息,其余的不用管最主要的

    <connector id="tcp" bind="tcp://0.0.0.0:61613" connection_limit="2000"/>   connection_limit链接限制条数2000,就是说超过2000就GG了。可不能够修改等链接到了2000条的时候更改试试。

  groups.properties 用于增长用户

    本来为: admins=admin

    增长test用户: admins=admin|test(中间用|分开)

  users.properties  用于设置用户的帐号密码

    用户名=密码

    本来为: admin=password

    增长test用户:  admin=password

            test=test         (下面新增一列,此处与groups.properties文件对应)

 

 7.链接程序:

  依赖包:mqtt-client-0.4.0.jar

  包下载地址:https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/mqtt-client/0.4.0/

  代码示例下载:http://pan.baidu.com/s/1kUmsVrT   (gbk编码)

    在线代码解析:

 

public class MyMqtt{
	/**链接访问者id,不能重复*/
	private String clientId;
	/**默认链接路径,服务器所在ip*/
	private String host="tcp://192.168.1.130:61613";
	/**MQTT访问默认用户名*/
	private String userName = "admin";
	/**MQTT访问默密码*/
	private String passWord = "password";
	/**发送的订阅号集合*/
	private String[] publishTopics={"test"};
	/**订阅者的订阅号集合*/
	private String[] subTopics={"test"};
	/**消息发送时的消息模式集合*/
	private int[] publishQos={2};
	/**消息订阅式的消息模式集合*/
	private int[] subQos={2};
	/**单次MQTT链接*/
	private MqttClient client;
	/**链接时的一些额外设置*/
	private MqttConnectOptions options;
	/**发送消息是的个体topic*/
	private MqttTopic topic;
	/**消息传递hander*/
	private Handler mHandler;
	
	
	public MyMqtt(Context context,final Handler handler) {
		clientId=Tool.getIdentifyNumber(context);
		if(client==null || !client.isConnected()){
			try {
				//设置链接指定的额ip,链接人
				client = new MqttClient(host, clientId, new MemoryPersistence());
				//开始设置链接时的参数
				options = new MqttConnectOptions();
				//设置是否清空session,这里若是设置为false表示服务器会保留客户端的链接记录,这里设置为true表示每次链接到服务器都以新的身份链接
				options.setCleanSession(true);
				//设置链接用户名
				options.setUserName(userName);
				//设置链接密码
				options.setPassword(passWord.toCharArray());
				//设置超时链接
				options.setConnectionTimeout(10);
				//设置心跳间隔
				options.setKeepAliveInterval(20);
				//设置当链接断开时发送的死亡预告,当这句被接受到时 证实本链接断开
//				options.setWill(publishTopic, (clientId+"destroy").getBytes(), 0, true);
				//链接回调函数
				client.setCallback(new MqttCallback() {			
					@Override
					public void messageArrived(String topicName, MqttMessage message) throws Exception {
						// TODO Auto-generated method stub
						Message msg = new Message();
						msg.what=1;
						msg.obj=message.toString();
						handler.sendMessage(msg);
					}
					@Override
					public void deliveryComplete(IMqttDeliveryToken token) {
						// TODO Auto-generated method stub
						
					}
					
					@Override
					public void connectionLost(Throwable cause) {
						// TODO Auto-generated method stub
						
					}
				});
				client.connect(options);
				client.subscribe(subTopics, subQos);
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	
	/**
	 * 发送消息
	 * @param msg
	 */
	public void sendMessage(String msg){
		if(client!=null && client.isConnected()){
			System.out.println("MQTT发送消息了呀 "+msg);
			try {
				MqttMessage message=new MqttMessage();
				//设置消息传输的类型 0,1,2可选
				message.setQos(2);
				//设置是否在服务器中保存消息体
				message.setRetained(false);
				//设置消息的内容
				message.setPayload(msg.getBytes());
				//循环发送,由于一次只能一个
				for (String publishTopic: publishTopics) {
					topic=client.getTopic(publishTopic);
					MqttDeliveryToken token = topic.publish(message);
					token.waitForCompletion();
				}
			} catch (MqttPersistenceException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (NullPointerException e) {
				// TODO: handle exception
				e.printStackTrace();
			}
		}
	}

  8. API查看地址
  http://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/package-summary.html

相关文章
相关标签/搜索