<!-- Spring Boot starter for AMQP (RabbitMQ) messaging -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot starter for the web layer used by the demo REST controller -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
server:
  port: 8888          # HTTP port of the application
spring:
  rabbitmq:
    host: 127.0.0.1   # host on which RabbitMQ runs
    port: 5672        # RabbitMQ service port
    username: guest   # RabbitMQ user name
    password: guest   # RabbitMQ password
/**
 * Names of the queues, exchanges and routing keys used by the RabbitMQ
 * working-mode demos. All fields are compile-time constants, so they can be
 * used inside annotation attributes.
 */
public interface RabbitConstant {

    /** Simple mode: a single queue. */
    String SIMPLE_QUEUE_NAME = "simple_queue";

    /** Work mode: a single queue shared by several listeners. */
    String WORK_QUEUE_NAME = "work_queue";

    /* ---------- publish/subscribe mode ---------- */
    String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange";
    String PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME = "publish_subscribe_first_queue";
    String PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME = "publish_subscribe_second_queue";

    /* ---------- routing mode ---------- */
    String ROUTING_EXCHANGE_NAME = "routing_exchange";
    String ROUTING_FIRST_QUEUE_NAME = "routing_first_queue";
    String ROUTING_SECOND_QUEUE_NAME = "routing_second_queue";
    String ROUTING_THIRD_QUEUE_NAME = "routing_third_queue";
    String ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME = "routing_first_queue_routing_key";
    String ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME = "routing_second_queue_routing_key";
    String ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME = "routing_third_queue_routing_key";

    /* ---------- topics mode ---------- */
    String TOPICS_EXCHANGE_NAME = "topics_exchange";
    String TOPICS_FIRST_QUEUE_NAME = "topics_first_queue";
    String TOPICS_SECOND_QUEUE_NAME = "topics_second_queue";
    String TOPICS_THIRD_QUEUE_NAME = "topics_third_queue";
    /** Routing keys attached to the published messages. */
    String TOPICS_FIRST_QUEUE_ROUTING_KEY = "topics.first.routing.key";
    String TOPICS_SECOND_QUEUE_ROUTING_KEY = "topics.second.routing.key";
    String TOPICS_THIRD_QUEUE_ROUTING_KEY = "topics.third.routing.key";
    /** Binding patterns used when the queues are bound to the topic exchange. */
    String TOPICS_ROUTING_KEY_FIRST_WILDCARD = "#.first.#";
    String TOPICS_ROUTING_KEY_SECOND_WILDCARD = "*.second.#";
    String TOPICS_ROUTING_KEY_THIRD_WILDCARD = "*.third.*";
    /**
     * Misspelled name of {@link #TOPICS_ROUTING_KEY_THIRD_WILDCARD}; kept with
     * the same value so existing references continue to compile.
     */
    String TOPICS_ROUTING_KEY_THRID_WILDCARD = TOPICS_ROUTING_KEY_THIRD_WILDCARD;

    /* ---------- header mode ---------- */
    String HEADER_EXCHANGE_NAME = "header_exchange";
    String HEADER_FIRST_QUEUE_NAME = "header_first_queue";
    String HEADER_SECOND_QUEUE_NAME = "header_second_queue";

    /** RPC mode: the request queue. */
    String RPC_QUEUE_NAME = "rpc_queue";
}
import com.example.rabbitmq.constant.RabbitConstant; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.nio.charset.StandardCharsets; @RestController public class RabbitController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping(value = "/simple") public void simple() { rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "hello world!"); } @GetMapping(value = "/work") public void work() { rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "work hello!"); } @GetMapping(value = "/pubsub") public void pubsub() { rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "publish/subscribe hello"); } @GetMapping(value = "/routing") public void routing() { // 给第一个队列发送消息 rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello"); } @GetMapping(value = "/topics") public void topics() { // 给第一个队列发送消息,此时队列能接受到消息,由于队列通配符为 #.first.#,而 routing_key 为 topics.first.routing.key,匹配成功 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello"); // 给第二个队列发送消息,此时队列也能接受到消息,由于队列通配符为 *.second.#,而 routing_key 为 topics.second.routing.key,匹配成功 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello"); // 给第三个队列发送消息,此时队列没法接受到消息,由于队列通配符为 *.third.*,而 routing_key 为 topics.third.routing.key,匹配失败 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello"); } @GetMapping(value = "/header") public void header() { // 这条消息应该能被两个队列都接收到,第一个队列 all 
匹配成功,第二个队列 hello-value any匹配成功 MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("matchAll", "YES"); messageProperties.setHeader("hello", "world"); Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties); rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message); // 这条消息应该只被第二个队列接受,第一个队列 all 匹配失败,第二个队列 matchAll-NO any匹配成功 MessageProperties messagePropertiesSecond = new MessageProperties(); messagePropertiesSecond.setHeader("matchAll", "NO"); Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond); rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond); } @GetMapping(value = "/rpc") public void rpc() { Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!"); System.out.println("rabbit rpc response message: " + responseMsg); } }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the queue used by the simple mode.
 */
@Configuration
public class RabbitSimpleProvider {

    /**
     * @return queue named {@link RabbitConstant#SIMPLE_QUEUE_NAME}
     */
    @Bean
    public Queue simpleQueue() {
        final Queue queue = new Queue(RabbitConstant.SIMPLE_QUEUE_NAME);
        return queue;
    }
}
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener for the simple-mode queue.
 */
@Component
public class RabbitSimpleConsumer {

    /**
     * Prints every message read from the simple queue.
     *
     * @param context message payload
     */
    @RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_NAME)
    @RabbitHandler
    public void simpleListener(String context) {
        final String line = "rabbit receiver: " + context;
        System.out.println(line);
    }
}
/** Sends one message using the simple queue name as routing key. */
@Test
public void simple() {
    final String payload = "hello world!";
    rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, payload);
}
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the queue used by the work mode.
 */
@Configuration
public class RabbitWorkProvider {

    /**
     * @return queue named {@link RabbitConstant#WORK_QUEUE_NAME}
     */
    @Bean
    public Queue workQueue() {
        final Queue queue = new Queue(RabbitConstant.WORK_QUEUE_NAME);
        return queue;
    }
}
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Work mode: two listeners subscribed to the same work queue.
 */
@Component
public class RabbitWorkConsumer {

    /**
     * First listener on the work queue; prints the payload it receives.
     *
     * @param context message payload
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
    public void workQueueListenerFirst(String context) {
        final String line = "rabbit workQueue listener first receiver: " + context;
        System.out.println(line);
    }

    /**
     * Second listener on the work queue; prints the payload it receives.
     *
     * @param context message payload
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
    public void workQueueListenerSecond(String context) {
        final String line = "rabbit workQueue listener second receiver: " + context;
        System.out.println(line);
    }
}
/** Sends one message using the work queue name as routing key. */
@Test
public void work() {
    final String payload = "work hello!";
    rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, payload);
}
import com.example.rabbitmq.constant.RabbitConstant; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitPublishSubscribeProvider { @Bean public Queue pubsubQueueFirst() { return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME); } @Bean public Queue pubsubQueueSecond() { return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME); } @Bean public FanoutExchange fanoutExchange() { // 建立fanout类型交换机,表示与此交换机会将消息发送给全部绑定的队列 return new FanoutExchange(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME); } @Bean public Binding pubsubQueueFirstBindFanoutExchange() { // 队列一绑定交换机 return BindingBuilder.bind(pubsubQueueFirst()).to(fanoutExchange()); } @Bean public Binding pubsubQueueSecondBindFanoutExchange() { // 队列二绑定交换机 return BindingBuilder.bind(pubsubQueueSecond()).to(fanoutExchange()); } }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Publish/subscribe mode: one listener per queue.
 */
@Component
public class RabbitPublishSubscribeConsumer {

    /**
     * Prints messages read from the first publish/subscribe queue.
     *
     * @param context message payload
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME)
    public void pubsubQueueFirst(String context) {
        final String line = "rabbit pubsub queue first receiver: " + context;
        System.out.println(line);
    }

    /**
     * Prints messages read from the second publish/subscribe queue.
     *
     * @param context message payload
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME)
    public void pubsubQueueSecond(String context) {
        final String line = "rabbit pubsub queue second receiver: " + context;
        System.out.println(line);
    }
}
/** Sends one message to the fanout exchange without a routing key. */
@Test
public void pubsub() {
    final String payload = "publish/subscribe hello";
    rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, payload);
}
import com.example.rabbitmq.constant.RabbitConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitRoutingProvider { @Bean public Queue rabbitRoutingFirstQueue() { return new Queue(RabbitConstant.ROUTING_FIRST_QUEUE_NAME); } @Bean public Queue rabbitRoutingSecondQueue() { return new Queue(RabbitConstant.ROUTING_SECOND_QUEUE_NAME); } @Bean public Queue rabbitRoutingThirdQueue() { return new Queue(RabbitConstant.ROUTING_THIRD_QUEUE_NAME); } @Bean public DirectExchange directExchange() { // 建立direct类型交换机,表示与此交换机会将消息发送给 routing_key 彻底相同的队列 return new DirectExchange(RabbitConstant.ROUTING_EXCHANGE_NAME); } @Bean public Binding routingFirstQueueBindDirectExchange() { // 队列一绑定direct交换机,并设置 routing_key 为 routing_first_queue_routing_key return BindingBuilder.bind(rabbitRoutingFirstQueue()).to(directExchange()).with(RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME); } @Bean public Binding routingSecondQueueBindDirectExchange() { // 队列二绑定direct交换机,并设置 routing_key 为 routing_second_queue_routing_key return BindingBuilder.bind(rabbitRoutingSecondQueue()).to(directExchange()).with(RabbitConstant.ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME); } @Bean public Binding routingThirdQueueBindDirectExchange() { // 队列三绑定direct交换机,并设置 routing_key 为 routing_third_queue_routing_key return BindingBuilder.bind(rabbitRoutingThirdQueue()).to(directExchange()).with(RabbitConstant.ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME); } }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Routing mode: one listener per routing queue. Each listener prints the
 * payload prefixed with the queue it came from.
 */
@Component
public class RabbitRoutingConsumer {

    /**
     * Prints messages read from the first routing queue.
     *
     * @param context message payload
     */
    @RabbitListener(queues = RabbitConstant.ROUTING_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void routingFirstQueueListener(String context) {
        System.out.println("rabbit routing queue first receiver: " + context);
    }

    /**
     * Prints messages read from the second routing queue.
     *
     * @param context message payload
     */
    @RabbitListener(queues = RabbitConstant.ROUTING_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void routingSecondQueueListener(String context) {
        // Prefix previously said "pubsub" (copied from the publish/subscribe consumer).
        System.out.println("rabbit routing queue second receiver: " + context);
    }

    /**
     * Prints messages read from the third routing queue.
     *
     * @param context message payload
     */
    @RabbitListener(queues = RabbitConstant.ROUTING_THIRD_QUEUE_NAME)
    @RabbitHandler
    public void routingThirdQueueListener(String context) {
        // Prefix previously said "pubsub" (copied from the publish/subscribe consumer).
        System.out.println("rabbit routing queue third receiver: " + context);
    }
}
@Test public void routing() { // 给第一个队列发送消息 rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello"); }
import com.example.rabbitmq.constant.RabbitConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitTopicProvider { @Bean public Queue topicFirstQueue() { return new Queue(RabbitConstant.TOPICS_FIRST_QUEUE_NAME); } @Bean public Queue topicSecondQueue() { return new Queue(RabbitConstant.TOPICS_SECOND_QUEUE_NAME); } @Bean public Queue topicThirdQueue() { return new Queue(RabbitConstant.TOPICS_THIRD_QUEUE_NAME); } @Bean public TopicExchange topicExchange() { // 建立topic类型交换机,表示与此交换机会将消息发送给 routing_key 通配符匹配成功的队列 return new TopicExchange(RabbitConstant.TOPICS_EXCHANGE_NAME); } @Bean public Binding topicFirstQueueBindExchange() { // 队列一绑定topic类型交换机,并设置 routing_key 通配符为 #.first.# return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_FIRST_WILDCARD); } @Bean public Binding topicSecondQueueBindExchange() { // 队列二绑定topic类型交换机,并设置 routing_key 通配符为 *.second.# return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_SECOND_WILDCARD); } @Bean public Binding topicThirdQueueBindExchange() { // 队列三绑定topic类型交换机,并设置 routing_key 通配符为 *.third.* return BindingBuilder.bind(topicThirdQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_THRID_WILDCARD); } }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Topics mode: one listener per topics queue.
 */
@Component
public class RabbitTopicsConsumer {

    /**
     * Prints messages read from the first topics queue.
     *
     * @param context message payload
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.TOPICS_FIRST_QUEUE_NAME)
    public void topicFirstQueue(String context) {
        final String line = "rabbit topics queue first receiver: " + context;
        System.out.println(line);
    }

    /**
     * Prints messages read from the second topics queue.
     *
     * @param context message payload
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.TOPICS_SECOND_QUEUE_NAME)
    public void topicSecondQueue(String context) {
        final String line = "rabbit topics queue second receiver: " + context;
        System.out.println(line);
    }

    /**
     * Prints messages read from the third topics queue.
     *
     * @param context message payload
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.TOPICS_THIRD_QUEUE_NAME)
    public void topicThirdQueue(String context) {
        final String line = "rabbit topics queue third receiver: " + context;
        System.out.println(line);
    }
}
@Test public void topics() { // 给第一个队列发送消息,此时队列能接受到消息,由于队列通配符为 #.first.#,而 routing_key 为 topics.first.routing.key,匹配成功 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello"); // 给第二个队列发送消息,此时队列也能接受到消息,由于队列通配符为 *.second.#,而 routing_key 为 topics.second.routing.key,匹配成功 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello"); // 给第三个队列发送消息,此时队列没法接受到消息,由于队列通配符为 *.third.*,而 routing_key 为 topics.third.routing.key,匹配失败 rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello"); }
import com.example.rabbitmq.constant.RabbitConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.HeadersExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitHeaderProvider { @Bean public Queue headerFirstQueue() { return new Queue(RabbitConstant.HEADER_FIRST_QUEUE_NAME); } @Bean public Queue headerSecondQueue() { return new Queue(RabbitConstant.HEADER_SECOND_QUEUE_NAME); } @Bean public HeadersExchange headersExchange() { return new HeadersExchange(RabbitConstant.HEADER_EXCHANGE_NAME); } @Bean public Binding headerFirstQueueBindExchange() { Map<String, Object> headersMap = new HashMap<>(8); headersMap.put("matchAll", "YES"); headersMap.put("hello", "world"); return BindingBuilder.bind(headerFirstQueue()).to(headersExchange()).whereAll(headersMap).match(); } @Bean public Binding headerSecondQueueBindExchange() { Map<String, Object> headersMap = new HashMap<>(8); headersMap.put("matchAll", "NO"); headersMap.put("hello", "world"); return BindingBuilder.bind(headerSecondQueue()).to(headersExchange()).whereAny(headersMap).match(); } }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Header mode: one listener per header queue.
 */
@Component
public class RabbitHeaderConsumer {

    /**
     * Prints messages read from the first header queue.
     *
     * @param context message payload
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.HEADER_FIRST_QUEUE_NAME)
    public void headerFirstQueue(String context) {
        final String line = "rabbit header queue first receiver: " + context;
        System.out.println(line);
    }

    /**
     * Prints messages read from the second header queue.
     *
     * @param context message payload
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.HEADER_SECOND_QUEUE_NAME)
    public void headerSecondQueue(String context) {
        final String line = "rabbit header queue second receiver: " + context;
        System.out.println(line);
    }
}
@Test public void header() { // 这条消息应该能被两个队列都接收到,第一个队列 all 匹配成功,第二个队列 hello-value any匹配成功 MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("matchAll", "YES"); messageProperties.setHeader("hello", "world"); Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties); rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message); // 这条消息应该只被第二个队列接受,第一个队列 all 匹配失败,第二个队列 matchAll-NO any匹配成功 MessageProperties messagePropertiesSecond = new MessageProperties(); messagePropertiesSecond.setHeader("matchAll", "NO"); Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond); rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond); }
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the request queue used by the RPC mode.
 */
@Configuration
public class RabbitRpcProvider {

    /**
     * @return queue named {@link RabbitConstant#RPC_QUEUE_NAME}
     */
    @Bean
    public Queue rpcQueue() {
        final Queue queue = new Queue(RabbitConstant.RPC_QUEUE_NAME);
        return queue;
    }
}
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * RPC mode: listener on the RPC queue that answers every request.
 */
@Component
public class RabbitRpcConsumer {

    /**
     * Prints the request and returns a fixed reply.
     *
     * @param context request payload
     * @return the reply text {@code "copy that!"}
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.RPC_QUEUE_NAME)
    public String rpcQueue(String context) {
        final String line = "rabbit rpc queue receiver: " + context;
        System.out.println(line);
        final String reply = "copy that!";
        return reply;
    }
}
/** Sends a request to the RPC queue and prints whatever reply comes back. */
@Test
public void rpc() {
    final String request = "rpc hello!";
    final Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, request);
    final String line = "rabbit rpc response message: " + responseMsg;
    System.out.println(line);
}
文章还有不少不足之处,欢迎各位兄弟姐妹批评指正。代码仓库已存放至 gitee:[SpringBoot RabbitMQ 工作模式仓库链接](https://gitee.com/BarryMan/spring-boot-rabbitmq.git)
java