Spring Boot 整合 RabbitMq — 三种Exchange模式

前言

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个提供统一消息服务的应用层标准协议,是应用层协议的一个开放标准,为面向消息的中间件而设计。基于此协议的客户端与消息中间件之间可以传递消息,并且不受客户端/中间件的不同产品、不同开发语言等条件的限制。

RabbitMQ 与 Spring Boot 整合简介

引入 Spring Boot 的 AMQP 依赖包

<!-- Spring Boot starter for AMQP messaging -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

application.yml 配置

# RabbitMQ connection, listener and channel-cache settings
spring:
  rabbitmq:
    username: admin
    password: 123456
    host: 192.168.56.128
    port: 5672
    listener:
      simple:
        concurrency: 5 # minimum number of concurrent consumers
        max-concurrency: 10 # maximum number of concurrent consumers
        prefetch: 5 # number of messages prefetched per request
    cache:
      channel:
        size: 50 # number of channels to cache

 

1、发布订阅

 

/**
 * Created by Administrator on 2018/10/12.
 * Exchange configuration for the publish/subscribe (fanout) example.
 *
 * @author admin
 */
@Configuration
public class RabbitMqExchangeConfig {

    /**
     * Declares the fanout (broadcast) exchange named "tut.fanout".
     *
     * @return the fanout exchange bean
     */
    @Bean
    public FanoutExchange fanout() {
        FanoutExchange exchange = new FanoutExchange("tut.fanout");
        return exchange;
    }

    /**
     * Queues and bindings that subscribe to the fanout exchange.
     */
    private static class FanoutConfig {

        /**
         * First subscriber queue. An AnonymousQueue gets a client-generated
         * name and is non-durable, exclusive and auto-delete.
         */
        @Bean
        public Queue autoDeleteQueue1() {
            Queue queue = new AnonymousQueue();
            return queue;
        }

        /** Second subscriber queue, also anonymous. */
        @Bean
        public Queue autoDeleteQueue2() {
            Queue queue = new AnonymousQueue();
            return queue;
        }

        // A binding can be read as: this queue is interested in the messages
        // of this exchange.
        // There are several Queue beans, so Spring picks the one whose bean
        // name matches the parameter name; the parameter names below must
        // therefore stay in sync with the queue bean method names.

        /** Binds {@code autoDeleteQueue1} to the fanout exchange. */
        @Bean
        public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {
            return subscribe(autoDeleteQueue1, fanout);
        }

        /** Binds {@code autoDeleteQueue2} to the fanout exchange. */
        @Bean
        public Binding binding2(FanoutExchange fanout, Queue autoDeleteQueue2) {
            return subscribe(autoDeleteQueue2, fanout);
        }

        /** Builds the binding of one queue to the fanout exchange. */
        private static Binding subscribe(Queue queue, FanoutExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange);
        }
    }
}
/**
 * Publishes "Hello" messages to the fanout exchange.
 */
@Service
public class Tut3Sender {
    private Logger logger = LoggerFactory.getLogger(Tut3Sender.class);
    @Autowired
    private RabbitTemplate template;

    @Autowired
    private FanoutExchange fanout;

    int dots = 0;
    int count = 0;

    /**
     * Builds the next message ("Hello", one to three dots, then a running
     * number) and sends it to the fanout exchange with an empty routing key.
     */
    public void send() {
        // Advance the dot counter; once it was at 3, restart the cycle at 1.
        int previousDots = dots;
        dots = previousDots + 1;
        if (previousDots == 3) {
            dots = 1;
        }

        StringBuilder text = new StringBuilder("Hello");
        int appended = 0;
        while (appended < dots) {
            text.append(".");
            appended++;
        }

        count = count + 1;
        text.append(count);
        String message = text.toString();

        // Publish to the exchange, not to a specific queue.
        String exchangeName = fanout.getName();
        template.convertAndSend(exchangeName, "", message);
        logger.info(" [x] Sent '" + message + "'");
    }
}

@RabbitListener 用于监听队列,其 queues 属性为队列名数组

/**
 * Consumes messages from the two anonymous queues of the fanout example.
 */
@Component
public class Tut3Receiver {
    private Logger logger = LoggerFactory.getLogger(Tut3Receiver.class);

    /** Listener on the queue held by the {@code autoDeleteQueue1} bean. */
    @RabbitListener(queues="#{autoDeleteQueue1.name}")
    public void receive1(String in) throws InterruptedException {
        receive(in, 1);
    }

    /** Listener on the queue held by the {@code autoDeleteQueue2} bean. */
    @RabbitListener(queues="#{autoDeleteQueue2.name}")
    public void receive2(String in) throws InterruptedException {
        receive(in, 2);
    }

    /**
     * Logs the received message, performs the simulated work and logs the
     * elapsed time.
     *
     * @param in       the message text
     * @param receiver number identifying the listener in the log output
     * @throws InterruptedException if the simulated work is interrupted
     */
    public void receive(String in, int receiver) throws InterruptedException {
        StopWatch timer = new StopWatch();
        timer.start();
        logger.info("instance " + receiver + " [x] Received '" + in + "'");
        doWork(in);
        timer.stop();
        double elapsedSeconds = timer.getTotalTimeSeconds();
        logger.info("instance " + receiver + " [x] Done in " + elapsedSeconds + "s");
    }

    /** Sleeps one second for every '.' character in the message. */
    private void doWork(String in) throws InterruptedException {
        for (int i = 0; i < in.length(); i++) {
            if (in.charAt(i) != '.') {
                continue;
            }
            Thread.sleep(1000);
        }
    }
}

 

写一个 Controller 进行接口测试

/**
 * Test controller that triggers message publishing over HTTP.
 */
@Controller
public class RabbitMqController {

    // Publisher used by the fanout endpoint.
    @Autowired
    private Tut3Sender tut3Sender;

    /**
     * Sends ten messages through {@code tut3Sender}.
     *
     * <p>Declared {@code public} rather than {@code private}: request handler
     * methods are meant to be public, and a private handler is not reliably
     * dispatched to the injected instance once the controller is wrapped in a
     * class-based proxy (for example by AOP).
     *
     * @return the literal response body "ok"
     */
    @RequestMapping("/sendFanout")
    @ResponseBody
    public String sendFanout() {
        for (int i = 0; i < 10; i++) {
            tut3Sender.send();
        }
        return "ok";
    }

}

2、路由

 

/**
 * Exchange configuration for the routing (direct exchange) example.
 */
@Configuration
public class RabbitMqExchangeConfig {

    /**
     * Declares the fanout (broadcast) exchange named "tut.fanout".
     *
     * @return the fanout exchange bean
     */
    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("tut.fanout");
    }

    /**
     * Declares the direct exchange named "tut.direct".
     *
     * <p>Every binding in {@link DirectConfig} takes a {@code DirectExchange}
     * parameter, so this bean must exist for the bindings to be created.
     *
     * @return the direct exchange bean
     */
    @Bean
    public DirectExchange direct() {
        return new DirectExchange("tut.direct");
    }

    /**
     * Groups the {@code @Bean} definitions of the routing example in a
     * static nested class.
     */
    private static class DirectConfig {

        /** First anonymous queue of the routing example. */
        @Bean
        public Queue autoDeleteQueue3() {
            return new AnonymousQueue();
        }

        /** Second anonymous queue of the routing example. */
        @Bean
        public Queue autoDeleteQueue4() {
            return new AnonymousQueue();
        }

        // Routing table:
        //                orange  -> queue3
        //  tut.direct -> black   -> queue3, queue4
        //                green   -> queue4

        /** Routes key "orange" to {@code autoDeleteQueue3}. */
        @Bean
        public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue3) {
            return BindingBuilder.bind(autoDeleteQueue3).to(direct).with("orange");
        }

        /** Routes key "black" to {@code autoDeleteQueue3}. */
        @Bean
        public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue3) {
            return BindingBuilder.bind(autoDeleteQueue3).to(direct).with("black");
        }

        /** Routes key "green" to {@code autoDeleteQueue4}. */
        @Bean
        public Binding binding2a(DirectExchange direct, Queue autoDeleteQueue4) {
            return BindingBuilder.bind(autoDeleteQueue4).to(direct).with("green");
        }

        /** Routes key "black" to {@code autoDeleteQueue4}. */
        @Bean
        public Binding binding2b(DirectExchange direct, Queue autoDeleteQueue4) {
            return BindingBuilder.bind(autoDeleteQueue4).to(direct).with("black");
        }

    }

}

 

/**
 * Publishes messages to the direct exchange, rotating through the routing keys.
 */
@Service
public class Tut4Sender {
    private Logger logger = LoggerFactory.getLogger(Tut4Sender.class);

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private DirectExchange direct;

    private int index;

    private int count;

    private final String[] keys = {"orange", "black", "green"};

    /**
     * Sends one message using the next routing key in {@code keys}, so that
     * consecutive calls are distributed across the different keys.
     *
     * <p>The index is advanced before it is read, so the first call uses
     * {@code keys[1]}.
     */
    public void send() {
        StringBuilder builder = new StringBuilder("Hello to ");
        // Wrap on the array length instead of a hard-coded 3 so the rotation
        // stays correct if keys are added or removed.
        if (++this.index == keys.length) {
            this.index = 0;
        }
        String key = keys[this.index];
        builder.append(key).append(' ');
        builder.append(Integer.toString(++this.count));
        String message = builder.toString();
        template.convertAndSend(direct.getName(), key, message);
        logger.info(" [x] Sent '" + message + "'");
    }
}
/**
 * Consumes messages from the two anonymous queues of the routing example.
 */
@Component
public class Tut3Receiver {
    private Logger logger = LoggerFactory.getLogger(Tut3Receiver.class);

    /** Listener on the queue held by the {@code autoDeleteQueue3} bean. */
    @RabbitListener(queues="#{autoDeleteQueue3.name}")
    public void receive3(String in) throws InterruptedException {
        receive(in, 3);
    }

    /** Listener on the queue held by the {@code autoDeleteQueue4} bean. */
    @RabbitListener(queues="#{autoDeleteQueue4.name}")
    public void receive4(String in) throws InterruptedException {
        receive(in, 4);
    }

    /**
     * Logs the received message, performs the simulated work and logs the
     * elapsed time.
     *
     * @param in       the message text
     * @param receiver number identifying the listener in the log output
     * @throws InterruptedException if the simulated work is interrupted
     */
    public void receive(String in, int receiver) throws InterruptedException {
        StopWatch timer = new StopWatch();
        timer.start();
        logger.info("instance " + receiver + " [x] Received '" + in + "'");
        doWork(in);
        timer.stop();
        double elapsedSeconds = timer.getTotalTimeSeconds();
        logger.info("instance " + receiver + " [x] Done in " + elapsedSeconds + "s");
    }

    /** Sleeps one second for every '.' character in the message. */
    private void doWork(String in) throws InterruptedException {
        int position = 0;
        while (position < in.length()) {
            if (in.charAt(position) == '.') {
                Thread.sleep(1000);
            }
            position++;
        }
    }
}

 

3、主题

/**
 * Exchange configuration for the topic example.
 */
@Configuration
public class RabbitMqExchangeConfig {

    /**
     * Declares the topic exchange named "tut.topic".
     *
     * @return the topic exchange bean
     */
    @Bean
    public TopicExchange topic() {
        TopicExchange exchange = new TopicExchange("tut.topic");
        return exchange;
    }

    /**
     * Queues and pattern bindings attached to the topic exchange.
     */
    private static class TopicConfig {

        /** First anonymous queue of the topic example. */
        @Bean
        public Queue autoDeleteQueue5() {
            Queue queue = new AnonymousQueue();
            return queue;
        }

        /** Second anonymous queue of the topic example. */
        @Bean
        public Queue autoDeleteQueue6() {
            Queue queue = new AnonymousQueue();
            return queue;
        }

        // Routing table:
        //              *.orange.* -> queue5
        // tut.topic -> *.*.rabbit -> queue5
        //              lazy.#     -> queue6
        //
        // '*' matches exactly one word; '#' matches zero or more words.

        /** Routes keys matching "*.orange.*" to {@code autoDeleteQueue5}. */
        @Bean
        public Binding binding3a(TopicExchange topic, Queue autoDeleteQueue5) {
            return route(autoDeleteQueue5, topic, "*.orange.*");
        }

        /** Routes keys matching "*.*.rabbit" to {@code autoDeleteQueue5}. */
        @Bean
        public Binding binding3b(TopicExchange topic, Queue autoDeleteQueue5) {
            return route(autoDeleteQueue5, topic, "*.*.rabbit");
        }

        /** Routes keys matching "lazy.#" to {@code autoDeleteQueue6}. */
        @Bean
        public Binding binding4a(TopicExchange topic, Queue autoDeleteQueue6) {
            return route(autoDeleteQueue6, topic, "lazy.#");
        }

        /** Builds the binding of one queue to the topic exchange for a pattern. */
        private static Binding route(Queue queue, TopicExchange exchange, String pattern) {
            return BindingBuilder.bind(queue).to(exchange).with(pattern);
        }
    }
}
/**
 * Publishes messages to the topic exchange, rotating through the routing keys.
 */
@Service
public class Tut5Sender {
    private Logger logger = LoggerFactory.getLogger(Tut5Sender.class);
    @Autowired
    private RabbitTemplate template;

    @Autowired
    private TopicExchange topic;

    private int index;

    private int count;

    private final String[] keys = {"quick.orange.rabbit",
            "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

    /**
     * Sends one message using the next routing key in {@code keys}.
     *
     * <p>The index is advanced before it is read, so the first call uses
     * {@code keys[1]}; after the last key it wraps back to {@code keys[0]}.
     */
    public void send() {
        this.index = this.index + 1;
        if (this.index == keys.length) {
            this.index = 0;
        }
        String routingKey = keys[this.index];

        this.count = this.count + 1;
        String message = "Hello to " + routingKey + ' ' + this.count;

        String exchangeName = topic.getName();
        template.convertAndSend(exchangeName, routingKey, message);
        logger.info(" [x] Sent '" + message + "'");
    }

}

 

/**
 * Consumes messages from the two anonymous queues of the topic example.
 */
@Component
public class Tut3Receiver {
    private Logger logger = LoggerFactory.getLogger(Tut3Receiver.class);

    /** Listener on the queue held by the {@code autoDeleteQueue5} bean. */
    @RabbitListener(queues="#{autoDeleteQueue5.name}")
    public void receive5(String in) throws InterruptedException {
        receive(in, 5);
    }

    /** Listener on the queue held by the {@code autoDeleteQueue6} bean. */
    @RabbitListener(queues="#{autoDeleteQueue6.name}")
    public void receive6(String in) throws InterruptedException {
        receive(in, 6);
    }

    /**
     * Logs the received message, performs the simulated work and logs the
     * elapsed time.
     *
     * @param in       the message text
     * @param receiver number identifying the listener in the log output
     * @throws InterruptedException if the simulated work is interrupted
     */
    public void receive(String in, int receiver) throws InterruptedException {
        StopWatch timer = new StopWatch();
        timer.start();
        logger.info("instance " + receiver + " [x] Received '" + in + "'");
        doWork(in);
        timer.stop();
        double elapsedSeconds = timer.getTotalTimeSeconds();
        logger.info("instance " + receiver + " [x] Done in " + elapsedSeconds + "s");
    }

    /** Sleeps one second for every '.' character in the message. */
    private void doWork(String in) throws InterruptedException {
        char[] symbols = in.toCharArray();
        for (int i = 0; i < symbols.length; i++) {
            boolean isDot = symbols[i] == '.';
            if (isDot) {
                Thread.sleep(1000);
            }
        }
    }
}

 

 

参考:RabbitMQ Tutorials

          Spring AMQP Reference