在SpringBoot中集成ActiveMQ相对仍是比较简单的,不须要安装什么服务, 默认使用内存中的ActiveMQ,配合外置ActiveMQ Server会更好.
<!-- activemq启动器 不加这个,用springboot的启动器也能够,由于springboot内置了activemq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> <version>2.0.5.RELEASE</version> </dependency> <!-- springBoot内置,能够不加,加上会更好 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.0</version> </dependency> -->
package com.manlu; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author 漫路 */ @SpringBootApplication public class MqSbApplication { public static void main(String[] args) { SpringApplication.run(MqSbApplication.class,args); } }
spring: activemq: broker-url: tcp://127.0.0.1:61616 #MQ所在的服务器地址 in-memory: true #是否使用内置的MQ。 true:使用; false:不使用; non-blocking-redelivery: false #是否在回滚消息以前中止消息传递。当启用此命令时,消息顺序不会被保留 user: admin # 用户名 password: admin # 密码
spring.activemq.broker-url=tcp://127.0.0.1:61616 # 在考虑结束以前等待的时间 #spring.activemq.close-timeout=15s # 默认代理URL是否应该在内存中。若是指定了显式代理,则忽略此值。 spring.activemq.in-memory=true # 是否在回滚回滚消息以前中止消息传递。这意味着当启用此命令时,消息顺序不会被保留。 spring.activemq.non-blocking-redelivery=false # 密码 spring.activemq.password=admin # 等待消息发送响应的时间。设置为0等待永远。 spring.activemq.user=admin # 是否信任全部包 #spring.activemq.packages.trust-all= # 要信任的特定包的逗号分隔列表(当不信任全部包时) #spring.activemq.packages.trusted= # 当链接请求和池满时是否阻塞。设置false会抛“JMSException异常”。 #spring.activemq.pool.block-if-full=true # 若是池仍然满,则在抛出异常前阻塞时间。 #spring.activemq.pool.block-if-full-timeout=-1ms # 是否在启动时建立链接。能够在启动时用于加热池。 #spring.activemq.pool.create-connection-on-startup=true # 是否用Pooledconnectionfactory代替普通的ConnectionFactory。 #spring.activemq.pool.enabled=false # 链接过时超时。 #spring.activemq.pool.expiry-timeout=0ms # 链接空闲超时 #spring.activemq.pool.idle-timeout=30s # 链接池最大链接数 #spring.activemq.pool.max-connections=1 # 每一个链接的有效会话的最大数目。 #spring.activemq.pool.maximum-active-session-per-connection=500 # 当有"JMSException"时尝试从新链接 #spring.activemq.pool.reconnect-on-exception=true # 在空闲链接清除线程之间运行的时间。当为负数时,没有空闲链接驱逐线程运行。 #spring.activemq.pool.time-between-expiration-check=-1ms # 是否只使用一个MessageProducer #spring.activemq.pool.use-anonymous-producers=true
package com.manlu.config; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Queue; import javax.jms.Topic; /** * @author 漫路 */ @Configuration public class ActiveMQConfig { @Bean public Queue queue(){ return new ActiveMQQueue("manlu.queue"); } @Bean public Topic topic(){ return new ActiveMQTopic("manlu.topic"); } }
package com.manlu.mq; import org.apache.activemq.command.ActiveMQMapMessage; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.jms.MapMessage; import javax.jms.Queue; /** * 消息的生产者 * * @author 漫路 */ @Component @EnableScheduling public class QueueProducer { /* * @Resource // 也能够注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装 * private JmsMessagingTemplate jmsTemplate; // * 发送消息,destination是发送到的队列,message是待发送的消息 * * @Scheduled(fixedDelay=3000)//每3s执行1次 * public void sendMessage(Destination destination, final String message){ * jmsTemplate.convertAndSend(destination, message); * } */ @Resource private JmsMessagingTemplate jmsMessagingTemplate; @Resource private Queue queue; @Scheduled(fixedDelay = 3000)//3秒执行1次 public void send() { try { MapMessage mapMessage = new ActiveMQMapMessage(); mapMessage.setString("info", "小老弟在敲代码"); jmsMessagingTemplate.convertAndSend(queue, mapMessage); } catch (Exception e) { e.printStackTrace(); } } }
package com.manlu.mq; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; /** * 消息的消费者 * @author 漫路 */ @Component public class QueueConsumer { //使用JmsListener配置消费者监听的队列,其中Message是接收到的消息 @JmsListener(destination = "manlu.queue") public void receiveQueue(Message message){ try { MapMessage mapMessage = (MapMessage) message; String info = mapMessage.getString("info"); System.out.println(info); } catch (Exception e) { e.printStackTrace(); } } }
如何运行看这个博客: 看目录找运行便可: http://www.javashuo.com/article/p-blwcetlu-ez.htmljava