AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个提供统一消息服务的应用层标准协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可以互相传递消息,并且不受客户端/中间件的不同产品、不同开发语言等条件的限制。
RabbitMQ 与 Spring Boot 整合简单了解
引入 Spring Boot AMQP 依赖包
<!-- amqp -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
spring:
  rabbitmq:
    username: admin
    password: 123456
    host: 192.168.56.128
    port: 5672
    listener:
      simple:
        # minimum number of concurrent consumers
        concurrency: 5
        # maximum number of concurrent consumers
        max-concurrency: 10
        # number of messages prefetched in one request
        prefetch: 5
    cache:
      channel:
        # number of channels kept in the cache
        size: 50
/** * Create by Administrator on 2018/10/12 * Exchange三种模式配置 * @author admin */ @Configuration public class RabbitMqExchangeConfig { /** * 广播交换器 * @return */ @Bean public FanoutExchange fanout(){ return new FanoutExchange("tut.fanout"); } private static class FanoutConfig { //AnonymousQueue类型的队列,它的名字是由客户端生成的,并且是非持久的,独占的,自动删除的队列 @Bean public Queue autoDeleteQueue1() { return new AnonymousQueue(); } @Bean public Queue autoDeleteQueue2() { return new AnonymousQueue(); } //队列和交换机绑定 //这种关系能够读做:这个队列对这个交换器里的消息感兴趣。 //虽然 Queue类型有多个实例,但spring会自动更加名字匹配,bean名字匹配参数名字 @Bean public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1).to(fanout); } @Bean public Binding binding2(FanoutExchange fanout, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2).to(fanout); } } }
@Service public class Tut3Sender { private Logger logger = LoggerFactory.getLogger(Tut3Sender.class); @Autowired private RabbitTemplate template; @Autowired private FanoutExchange fanout; int dots = 0; int count = 0; public void send(){ StringBuilder builder = new StringBuilder("Hello"); if(dots++ == 3){ dots = 1; } for(int i =0;i<dots;i++){ builder.append("."); } builder.append(Integer.toString(++count)); String message = builder.toString(); //向交换机发送信息 template.convertAndSend(fanout.getName(),"", message); logger.info(" [x] Sent '" + message + "'"); } }
@RabbitListener 监听队列状况,属性 queues 为队列名数组
/**
 * Consumes messages from the two anonymous queues autoDeleteQueue1 and
 * autoDeleteQueue2 and simulates work proportional to the dots in a message.
 */
@Component
public class Tut3Receiver {

    private Logger logger = LoggerFactory.getLogger(Tut3Receiver.class);

    /** Listener for the queue held by the bean autoDeleteQueue1. */
    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receive1(String in) throws InterruptedException {
        receive(in, 1);
    }

    /** Listener for the queue held by the bean autoDeleteQueue2. */
    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) throws InterruptedException {
        receive(in, 2);
    }

    /**
     * Logs the received message, performs the simulated work and logs the
     * elapsed time.
     *
     * @param in       the message payload
     * @param receiver number identifying the listener, used in the log output
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void receive(String in, int receiver) throws InterruptedException {
        final String prefix = "instance " + receiver;
        StopWatch timer = new StopWatch();
        timer.start();
        logger.info(prefix + " [x] Received '" + in + "'");
        doWork(in);
        timer.stop();
        logger.info(prefix + " [x] Done in " + timer.getTotalTimeSeconds() + "s");
    }

    /** Sleeps one second for every '.' contained in the payload. */
    private void doWork(String payload) throws InterruptedException {
        for (int pos = payload.indexOf('.'); pos >= 0; pos = payload.indexOf('.', pos + 1)) {
            Thread.sleep(1000);
        }
    }
}
写一个 Controller 进行接口测试
@Controller public class RabbitMqController { //发布 @Autowired private Tut3Sender tut3Sender; @RequestMapping("/sendFanout") @ResponseBody private String sendFanout(){ for(int i =0;i<10;i++) { tut3Sender.send(); } return "ok"; } }
@Configuration public class RabbitMqExchangeConfig { /** * 广播交换器 * @return */ @Bean public FanoutExchange fanout(){ return new FanoutExchange("tut.fanout"); } /** * @Bean 经过使用静态类封闭 */ private static class DirectConfig{ @Bean public Queue autoDeleteQueue3() { return new AnonymousQueue(); } @Bean public Queue autoDeleteQueue4() { return new AnonymousQueue(); } // orange ->queue3 // tut.direct -> black ->queue3,queue4 // green ->queue4 // @Bean public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue3) { return BindingBuilder.bind(autoDeleteQueue3).to(direct).with("orange"); } @Bean public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue3) { return BindingBuilder.bind(autoDeleteQueue3).to(direct).with("black"); } @Bean public Binding binding2a(DirectExchange direct, Queue autoDeleteQueue4) { return BindingBuilder.bind(autoDeleteQueue4).to(direct).with("green"); } @Bean public Binding binding2b(DirectExchange direct, Queue autoDeleteQueue4) { return BindingBuilder.bind(autoDeleteQueue4).to(direct).with("black"); } } }
/**
 * Publishes messages to the direct exchange, rotating through a fixed list
 * of routing keys.
 */
@Service
public class Tut4Sender {

    private Logger logger = LoggerFactory.getLogger(Tut4Sender.class);

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private DirectExchange direct;

    /** Position in {@link #keys} of the key used by the most recent send. */
    private int index;

    /** Number of messages sent so far. */
    private int count;

    private final String[] keys = {"orange", "black", "green"};

    /**
     * Sends one message, moving on to the next routing key on every call.
     * The index is advanced before it is read, so the first call uses the
     * second key.
     */
    public void send() {
        StringBuilder builder = new StringBuilder("Hello to ");
        // Wrap on the actual array length instead of a hard-coded 3 so the
        // rotation stays correct when keys are added or removed.
        if (++this.index == keys.length) {
            this.index = 0;
        }
        String key = keys[this.index];
        builder.append(key).append(' ');
        builder.append(Integer.toString(++this.count));
        String message = builder.toString();
        template.convertAndSend(direct.getName(), key, message);
        logger.info(" [x] Sent '" + message + "'");
    }
}
/**
 * Consumes messages from the anonymous queues autoDeleteQueue3 and
 * autoDeleteQueue4.
 *
 * NOTE(review): the class is named Tut3Receiver although it listens on
 * queues 3 and 4 - confirm whether a different name was intended.
 */
@Component
public class Tut3Receiver {

    private Logger logger = LoggerFactory.getLogger(Tut3Receiver.class);

    /** Listener for the queue held by the bean autoDeleteQueue3. */
    @RabbitListener(queues = "#{autoDeleteQueue3.name}")
    public void receive3(String in) throws InterruptedException {
        receive(in, 3);
    }

    /** Listener for the queue held by the bean autoDeleteQueue4. */
    @RabbitListener(queues = "#{autoDeleteQueue4.name}")
    public void receive4(String in) throws InterruptedException {
        receive(in, 4);
    }

    /**
     * Logs the received message, performs the simulated work and logs the
     * elapsed time.
     *
     * @param in       the message payload
     * @param receiver number identifying the listener, used in the log output
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void receive(String in, int receiver) throws InterruptedException {
        StopWatch elapsed = new StopWatch();
        elapsed.start();
        String receivedLine = "instance " + receiver + " [x] Received '" + in + "'";
        logger.info(receivedLine);
        doWork(in);
        elapsed.stop();
        String doneLine = "instance " + receiver + " [x] Done in " + elapsed.getTotalTimeSeconds() + "s";
        logger.info(doneLine);
    }

    /** Sleeps one second for every '.' contained in the text. */
    private void doWork(String text) throws InterruptedException {
        final int length = text.length();
        for (int i = 0; i < length; i++) {
            if (text.charAt(i) != '.') {
                continue;
            }
            Thread.sleep(1000);
        }
    }
}
@Configuration public class RabbitMqExchangeConfig { /** * 主题交换器 * @return */ @Bean public TopicExchange topic() { return new TopicExchange("tut.topic"); } private static class TopicConfig{ @Bean public Queue autoDeleteQueue5() { return new AnonymousQueue(); } @Bean public Queue autoDeleteQueue6() { return new AnonymousQueue(); } // *.orange.* ->queue5 // tut.topic -> *.*.rabbit->queue5 // lazy.# ->queue6 //星号匹配一个单词,哈希号匹配多个单词 @Bean public Binding binding3a(TopicExchange topic,Queue autoDeleteQueue5){ return BindingBuilder.bind(autoDeleteQueue5).to(topic).with("*.orange.*"); } @Bean public Binding binding3b(TopicExchange topic,Queue autoDeleteQueue5){ return BindingBuilder.bind(autoDeleteQueue5).to(topic).with("*.*.rabbit"); } @Bean public Binding binding4a(TopicExchange topic,Queue autoDeleteQueue6){ return BindingBuilder.bind(autoDeleteQueue6).to(topic).with("lazy.#"); } } }
/**
 * Publishes messages to the topic exchange, rotating through a fixed list
 * of routing keys.
 */
@Service
public class Tut5Sender {

    private Logger logger = LoggerFactory.getLogger(Tut5Sender.class);

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private TopicExchange topic;

    /** Position in {@link #keys} of the key used by the most recent send. */
    private int index;

    /** Number of messages sent so far. */
    private int count;

    private final String[] keys = {
            "quick.orange.rabbit",
            "lazy.orange.elephant",
            "quick.orange.fox",
            "lazy.brown.fox",
            "lazy.pink.rabbit",
            "quick.brown.fox"
    };

    /**
     * Sends one message using the next routing key in the rotation. The
     * index is advanced before it is read, so the first call uses the
     * second key.
     */
    public void send() {
        // Advance to the next key, wrapping back to the first after the last.
        index = (index + 1) % keys.length;
        String key = keys[index];

        count = count + 1;
        String message = "Hello to " + key + ' ' + count;

        template.convertAndSend(topic.getName(), key, message);
        logger.info(" [x] Sent '" + message + "'");
    }
}
/**
 * Consumes messages from the anonymous queues autoDeleteQueue5 and
 * autoDeleteQueue6.
 *
 * NOTE(review): the class is named Tut3Receiver although it listens on
 * queues 5 and 6 - confirm whether a different name was intended.
 */
@Component
public class Tut3Receiver {

    private Logger logger = LoggerFactory.getLogger(Tut3Receiver.class);

    /** Listener for the queue held by the bean autoDeleteQueue5. */
    @RabbitListener(queues = "#{autoDeleteQueue5.name}")
    public void receive5(String in) throws InterruptedException {
        receive(in, 5);
    }

    /** Listener for the queue held by the bean autoDeleteQueue6. */
    @RabbitListener(queues = "#{autoDeleteQueue6.name}")
    public void receive6(String in) throws InterruptedException {
        receive(in, 6);
    }

    /**
     * Logs the received message, performs the simulated work and logs the
     * elapsed time.
     *
     * @param in       the message payload
     * @param receiver number identifying the listener, used in the log output
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void receive(String in, int receiver) throws InterruptedException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        logger.info("instance " + receiver + " [x] Received '" + in + "'");

        doWork(in);

        stopWatch.stop();
        double seconds = stopWatch.getTotalTimeSeconds();
        logger.info("instance " + receiver + " [x] Done in " + seconds + "s");
    }

    /** Sleeps one second for every '.' contained in the message. */
    private void doWork(String message) throws InterruptedException {
        char[] symbols = message.toCharArray();
        int cursor = 0;
        while (cursor < symbols.length) {
            boolean isDot = symbols[cursor] == '.';
            cursor++;
            if (isDot) {
                Thread.sleep(1000);
            }
        }
    }
}
参考:RabbitMQ Tutorials