声明:如下内容来自网络整理和本身的测试调整。java
感谢:http://greemranqq.iteye.com/blog/2167158 和 http://www.mytju.com/classcode/news_readNews.asp?newsID=486mysql
ActiveMQ 提供了2种方式的消息通讯机制:点对点 和 发布/订阅 模式。
(1).使用点对点(Queue,即队列)时,每一个消息只有一个消费者,因此,持久化很简单,只要保存到数据库便可。而后,随便一个消费者取走处理便可。某个消费者关掉一阵子,也无所谓。
(2).使用发布/订阅(Topic,即订阅)时,每一个消息能够有多个消费者,就麻烦一些。spring
首先,假设Topic消费者都是普通的消费者:
-------------------------------------------------------------------sql
---------------------------------------------------------------------
总结一下:
activemq只是向当前启动的消费者发送消息。关掉的消费者,会错过不少消息,并没有法再次接收这些消息。
若是发送的消息是重要的用户同步数据,错过了,用户数据就不一样步了。那么,如何让消费者从新启动时,接收到错过的消息呢?数据库
答案是:持久订阅。apache
张三说,我是张三,有馒头给我留着,我回来拿。
李四说,我是李四,有馒头给我留着,我回来拿。
activemq 就记下张三,李四两个名字。缓存
那么,分馒头时,仍是一我的头给一个馒头。
分完了,一看张三没说话,说明他不在,给他留一个。
李四说话了,那就不用留了。服务器
张三回来了,找activemq,一看,这不张三吧,快把他的馒头拿来。
多是一个馒头,也多是100个馒头,就看张三离开这阵子,分了多少次馒头了。网络
activemq区分消费者,是经过 clientId 和 durableSubscriptionName(订户名称) 来区分的。session
要实现 发布/订阅模式 的 永久消费者订阅,首先必须先进行发布消息的持久化。下面采用MySQL数据库做为持久化存储方式。
1. 修改ActiveMQ服务的持久化配置( conf/activemq.xml ):
<!-- 注意:须要将 mysql-connector-java-version.jar 和 druid-version.jar 包 放到 avtivemq的 lib 目录下 --> <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq_db" /> <property name="username" value="root" /> <property name="password" value="123456" /> <property name="initialSize" value="2" /> <property name="minIdle" value="2" /> <property name="maxActive" value="50" /> </bean> <persistenceAdapter> <!-- 注意:createTablesOnStartup 在第一次启动时要设置为 true,后面修改成 false --> <jdbcPersistenceAdapter dataDirectory="${activemq.data}/mysql" dataSource="#mysql-ds" createTablesOnStartup="false" useDatabaseLock="true"/> </persistenceAdapter>
2. 修改 spring-jms.xml 的配置(deliveryPersistent 或 deliveryMode):
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- jms 链接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" /> </bean> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="connectionFactory"/> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- 基本的bean模板 --> <bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate"> <!-- 连接工长 --> <property name="connectionFactory" ref="cachingConnectionFactory"/> <!-- 进行持久化 --> <property name="deliveryPersistent" value="true" /> <!-- 等同于上面 deliveryPersistent = true --> <!-- <property name="deliveryMode" value="2" /> --> <!--订阅 发布模式 --> <property name="pubSubDomain" value="true" /> </bean> <!-- 消息订阅模式 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 订阅消息的名字 --> <constructor-arg index="0" value="orderTopic"/> </bean> </beans>
3. 想一想咱们订阅者须要作些什么呢?
发布者发布消息,订阅者去消费,这是1对多的形式,咱们能够这样理解:公司设定不少活动代金卷,去参加活动的人都能领取,固然这分两种状况,第一种 就是咱们前面测试的,只要我公司门口等(监听),活动开始(发布)就能领取了,若是你当时没在,就领取不到。第二种:不少状况下,公司搞活动咱们不会等在那里,只要活动开始了,那么我过段时间也能够去,礼品公司会保留的,这种状况会致使屡次领取,所以总要登记一下嘛,不能你领取了,过一会又来吧?activemq 里面会有 clientId 标示来区分,相似于身份证ID嘛。固然有些状况下, 咱们一个ID 能够领取多个不一样的奖品,所以还得须要个字段标示:durableSubscriptionName,标示咱们领取哪一个礼品,下面先看配置:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- jms 链接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <!-- Jvm 内部传输,暂时不用TCP,使用暂时异步传输 --> <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" /> </bean> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="connectionFactory"/> <!-- 接收者ID --> <property name="clientId" value="Client-A" /> </bean> <!-- 消息订阅模式 --> <bean id="topicCustomerA" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 订阅消息的名字 --> <constructor-arg index="0" value="orderTopic"/> </bean> <!-- 消息监听,这里能够认为是A服务器的监听 --> <bean id="messageListener" class="com.xxx.ConsumerMessageListener"/> <bean id="listenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicCustomerA" /> <property name="messageListener" ref="messageListener" /> <!-- 持久化消息 --> <property name="subscriptionDurable" value="true"/> <!-- 接收者ID --> <property name="clientId" value="Client-A" /> <!-- 这里名字能够任意改变,A 领取了,你能够改为B 还能够领取,能够举例不是很恰当 --> <property name="durableSubscriptionName" value="clientA"/> </bean> </beans>
// 监听代码 直接输出 public class ConsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println("topic 收到消息:"+ ((TextMessage)message).getText()); } }
4. 测试
采用持久化方式,模拟测试以下:
1. 启动两个ConsumerA,ConsumerB 监听,发布一个topic消息 ,同时收到消息,OK
2. 启动一个ConsumerA,发布一个topic消息,再启动ConsumerB,也收到消息,OK
3. 启动一个ConsumerA,发布一个topic消息,A收到,关闭mq服务器,重启mq服务,重启 ConsumerB,一样收到消息,OK。
4. 启动两个ConsumerA,ConsumerB,发布topic消息,A、B收到消息,重启A、B 不收重复消息,OK
那么新问题是:
1. 若是A、B 收到消息后,topic 的消息怎么处理呢? 一直保存着吗? 若是能够清除,怎么清除,何时进行清除呢?
最后附上我本身测试经过的完整的 spring-jms.xml配置(在注册消费者等地方略有变化,采用<jms:listener-container>等替代):
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- ActiveMQ 链接工厂 --> <!-- 真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <!-- 若是链接网络:tcp://ip:61616;未链接网络:tcp://localhost:61616 以及用户名,密码--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="failover(tcp://127.0.0.1:61616)" userName="admin" password="admin" /> <!-- Spring Caching链接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- 同上,同理 --> <!-- <constructor-arg ref="amqConnectionFactory" /> --> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100" /> <!-- 接收者ID,用于Topic订阅者的永久订阅 --> <property name="clientId" value="Client-A" /> </bean> <!-- Spring JmsTemplate 的消息生产者--> <!-- 定义JmsTemplate的Topic类型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是咱们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- pub/sub模型(发布/订阅) --> <property name="pubSubDomain" value="true" /> <!-- 订阅消息持久化 --> <property name="deliveryPersistent" value="true" /> <!-- 配置持久化,同上 deliveryPersistent <property name="deliveryMode" value="2" /> --> </bean> <!-- 消息消费者 --> <!-- 定义Topic监听器 --> <!-- 注意:定义 client-id = Client-A --> <jms:listener-container destination-type="durableTopic" container-type="default" connection-factory="connectionFactory" acknowledge="auto" client-id="Client-A"> <!-- 注意:定义 subscription(即:durableSubscriptionName) --> <jms:listener destination="test.topic" subscription="topic_receiver1" ref="topicReceiver1"/> <jms:listener destination="test.topic" subscription="topic_receiver2" ref="topicReceiver2"/> </jms:listener-container> </beans>
// Topic订阅者1 @Component("topicReceiver1") public class TopicReceiver1 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("TopicReceiver1 收到消息:"+ ((TextMessage) message).getText()); } catch(JMSException e) { // todo System.out.println("TopicReceiver1订阅失败!" + e.getMessage()); e.printStackTrace(); } } }
// Topic订阅者2 @Component("topicReceiver2") public class TopicReceiver2 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("TopicReceiver2 收到消息:"+ ((TextMessage) message).getText()); } catch(JMSException e) { // todo System.out.println("TopicReceiver2订阅失败!" + e.getMessage()); e.printStackTrace(); } } }