如何实现ActiveMq的Topic的持久订阅

原文地址:http://www.mytju.com/classcode/news_readNews.asp?newsID=486java

 

(1)使用queue,即队列时,每一个消息只有一个消费者,因此,持久化很简单,只要保存到数据库便可

。而后,随便一个消费者取走处理便可。某个消费者关掉一阵子,也无所谓。

(2)使用topic,即订阅时,每一个消息能够有多个消费者,就麻烦一些。

首先,假设消费者都是普通的消费者,
------------------------
<1>activemq启动后,发布消息1,惋惜,如今没有消费者启动着,也就是没有消费者进行了订阅。那么

,这个消息就被抛弃了。

<2>消费者1启动了,链接了activemq,进行了订阅,在等待消息~~

activemq发布消息2,OK,消费者1收到,并进行处理。消息抛弃。

<3>消费者2也启动了,链接了activemq,进行了订阅,在等待消息~~

activemq发布消息3,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。

<4>消费者1关掉了。

activemq发布消息4,OK,消费者2收到,并进行处理。消息抛弃。

<5>消费者1又启动了。

activemq发布消息5,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
-----------------------------
总结一下:
activemq只是向当前启动的消费者发送消息。
关掉的消费者,会错过不少消息,并没有法再次接收这些消息。

若是发送的消息是重要的用户同步数据,错过了,用户数据就不一样步了。

那么,如何让消费者从新启动时,接收到错过的消息呢?

答案是持久订阅。

(3)普通的订阅,不区分消费者,场地里有几我的头,就扔几个馒头。
持久订阅,就要记录消费者的名字了。
张三说,我是张三,有馒头给我留着,我回来拿。
李四说,我是李四,有馒头给我留着,我回来拿。
activemq就记下张三,李四两个名字。

那么,分馒头时,仍是一我的头给一个馒头。
分完了,一看张三没说话,说明他不在,给他留一个。
李四说话了,那就不用留了。

张三回来了,找activemq,一看,这不张三吧,快把他的馒头拿来。
多是一个馒头,也多是100个馒头,就看张三离开这阵子,分了多少次馒头了。

activemq区分消费者,是经过clientID和订户名称来区分的。数据库

 


// 建立connection
connection = connectionFactory.createConnection();
connection.setClientID("bbb"); //持久订阅须要设置这个。
connection.start();

// 建立session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

// 建立destination
Topic topic = session.createTopic("userSyncTopic"); //Topic名称

//MessageConsumer consumer = session.createConsumer(topic); //普通订阅
MessageConsumer consumer = session.createDurableSubscriber(topic,"bbb"); //持久订阅

 



(4)还有一点,消息的生产者,发送消息时用使用持久模式
MessageProducer producer = ...;
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
不设置,默认就是持久的

(5)使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个链接到activemq,第二个链接的会报错。

(6)activemq的设置在conf/activemq.xml中,默认消息是保存在data/kahadb中,重启activemq消息不会丢。

能够访问http://localhost:8161/admin/index.jsp
查看当前的队列、Topic和持久订户的信息、发送消息等等,很方便。

能够复制activemq-jdbc.xml中的内容过来,修改一下,就能够把消息保存在其它数据库中了。session

 

-------------------------------------------这是一条分割线-----------------------------------jsp

关于ActiveMQ的持久化:spa

一、消息模型code

ActiveMQ的有两种消息模型:一是点对点,二是发布/订阅模式。xml

点对点在ActiveMQ中的具体实现就是Queue,发布/订阅则是Topic。blog

二、ActiveMQ的持久化队列

ActiveMQ持久化就是在ActiveMQ崩溃时修复后,原消息数据仍未丢失,具体的实现是ActiveMQ使用文件系统或者数据库对消息进行存储,这样在AMQ恢复后能够根据存储的消息进行消息数据的恢复。get

三、Queue

点对点消息,一方发送,一方接收。已经持久化的消息,一旦接收方已经成功的消费掉,则从持久化介质中去掉。

四、Topic

发布/订阅方式,已经持久化的消息,只有订阅者成功消费后才被清除。其中,该方式的持久化,一方面包括订阅者的持久化,是指订阅者因为某些缘由断开链接,再从新链接以后,可以获取到链接断开期间的消息,即不错过消息;另外一方面,是指ActiveMQ发生异常恢复后,发布方的消息仍旧存在。

-------------------------------------------这又是一条分割线--------------------------------

持久化方式:

一、JMS Queue消息的持久化

  Queue的持久化实现比较简单,只须要在发送消息的时候指定为持久化消息便可:

  

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

 

 

二、JMS  Topic消息的持久化

  Topic消息进行持久化,须要发布方、订阅方都进行持久化。

  消息的发布方须要设置消息为持久化消息:

 

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

  消息的订阅方须要设置持久化接收(订阅方须要指定本身的ClientID):

 

 

connection.setClientID("clientid");
consumer=session.createDurableSubscriber(topic, "clientid");

 

三、MQTT Topic消息的持久化

  消息发布方:

 

mqtt.setClientId("clientid");
mqtt.setCleanSession(false);

  消息订阅方:

 

 

mqtt.setClientId("clientid");
mqtt.setCleanSession(false);