【Spring Boot】20.RabbitMQ高级

简介

前面咱们已经学习了如何在RabbitMQ的安装及简单使用以及在SpringBoot中集成RabbitMQ组件,接下来咱们来学习RabbitMQ的一些高级特性。java

RabbitMQ监听器

一、添加Bookweb

为了测试监听器的使用场景,咱们先构建一个bean。spring

bean/Book.class
package com.zhaoyi.bweb.bean;

public class Book {
    private String author;
    private String bookName;
    private String introduce;

    public Book() {
    }

    public static Book defaultBook(){
        return new Book("红楼梦","曹雪芹", "四大名著之一...");
    }

    public Book(String bookName, String author, String introduce) {
        this.author = author;
        this.bookName = bookName;
        this.introduce = introduce;
    }

    public String getIntroduce() {
        return introduce;
    }

    public void setIntroduce(String introduce) {
        this.introduce = introduce;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    public String getBookName() {
        return bookName;
    }

    public void setBookName(String bookName) {
        this.bookName = bookName;
    }

    @Override
    public String toString() {
        return "Book{" +
                "author='" + author + '\'' +
                ", bookName='" + bookName + '\'' +
                ", introduce='" + introduce + '\'' +
                '}';
    }
}

使用咱们上一节中学习使用的项目进行接下来的操做。json

在应用中使用RabbitMQ的监听器。springboot

二、在主程序处开启基于注解的RabbitMQ模式app

BwebApplication.class
package com.zhaoyi.bweb;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableRabbit
@SpringBootApplication
public class BwebApplication {
    public static void main(String[] args) {
        SpringApplication.run(BwebApplication.class, args);
    }
}

三、编写一个Service,监听消息队列ide

service/BookService.class
package com.zhaoyi.bweb.service;

import com.zhaoyi.bweb.bean.Book;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class BookService {

    @RabbitListener(queues = {"joyblack.news"})
    public void listernerBook(Book book){
        System.out.println("receive a message:");
        System.out.println(book);
    }
}

该监听方法会将消息体数据映射到Book对象,若是数据类型不一致会出现应用程序异常。学习

该service中的listernerBook监听了咱们定制的MQ服务的joyblack.news的队列信息,当joyblack.news接受到信息的时候,会调用该方法。测试

四、 在测试用例中添加一个测试用例,用于向joyblack.news中发送包含Book数据的消息。网站

test/BwebApplicationTests
@Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void send(){
        rabbitTemplate.convertAndSend("exchange.direct", "joyblack.news", Book.defaultBook());
    }

运行主程序。

而后运行咱们的测试运行,能够看到,每当咱们运行一次测试用例,就会触发service的listernerBook的一次调用,并打印结果:

...
receive a message:
Book{author='曹雪芹', bookName='红楼梦', introduce='四大名著之一...'}
receive a message:
Book{author='曹雪芹', bookName='红楼梦', introduce='四大名著之一...'}
....

也就是说,经过@EnableRabbit以及@RabbitListener两个注解,咱们就能够在springboot实现简单的消息监听了。

固然,咱们也能够有其余的接收消息的模式,好比获取消息所有内容:

service/BookService.class
package com.zhaoyi.bweb.service;

import com.zhaoyi.bweb.bean.Book;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class BookService {

//    @RabbitListener(queues = {"joyblack.news"})
//    public void listernerBook(Book book){
//        System.out.println("receive a message:");
//        System.out.println(book);
//    }

    @RabbitListener(queues = {"joyblack.news"})
    public void listernerBook(Message message){
        System.out.println("receive a message:");
        System.out.println(message);
    }
}

发送消息后打印的结果为:

receive a message:
(Body:'{"author":"曹雪芹","bookName":"红楼梦","introduce":"四大名著之一..."}' MessageProperties [headers={__TypeId__=com.zhaoyi.bweb.bean.Book}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.direct, receivedRoutingKey=joyblack.news, deliveryTag=1, consumerTag=amq.ctag-a8Co2RP7og21nB5A6R5QbQ, consumerQueue=joyblack.news])

即包含了整个消息内容。

管理组件AmqpAdmin

前面咱们已经经过MQ的组件测试了不少有意思的功能,可是别忘了,咱们不少组件,好比交换器、消息队列等,都是咱们经过RabbitMQ的管理网站事先建立的。那么,咱们会有这样的疑问,可不能够经过RabbitMQ提供的什么组件帮咱们在应用程序中完成这样的操做呢?答案是能!

这个组件就是咱们这一章节将要讲到的AmqpAdmin。经过AmqpAdmin咱们能够建立交换器、消息队列以及绑定规则等。

要使用AmqpAdmin很简单,还记得咱们以前讲过的自动配置类吗,他提供的两个重要组件之一就是AmqpAdmin。咱们直接在应用程序中注入该组件就可使用了。

建立交换器

Exchange.inteface是MQ组件中定义的一个接口

org.springframework.amqp.core.Exchange
package org.springframework.amqp.core;

import java.util.Map;

public interface Exchange extends Declarable {
    String getName();

    String getType();

    boolean isDurable();

    boolean isAutoDelete();

    Map<String, Object> getArguments();

    boolean isDelayed();

    boolean isInternal();
}

咱们查看该接口的实现类有5个(其实有一个抽象类做为中间件),他们分别是DirectExchange、FanoutExchange、CustomExchange、TopicExchange以及HeadersExchange。其中有2个咱们不用在乎,其余三个恰好对应咱们以前所讲到的3种交换器类型,所以,要建立交换器,直接建立对应类型的交换器便可,例如,咱们建立一个direct类型的交换器,并命名为exchange.myDirect.

Test.class
package com.zhaoyi.bweb;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class BwebApplicationTests {

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange(){
        amqpAdmin.declareExchange(new DirectExchange("exchange.myDirect"));
        System.out.println("create exchange success.");
    }
}

运行该测试用例,咱们就能够在管理网站处的Exchanges选项卡查看到新建立的direct类型的exchange了。

amqpAdmin.declarexxxx能够建立xxx类型的RabbitMQ组件。

同理,咱们能够经过amqpAdmin.declareQueue建立其余的组件,提供的参数一一对应于咱们配置对应组件时指定的那些配置选项。

建立队列

Test.class
@Test
    public void createQueue(){
        amqpAdmin.declareQueue(new Queue("queue.myQueue", true));
        System.out.println("create Queue success.");
    }

该测试用例建立了一个name=queue.myQueue,以及durable为true(便可持久化)的队列。

建立绑定规则

建立绑定规则时咱们须要查看一下Binding类对应的方法参数:

org.springframework.amqp.core.Binding
public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments) {
		// 目的地
        this.destination = destination;
		// 目的的类型
        this.destinationType = destinationType;
		// 交换器
        this.exchange = exchange;
		// 路由键
        this.routingKey = routingKey;
		// 额外参数
        this.arguments = arguments;
    }

所以,咱们对应这些参数进行配置就能够了,你也能够感受获得,这些参数都是和咱们的管理网站的可视化配置一一对应起来的:

Test.class
@Test
    public void createBinding(){
        amqpAdmin.declareBinding(new Binding("queue.myQueue",
                Binding.DestinationType.QUEUE,
                "exchange.myDirect",
                "queue.myQueue",
                null
                ));
        System.out.println("create Binding success.");
    }

能够看到,咱们定义了一个绑定规则,他是绑定在交换器exchange.myDirect上,路由键为queue.myQueue,并指向目的地为queue.myQueue的队列。

如今,查看管理网站,能够清晰的看到咱们此次建立的三个组件,以及他们之间的绑定关系。

注意路由键这个属性,一般状况下,咱们会将其命名为和目的地队列同样的名称,但请不要混淆两者。若是你的应用足够复杂,显然是不少绑定规则,而且路由键是多种多样的。

amqpAdmin.deleteXxx 能够帮助咱们删除指定名称的交换器和队列,你们能够本身尝试使用。

相关文章
相关标签/搜索