Maven 依赖引用
<!-- ActiveMQ pooling module; the Spring config references org.apache.activemq.pool.PooledConnectionFactory -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.9.0</version>
</dependency>
<!-- ActiveMQ classes; the Spring config references ActiveMQConnectionFactory and ActiveMQTopic -->
<!-- NOTE(review): activemq-all may already bundle what activemq-pool provides; confirm both are needed -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
</dependency>
<!-- Spring JMS support; supplies JmsTemplate and the message listener container classes used below -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.3.RELEASE</version>
</dependency>
提供者配置文件(含订阅者配置文件)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.3.xsd">

    <description>JMS配置</description>

    <!-- ActiveMQ connection factory: a pooled factory wrapping the raw ActiveMQ factory.
         Broker URL and credentials come from the jms.* property placeholders. -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="${jms.broker_url}" />
                <property name="userName" value="${jms.userName}" />
                <property name="password" value="${jms.password}" />
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!-- Spring caching connection factory, layered on top of the pooled factory above -->
    <!-- NOTE(review): this stacks Spring's session cache on an already pooled factory; confirm both layers are intended -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory" />
        <property name="sessionCacheSize" value="10" />
    </bean>

    <!-- Publish/subscribe JmsTemplate used by the producer -->
    <bean id="jmsTemplatePubSub" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
        <!-- Selects the messaging mode: false = point-to-point (queue), true = publish/subscribe (topic) -->
        <property name="pubSubDomain" value="true" />
        <!-- receiveTimeout is the timeout applied when receiving messages -->
        <property name="receiveTimeout" value="20000" />
    </bean>

    <bean id="updateDecisionFile" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- Name of the destination; the bean class is ActiveMQTopic, so this is a topic name -->
        <constructor-arg index="0" value="update.decision.file" />
    </bean>

    <!-- Message listener (subscriber) configuration: delivers messages from the
         updateDecisionFile destination to the updateDecisionFileMessageListener bean -->
    <bean id="updateDecisionFileContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
        <!-- Publish/subscribe mode -->
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="updateDecisionFile" />
        <property name="messageListener" ref="updateDecisionFileMessageListener" />
        <property name="concurrentConsumers" value="1" />
    </bean>

    <!-- Container factory for annotation-driven listeners (enabled by jms:annotation-driven below) -->
    <!-- NOTE(review): pubSubDomain is not set on this factory, unlike the container above;
         confirm whether annotated listeners are meant to consume from topics -->
    <bean class="org.springframework.jms.config.DefaultJmsListenerContainerFactory" id="jmsListenerContainerFactory">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
    </bean>

    <jms:annotation-driven />
</beans>
提供者(provider)代码:
package com.cloud.cloudmanage.mq.produce;

import java.io.Serializable;

import javax.annotation.Resource;
import javax.jms.Destination;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

/**
 * Producer that sends messages to the destination bean named
 * {@code updateDecisionFile} through the {@code jmsTemplatePubSub} template.
 *
 * <p>Both collaborators are injected by bean name with {@link Resource} only.
 * A by-type {@code @Autowired} on the same field is redundant and would fail
 * as soon as the context holds more than one {@link JmsTemplate} or
 * {@link Destination} bean.
 */
@Component("updateDecisionFileProduce")
public class UpdateDecisionFileProduce {

    /** Template used for sending; resolved by bean name. */
    @Resource(name = "jmsTemplatePubSub")
    private JmsTemplate jmsTemplate;

    /** Target destination for {@link #send(Serializable)}; resolved by bean name. */
    @Resource(name = "updateDecisionFile")
    private Destination notifyQueue;

    /**
     * Returns the template used for sending.
     *
     * @return the configured {@link JmsTemplate}
     */
    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    /**
     * Replaces the template used for sending.
     *
     * @param jmsTemplate the template to use from now on
     */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /**
     * Sends the given payload to the injected destination.
     *
     * @param ob the serializable payload to send
     */
    public void send(final Serializable ob) {
        sendMessage(ob, notifyQueue);
    }

    /**
     * Sends a serializable payload via {@code JmsTemplate.convertAndSend},
     * leaving the conversion to a JMS message to the template.
     *
     * @param msg         the payload to convert and send
     * @param destination the destination to send to
     */
    private void sendMessage(Serializable msg, Destination destination) {
        jmsTemplate.convertAndSend(destination, msg);
    }
}
订阅者代码:
package com.cloud.welkin.engine.mq.consumer; import javax.jms.Message; import javax.jms.MessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @Component("updateDecisionFileMessageListener") public class UpdateDecisionFileMessageListener implements MessageListener { private static Logger logger = LoggerFactory.getLogger(UpdateDecisionFileMessageListener.class); @Override public void onMessage(Message message) { try { //todo 在这里写逻辑 } catch (Exception e) { logger.error("{}",e); } } }
总结:
使用 Spring JmsTemplate 整合消息队列时,不论是发布订阅模式还是点对点模式,程序中的生产者、消费者(订阅者)代码其实并没有太大区别,区别主要在于程序配置。