Springboot 集成rabbitmq

     RabbitMQ是一款基于AMQP(消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、Erlang等。
对于RabbitMQ来讲,生产者和消息队列之间存在隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列。消费者经过读取消息队列从而实现消息的发送和接收。html


   交换机的主要做用是接收相应的消息而且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.git

  Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即建立消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.github

  topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.web

  headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.spring

  Fanout是路由广播的形式,将会把消息发给绑定它的所有队列,即使设置了key,也会被忽略.c#

消息队列两个用处:服务间解耦,缓解压力(削峰平谷);
RabbitMQ实现了AQMP协议,AQMP协议定义了消息路由规则和方式。生产端经过路由规则发送消息到不一样queue,消费端根据queue名称消费消息。
RabbitMQ既支持内存队列也支持持久化队列,消费端为推模型,消费状态和订阅关系由服务端负责维护,消息消费完后当即删除,不保留历史消息。
(1)点对点
生产端发送一条消息经过路由投递到Queue,只有一个消费者能消费到。springboot

(2)多订阅
当RabbitMQ须要支持多订阅时,发布者发送的消息经过路由同时写到多个Queue,不一样订阅组消费不一样的Queue。因此支持多订阅时,消息会多个拷贝。服务器

二、安装RabbitMQ服务端
    (1)下载Erlang安装包:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度网盘地址)
    (2)安装和配置RabbitMQ服务端,3.6.0版本:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度网盘地址)
        (官方:安装Erland,经过官方下载页面http://www.erlang.org/downloads获取exe安装包,直接打开并完成安装。
         安装RabbitMQ,经过官方下载页面https://www.rabbitmq.com/download.html获取exe安装包。)
    (3)启用web管理插件:rabbitmq-plugins enable rabbitmq_management
    (4)启动RabbitMQ:chkconfig rabbitmq-server on  /sbin/service rabbitmq-server start
    (5)防火墙开通端口
# firewall-cmd --permanent --zone=public --add-port=5672/tcp
# firewall-cmd --permanent --zone=public --add-port=15672/tcp
# firewall-cmd --reload
(6)rabbitmq默认会建立guest帐号,只能用于localhost登陆页面管理员,本机访问地址:http://localhost:15672/app

三.SpringBoot整合RabbitMQ(Topic转发模式)tcp

  首先咱们看发送端,咱们须要配置队列Queue,再配置交换机(Exchange),再把队列按照相应的规则绑定到交换机上:


首先 配置pom.xml
    <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

接着在application.properties中,去编辑和RabbitMQ相关的配置信息,配置信息的表明什么内容根据键就能很直观的看出了.这里端口是5672,不是15672...15672是管理端的端口!

server.port=17080
#mq
spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=ncs
spring.rabbitmq.password=12345678
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

咱们看发送端,咱们须要配置队列Queue,再配置交换机(Exchange),再把队列按照相应的规则绑定到交换机上:

package com.example.demo.conf;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by ningcs on 2017/10/30.
 */
@Configuration
public class SenderConf {
    @Bean(name="message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean(name="messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
    }
}

 

在SpringBoot中,咱们使用AmqpTemplate去发送消息!代码以下:

package com.example.demo.sender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Created by ningcs on 2017/10/30.
 */
@Component
public class HelloSender {
    @Autowired
    private AmqpTemplate template;

    public void send(String msg) {
        System.out.println(msg);
        template.convertAndSend("exchange","topic.message","hello,rabbit~");
    }
}

测试

package com.example.demo.controller;

import com.example.demo.sender.HelloSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * Created by ningcs on 2017/10/30.
 */
@RestController
@RequestMapping("rabbit")
public class RabbitController {

    @Autowired
    private HelloSender helloSender;

    @RequestMapping(value = "/hello",method = {RequestMethod.GET, RequestMethod.POST})
    public String helloSender(){
        helloSender.send("hello,rabbit~");
        return "发送成功";
    }
}

接收端(能够放在两个项目,配置文件同样):
package com.example.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Created by ningcs on 2017/10/30.
 */
@Component
public class HelloReceive {

//    @RabbitListener(queues="queue")    //监听器监听指定的Queue
//    public void processC(String str) {
//        System.out.println("Receive:"+str);
//    }

    @RabbitListener(queues="topic.message")    //监听器监听指定的Queue
    public void process1(String str) {
        System.out.println("message:"+str);
    }
    @RabbitListener(queues="topic.messages")    //监听器监听指定的Queue
    public void process2(String str) {
        System.out.println("messages:"+str);
    }

}

访问地址:http://localhost:17080/rabbit/hello
控制台输出如下日志说明成功:
2017-11-02 11:30:11.982  INFO 4476 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17080 (http)
2017-11-02 11:30:11.994  INFO 4476 --- [           main] d.SpringbootRabbitTopicSenderApplication : Started SpringbootRabbitTopicSenderApplication in 12.292 seconds (JVM running for 12.829)
2017-11-02 11:31:06.712  INFO 4476 --- [io-17080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring FrameworkServlet 'dispatcherServlet'
2017-11-02 11:31:06.713  INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
2017-11-02 11:31:06.729  INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 16 ms
hello,rabbit~
hello,rabbit~

2017-11-02 11:30:13.165  INFO 11064 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17081 (http)
2017-11-02 11:30:13.170  INFO 11064 --- [           main] SpringbootRabbitTopicReceiverApplication : Started SpringbootRabbitTopicReceiverApplication in 13.48 seconds (JVM running for 17.121)
messages:hello,rabbit~
message:hello,rabbit~
messages:hello,rabbit~
message:hello,rabbit~

Fanout 和Direct模式详见最下面github地址。

术语解释

Broker:简单来讲就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪一个队列。
Queue:消息队列载体,每一个消息都会被投入到一个或多个队列。
Binding:绑定,它的做用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里能够开设多个vhost,用做不一样用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每一个链接里,可创建多个channel,每一个channel表明一个会话任务。
RabbitMQ消息队列详细介绍(主要涉及术语)
http://blog.csdn.net/leyangjun/article/details/52529047

通讯协议AMQP(Advanced Message Queuing Protocol)
AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概以下:
客户端链接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间创建好绑定关系。
客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

RabbitMQ消息队列-RabbitMQ的优劣势及产生背景
推荐博客:
http://blog.csdn.net/super_rd/article/details/70229714

详细代码访问github:https://github.com/ningcs/Springboot-rabbit-mq

相关文章
相关标签/搜索