eclipse paho包对于ActiveMQ持久化订阅者的设置

在实现基于ActiveMQ的电影推送系统的过程中,因为是Android端的应用程序,而在查阅网上的各种资料发现,Android端直接用原生的MQTT来做推送的比较少,而eclipse paho这个封装好的API似乎比较好用在Android端的推送上,于是就采用这个包来做。推送的大致流程可以查看这个网页:基于paho包的Android demo

将逻辑写在Service可以使程序在后台执行时也收到推送。但是有个问题要处理,就是离线的消息不能丢失,所以要使消息持久化。而ActiveMQ实现了持久化订阅者的操作。持久化订阅者的概念大致如下,也可以去该网址查看:ActiveMQ持久化订阅者

ActiveMQ支持两种传输模式:持久传输和非持久传输(persistent and non-persistent delivery)。可以通过MessageProducer类的setDeliveryMode方法设置传输模式。

持久传输和非持久传输最大的区别是:采用持久传输时,传输的消息会保存到磁盘中(messages are persisted to disk/database),即“存储转发”方式。先把消息存储到磁盘中,然后再将消息“转发”给订阅者。

采用非持久传输时,发送的消息不会存储到磁盘中。
采用持久传输时,当Borker宕机恢复后,消息还在。采用非持久传输,Borker宕机重启后,消息丢失。比如,当生产者将消息投递给Broker后,Broker将该消息存储到磁盘中,在Broker将消息发送给Subscriber之前,Broker宕机了,如果采用持久传输,Broker重启后,从磁盘中读出消息再传递给Subscriber;如果采用非持久传输,这条消息就丢失了。

ActiveMQ默认的传输模式是持久传输。对于我们的项目而言,为了保证消息不丢失,我们项目中也要采用持久传输。

接下来介绍持久订阅者和非持久订阅者。持久订阅者和非持久订阅者针对的是订阅发布模式而不是点对点模式。当Broker发送消息给订阅者时,如果订阅者处于inactive状态:持久订阅者可以收到消息,原理为:持久订阅时,客户端向JMS 服务器注册一个自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。而非持久订阅者则收不到消息。

 持久订阅者/非持久订阅者,只影响离线的时候消息(包括持久消息和非持久消息)是否能接收到,和消息是否持久无关;持久消息/非持久消息,只是影响jmsprovider宕机后。消息是否会丢失,如果永远不会宕机,那么持久消息和非持久消息没有区别。

而原生ActiveMQ持久化订阅者的代码网上也比较多,具体可以看这个网址:ActiveMQ持久化订阅者实现

但基于paho包的实现持久化订阅者的代码没有在网上查阅到。于是自己仔细研究了下,发现是可以实现的。下面我就来详细讲一下。

是否是持久化订阅者和两个地方有关系,首先是连接时连接选项也就是MqttConnectOptions有关,该对象可以设置心跳时间、超时时间等,但和是否是持久化订阅者有关的则是setCleanSession这个成员方法,它接收的参数是boolean,参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录,但此时也不一定是持久化订阅者,这个时候第二个相关的就出场了,就是订阅时的服务质量,也就是qos。服务质量的概念大致如下:

Quality of Service等级是发送与接收端的一种关于保证交付信息的协议。一共有3 QoS等级:

   最多一次(0) 最少一次(1)只一次(2

QoS0 —— 最多1

最小的等级就是 0。并且它保证一次信息尽力交付。一个消息不会被接收端应答,也不会被发送者存储并再发送。这个也被叫做“即发即弃”。并且在TCP协议下也是会有相同的担保。

QoS1 ——最少1

当使用QoS等级1 时,它保证信息将会被至少发送一次给接受者。但是消息也可能被发送两次甚至更多。发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。

最高的QoS就是2,它会确保每个消息都只被接收到的一次,他是最安全也是最慢的服务等级。

当连接完成后,可以进行订阅操作,此时就需要设置服务质量,在已经传参setCleanSession为false的情况下,服务质量设为0,仍然是非持久化订阅者,这是我踩过的坑,当服务质量为1或2时,则完成了持久化订阅。在ActiveMQ主界面上的Subscribers中可以看到在线的持久化订阅者、离线的持久化订阅者以及在线的非持久化订阅者的情况,里面还显示了服务质量的区别,是至少一次(at least once)还是正好一次(exactly once),图片如下:

使用paho包做持久化订阅的流程就是这样,调用连接选项的成员方法setCleanSession,设置为false,然后订阅时参数中的服务质量设置为1或2,即可完成持久化订阅。