SpringBoot整合ActiveMQ

1.SpringBoot整合ActiveMQ

在SpringBoot中集成ActiveMQ相对仍是比较简单的,不须要安装什么服务,
默认使用内存中的ActiveMQ,配合外置ActiveMQ Server会更好.

1.1 建立工程maven子模块

1.1.1 建立mq_springBoot子模块

1.1.2 添加相关依赖

<!-- activemq启动器    不加这个,用springboot的启动器也能够,由于springboot内置了activemq-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-activemq</artifactId>
  <version>2.0.5.RELEASE</version>
</dependency>

<!-- springBoot内置,能够不加,加上会更好
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.14.0</version>
</dependency>
-->

1.1.3 建立程序入口

package com.manlu;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author 漫路
 */
@SpringBootApplication
public class MqSbApplication {
    public static void main(String[] args) {
        SpringApplication.run(MqSbApplication.class,args);
    }
}

1.1.4 建立application.yml文件

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616 #MQ所在的服务器地址
    in-memory: true #是否使用内置的MQ。  true:使用; false:不使用;
    non-blocking-redelivery: false #是否在回滚消息以前中止消息传递。当启用此命令时,消息顺序不会被保留
    user: admin # 用户名
    password: admin # 密码
  • 配置的全部具体意思以下(properties格式):
spring.activemq.broker-url=tcp://127.0.0.1:61616
# 在考虑结束以前等待的时间
#spring.activemq.close-timeout=15s
# 默认代理URL是否应该在内存中。若是指定了显式代理,则忽略此值。
spring.activemq.in-memory=true
# 是否在回滚回滚消息以前中止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
spring.activemq.non-blocking-redelivery=false
# 密码
spring.activemq.password=admin
# 等待消息发送响应的时间。设置为0等待永远。
spring.activemq.user=admin
# 是否信任全部包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗号分隔列表(当不信任全部包时)
#spring.activemq.packages.trusted=
# 当链接请求和池满时是否阻塞。设置false会抛“JMSException异常”。
#spring.activemq.pool.block-if-full=true
# 若是池仍然满,则在抛出异常前阻塞时间。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在启动时建立链接。能够在启动时用于加热池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 链接过时超时。
#spring.activemq.pool.expiry-timeout=0ms
# 链接空闲超时
#spring.activemq.pool.idle-timeout=30s
# 链接池最大链接数
#spring.activemq.pool.max-connections=1
# 每一个链接的有效会话的最大数目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 当有"JMSException"时尝试从新链接
#spring.activemq.pool.reconnect-on-exception=true
# 在空闲链接清除线程之间运行的时间。当为负数时,没有空闲链接驱逐线程运行。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否只使用一个MessageProducer
#spring.activemq.pool.use-anonymous-producers=true

1.1.5 建立config

package com.manlu.config;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;
import javax.jms.Topic;

/**
 * @author 漫路
 */
@Configuration
public class ActiveMQConfig {
    @Bean
    public Queue queue(){
        return new ActiveMQQueue("manlu.queue");
    }
    
    @Bean
    public Topic topic(){
        return new ActiveMQTopic("manlu.topic");
    }
}

1.2 编写消费者和生产者

1.2.1 编写生产者QueueProducer

package com.manlu.mq;

import org.apache.activemq.command.ActiveMQMapMessage;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.MapMessage;
import javax.jms.Queue;

/**
 * 消息的生产者
 *
 * @author 漫路
 */
@Component
@EnableScheduling
public class QueueProducer {
    /*
    * @Resource // 也能够注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
    * private JmsMessagingTemplate jmsTemplate; //
    * 发送消息,destination是发送到的队列,message是待发送的消息
    *
    * @Scheduled(fixedDelay=3000)//每3s执行1次
    * public void sendMessage(Destination destination, final String message){
    *   jmsTemplate.convertAndSend(destination, message);
    * }
    */
    @Resource
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Resource
    private Queue queue;

    @Scheduled(fixedDelay = 3000)//3秒执行1次
    public void send() {
        try {
            MapMessage mapMessage = new ActiveMQMapMessage();
            mapMessage.setString("info", "小老弟在敲代码");
            jmsMessagingTemplate.convertAndSend(queue, mapMessage);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

1.2.2 编写消费者QueueConsumer

package com.manlu.mq;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;

/**
 * 消息的消费者
 * @author 漫路
 */
@Component
public class QueueConsumer {
    //使用JmsListener配置消费者监听的队列,其中Message是接收到的消息
    @JmsListener(destination = "manlu.queue")
    public void receiveQueue(Message message){
        try {
            MapMessage mapMessage = (MapMessage) message;
            String info = mapMessage.getString("info");
            System.out.println(info);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1.2.3 启动

注意: 启动前须要先运行你的ActiveMQ

如何运行看这个博客: 看目录找运行便可: http://www.javashuo.com/article/p-blwcetlu-ez.htmljava

  • 启动springboot

  • 访问activemq看一下

若是根据the blog 出现问题, 评论便可, 我看到会尝试解决

相关文章
相关标签/搜索