SpringBoot Rabbitmq发送消息

官方文档:https://docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/htmlsingle/#boot-features-amqphtml

引入依赖:spring

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

发送消息代码:json

@RestController @RequestMapping("/") public class SenderMsgController { @Autowired private AmqpTemplate amqpTemplate; @RequestMapping(value = "/{str}") public void testSend(@RequestParam("str") String str) throws InterruptedException { for (int i = 0; i < 10; i++) { int millis = 500; Thread.sleep(new Long(millis)); if (i%2 == 1) { String isr = "{\n" + "\t\"dd\":" + i + "}"; amqpTemplate.convertAndSend("DirectExchange","test.1",isr,new MyMessageConverter()); }else{ String isr = "{\n" + "\t\"dd\":" + i + "}"; amqpTemplate.convertAndSend("DirectExchange","test.2",isr,new MyMessageConverter()); } System.out.println("第"+i+"次发送"); } } }
MyMessageConverter:
//MessagePostProcessor 接口能够对发送请求以前的Message 进行操做,这里我设置了contenttype为json格式
public class MyMessageConverter implements MessagePostProcessor { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON); return message; } }

建立交换机、队列并互相绑定设置路由keyapp

@Component public class AmqpAdminConfig { @Autowired public AmqpAdmin amqpAdmin; @Bean public DirectExchange createDirectExchange(){ DirectExchange directExchange = new DirectExchange("DirectExchange", false, false); amqpAdmin.declareExchange(directExchange); return directExchange; } // @Bean // public void createFanoutExchange(){ // amqpAdmin.declareExchange(new FanoutExchange("FanoutExchange",false,false)); // }
 @Bean public Queue createQueue1(){ Queue queue = new Queue("queue-1", false, false, false); amqpAdmin.declareQueue(queue); return queue; } @Bean public Queue createQueue2(){ Queue queue = new Queue("queue-2", false, false, false); amqpAdmin.declareQueue(queue); return queue; } @Bean public void createBinding1(){ Binding bind = BindingBuilder.bind(createQueue1()).to(createDirectExchange()).with("test.1"); amqpAdmin.declareBinding(bind); } @Bean public void createBinding2(){ Binding bind = BindingBuilder.bind(createQueue2()).to(createDirectExchange()).with("test.2"); amqpAdmin.declareBinding(bind); } }

根据官方文档知道AmqpTemplate 和AmqpAdmin  已经自动配置,可直接注入使用,AmqpTemplate 封装了发送与接收的各类操做,AmqpAdmin  封装了针对交换机和消息队列的各类操做ide

相关文章
相关标签/搜索