ActiveMQ + Spring 持久化并注册永久订阅者

声明:如下内容来自网络整理和本身的测试调整。java

感谢:http://greemranqq.iteye.com/blog/2167158 和 http://www.mytju.com/classcode/news_readNews.asp?newsID=486mysql

ActiveMQ 提供了2种方式的消息通讯机制:点对点发布/订阅 模式。
(1).使用点对点(Queue,即队列)时,每一个消息只有一个消费者,因此,持久化很简单,只要保存到数据库便可。而后,随便一个消费者取走处理便可。某个消费者关掉一阵子,也无所谓。
(2).使用发布/订阅(Topic,即订阅)时,每一个消息能够有多个消费者,就麻烦一些。spring

首先,假设Topic消费者都是普通的消费者:
-------------------------------------------------------------------sql

  • activemq启动后,发布消息1,惋惜,如今没有消费者启动着,也就是没有消费者进行了订阅。那么,这个消息就被抛弃了。
  • 消费者1启动了,链接了activemq,进行了订阅,在等待消息~~,activemq发布消息2,OK,消费者1收到,并进行处理。消息抛弃。
  • 消费者2也启动了,链接了activemq,进行了订阅,在等待消息~~,activemq发布消息3,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
  • 消费者1关掉了。activemq发布消息4,OK,消费者2收到,并进行处理。消息抛弃。
  • 消费者1又启动了。activemq发布消息5,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。

---------------------------------------------------------------------
总结一下:
activemq只是向当前启动的消费者发送消息。关掉的消费者,会错过不少消息,并没有法再次接收这些消息。
若是发送的消息是重要的用户同步数据,错过了,用户数据就不一样步了。那么,如何让消费者从新启动时,接收到错过的消息呢?数据库

答案是:持久订阅apache

  • 普通的订阅,不区分消费者,场地里有几我的头,就扔几个馒头。
  • 持久订阅,就要记录消费者的名字了。

张三说,我是张三,有馒头给我留着,我回来拿。
李四说,我是李四,有馒头给我留着,我回来拿。
activemq 就记下张三,李四两个名字。缓存

那么,分馒头时,仍是一我的头给一个馒头。
分完了,一看张三没说话,说明他不在,给他留一个。
李四说话了,那就不用留了。服务器

张三回来了,找activemq,一看,这不张三吧,快把他的馒头拿来。
多是一个馒头,也多是100个馒头,就看张三离开这阵子,分了多少次馒头了。网络

activemq区分消费者,是经过 clientId durableSubscriptionName(订户名称) 来区分的。session

要实现 发布/订阅模式 的 永久消费者订阅,首先必须先进行发布消息的持久化。下面采用MySQL数据库做为持久化存储方式。

1. 修改ActiveMQ服务的持久化配置( conf/activemq.xml ):

<!-- 注意:须要将 mysql-connector-java-version.jar 和 druid-version.jar 包 放到 avtivemq的 lib 目录下 -->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">  
	<property name="driverClassName" value="com.mysql.jdbc.Driver" />  
	<property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq_db" />  
	<property name="username" value="root" />  
	<property name="password" value="123456" />    
	<property name="initialSize" value="2" />    
	<property name="minIdle" value="2" />     
	<property name="maxActive" value="50" />    
</bean>  

<persistenceAdapter>
	<!-- 注意:createTablesOnStartup 在第一次启动时要设置为 true,后面修改成 false -->
	<jdbcPersistenceAdapter dataDirectory="${activemq.data}/mysql" dataSource="#mysql-ds" createTablesOnStartup="false" useDatabaseLock="true"/>
</persistenceAdapter>

2. 修改 spring-jms.xml 的配置(deliveryPersistent 或 deliveryMode):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
		
  <!-- jms 链接工厂 -->
  <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" />
  </bean>
	
  <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory -->
    <property name="targetConnectionFactory" ref="connectionFactory"/>
    <!-- Session缓存数量 -->
    <property name="sessionCacheSize" value="100" />
  </bean>
	
  <!-- 基本的bean模板 -->
  <bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate">
    <!-- 连接工长 -->
    <property name="connectionFactory" ref="cachingConnectionFactory"/>
    <!-- 进行持久化 -->
		<property name="deliveryPersistent" value="true" />
		<!-- 等同于上面 deliveryPersistent = true -->
    <!-- <property name="deliveryMode" value="2" /> -->
     <!--订阅 发布模式 -->
    <property name="pubSubDomain" value="true" />
  </bean>
	
  <!-- 消息订阅模式 -->
  <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <!-- 订阅消息的名字 -->
    <constructor-arg index="0" value="orderTopic"/>
  </bean>
	
</beans>

3. 想一想咱们订阅者须要作些什么呢?

发布者发布消息,订阅者去消费,这是1对多的形式,咱们能够这样理解:公司设定不少活动代金卷,去参加活动的人都能领取,固然这分两种状况,第一种 就是咱们前面测试的,只要我公司门口等(监听),活动开始(发布)就能领取了,若是你当时没在,就领取不到。第二种:不少状况下,公司搞活动咱们不会等在那里,只要活动开始了,那么我过段时间也能够去,礼品公司会保留的,这种状况会致使屡次领取,所以总要登记一下嘛,不能你领取了,过一会又来吧?activemq 里面会有 clientId 标示来区分,相似于身份证ID嘛。固然有些状况下, 咱们一个ID 能够领取多个不一样的奖品,所以还得须要个字段标示:durableSubscriptionName,标示咱们领取哪一个礼品,下面先看配置:

<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
		
  <!-- jms 链接工厂 -->
  <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <!-- Jvm 内部传输,暂时不用TCP,使用暂时异步传输  -->
    <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" />
  </bean>
	
  <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory -->
    <property name="targetConnectionFactory" ref="connectionFactory"/>
    <!-- 接收者ID -->
    <property name="clientId" value="Client-A" />
  </bean>
	
  <!-- 消息订阅模式 -->
  <bean id="topicCustomerA" class="org.apache.activemq.command.ActiveMQTopic">
    <!-- 订阅消息的名字 -->
    <constructor-arg index="0" value="orderTopic"/>
  </bean>
	
  <!-- 消息监听,这里能够认为是A服务器的监听 -->
  <bean id="messageListener" class="com.xxx.ConsumerMessageListener"/>
	
  <bean id="listenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="topicCustomerA" />
    <property name="messageListener" ref="messageListener" />
    <!-- 持久化消息 -->
    <property name="subscriptionDurable" value="true"/>
    <!-- 接收者ID -->
    <property name="clientId" value="Client-A" />
    <!-- 这里名字能够任意改变,A 领取了,你能够改为B 还能够领取,能够举例不是很恰当 -->
    <property name="durableSubscriptionName" value="clientA"/>
  </bean>
	
</beans>
// 监听代码 直接输出
public class ConsumerMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println("topic 收到消息:"+ ((TextMessage)message).getText());
    }
}

4. 测试

采用持久化方式,模拟测试以下:

1. 启动两个ConsumerA,ConsumerB 监听,发布一个topic消息 ,同时收到消息,OK
2. 启动一个ConsumerA,发布一个topic消息,再启动ConsumerB,也收到消息,OK
3. 启动一个ConsumerA,发布一个topic消息,A收到,关闭mq服务器,重启mq服务,重启 ConsumerB,一样收到消息,OK。
4. 启动两个ConsumerA,ConsumerB,发布topic消息,A、B收到消息,重启A、B  不收重复消息,OK    

那么新问题是:
1. 若是A、B 收到消息后,topic  的消息怎么处理呢? 一直保存着吗? 若是能够清除,怎么清除,何时进行清除呢?

最后附上我本身测试经过的完整的 spring-jms.xml配置(在注册消费者等地方略有变化,采用<jms:listener-container>等替代):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

	<!-- ActiveMQ 链接工厂 -->
 	<!-- 真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
	<!-- 若是链接网络:tcp://ip:61616;未链接网络:tcp://localhost:61616 以及用户名,密码-->
	<amq:connectionFactory id="amqConnectionFactory" brokerURL="failover(tcp://127.0.0.1:61616)" userName="admin" password="admin"  />

	<!-- Spring Caching链接工厂 -->
 	<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory -->  
  		<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
  		<!-- 同上,同理 -->
		<!-- <constructor-arg ref="amqConnectionFactory" /> -->
		<!-- Session缓存数量 -->
		<property name="sessionCacheSize" value="100" />
		<!-- 接收者ID,用于Topic订阅者的永久订阅 -->
		<property name="clientId" value="Client-A" />
	</bean>
	
	<!-- Spring JmsTemplate 的消息生产者-->

	<!-- 定义JmsTemplate的Topic类型 -->
	<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
		 <!-- 这个connectionFactory对应的是咱们定义的Spring提供的那个ConnectionFactory对象 -->  
		<constructor-arg ref="connectionFactory" />
		<!-- pub/sub模型(发布/订阅) -->
		<property name="pubSubDomain" value="true" />
		<!-- 订阅消息持久化 -->
		<property name="deliveryPersistent" value="true" />
		<!-- 配置持久化,同上 deliveryPersistent
		<property name="deliveryMode" value="2" />
		-->
	</bean>

	
	<!-- 消息消费者 -->
	
	<!-- 定义Topic监听器 -->
	<!-- 注意:定义 client-id = Client-A -->
	<jms:listener-container destination-type="durableTopic" container-type="default" connection-factory="connectionFactory" acknowledge="auto" client-id="Client-A">
		<!-- 注意:定义 subscription(即:durableSubscriptionName) -->
		<jms:listener destination="test.topic" subscription="topic_receiver1" ref="topicReceiver1"/>
		<jms:listener destination="test.topic" subscription="topic_receiver2" ref="topicReceiver2"/>
	</jms:listener-container>
	
</beans>
// Topic订阅者1
@Component("topicReceiver1")
public class TopicReceiver1 implements MessageListener 
{
	@Override
	public void onMessage(Message message) 
	{
		try {
			System.out.println("TopicReceiver1 收到消息:"+ ((TextMessage) message).getText());
		} catch(JMSException e) {
			// todo
			System.out.println("TopicReceiver1订阅失败!" + e.getMessage());
			e.printStackTrace();
		}
	}
}
// Topic订阅者2
@Component("topicReceiver2")
public class TopicReceiver2 implements MessageListener 
{
	@Override
	public void onMessage(Message message) 
	{
		try {
			System.out.println("TopicReceiver2 收到消息:"+ ((TextMessage) message).getText());
		} catch(JMSException e) {
			// todo
			System.out.println("TopicReceiver2订阅失败!" + e.getMessage());
			e.printStackTrace();
		}
	}
}