首先添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
添加配置java
在application.properties中添加如下配置spring
#rabbitmq spring.rabbitmq.host=118.24.103.51 spring.rabbitmq.port=5672 #服务器帐号密码 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ #消费者数量 spring.rabbitmq.listener.simple.concurrency= 10 spring.rabbitmq.listener.simple.max-concurrency= 10 #\u6D88\u8D39\u8005\u6BCF\u6B21\u4ECE\u961F\u5217\u83B7\u53D6\u7684\u6D88\u606F\u6570\u91CF spring.rabbitmq.listener.simple.prefetch= 1 #\u6D88\u8D39\u8005\u81EA\u52A8\u542F\u52A8 spring.rabbitmq.listener.simple.auto-startup=true #\u6D88\u8D39\u5931\u8D25\uFF0C\u81EA\u52A8\u91CD\u65B0\u5165\u961F spring.rabbitmq.listener.simple.default-requeue-rejected= true #\u542F\u7528\u53D1\u9001\u91CD\u8BD5 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=1000 spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000 spring.rabbitmq.template.retry.multiplier=1.0
演示一 :direct模式交换机(exchange)模式
建立消息队列服务器
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { public static final String QUEUE_NAME = "queue"; @Bean public Queue queue(){ return new Queue(QUEUE_NAME,true);//第一个参数是队列名 第二是是否持久化 } }
消息发送者 app
import com.hz1202.miaosha.service.RedisService; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MQSender { @Autowired private AmqpTemplate amqpTemplate; public void send(Object message){ String msg = RedisService.beanToString(message); amqpTemplate.convertAndSend(MQConfig.QUEUE_NAME,msg); System.out.println("send message:"+msg); } }
消息接收者spring-boot
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class MQReceiver { @RabbitListener(queues = MQConfig.QUEUE_NAME) public void receiver(String message){ System.out.println("receiveMessage:"+message); } }
注意:用户guest是rabbitMQ的默认用户 密码为guest 可是 guest不支持远程登陆,要让guest支持远程登陆 请在rabbitMQ 安装目录下你的 /etc/rabbitmq文件夹中的rabbitmq.config(没有的话本身建立)配置文件中加入 如下代码oop
[{rabbit, [{loopback_users, []}]}].
演示二 :Topic模式交换机(exchange)模式
建立消息队列fetch
/** * topic 模式 */ @Bean public Queue topicQueue1(){ return new Queue("topic.queue1",true);//第一个参数是队列名 第二是是否持久化 } @Bean public Queue topicQueue2(){ return new Queue("topic.queue2",true);//第一个参数是队列名 第二是是否持久化 } @Bean public TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); } @Bean public Binding topicBinding(){ return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1"); } @Bean public Binding topicBinding2(){ return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#"); }
流程说明:咱们先建立了两个queue 分别命名为 topic.queue1 和 topic.queue2 , 而后再建立一个交换机 命名为 topicExchange ,最后将两个queue和交换机绑定,同时制定了匹配规则 ,"#"表明所有匹配ui
消息发送者 spa
import com.hz1202.miaosha.service.RedisService; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MQSender { @Autowired private AmqpTemplate amqpTemplate; public void sendTopic(Object message){ String msg = RedisService.beanToString(message); amqpTemplate.convertAndSend("topicExchange","topic.key1",msg+"1");//第一个参数表明交换机名 第二个表明知足匹配规则的表达式 第三个消息 amqpTemplate.convertAndSend("topicExchange","topic.key2",msg+"2"); System.out.println("send message:"+msg); } }
咱们在绑定交换机与queue时制定了匹配规则,"topic.key1"只能匹配"topic.key1","topic.#"能够匹配所有以"topic."开头的消息; 这样,第一条消息就会被 topic.queue1和 topic.queue2所匹配,而第二条只能被 topic.queue2匹配到code
消息接收者
@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME1) public void receiverTopic1(String message){ System.out.println("receive topic queue1 message:"+message); } @RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME2) public void receiverTopic2(String message){ System.out.println("receive topic queue2 message:"+message); }
演示三 :Fanout模式交换机(exchange)模式
建立队列并将队列和fanout交换机绑定
/** * fanout模式 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(FANOUT_EXCHANGE); } @Bean public Binding fanoutBinding1(){ return BindingBuilder.bind(topicQueue1()).to(fanoutExchange()); } @Bean public Binding fanoutBinding2(){ return BindingBuilder.bind(topicQueue2()).to(fanoutExchange()); }
建立消息发送者
public void sendFanout(Object message){ String msg = RedisService.beanToString(message); amqpTemplate.convertAndSend("fanoutExchange","",msg);//第一个参数表明交换机名 第三个消息 System.out.println("send fanout message:"+msg); }
消息接收者
@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME1) public void receiverTopic1(String message){ System.out.println("receive queue1 message:"+message); } @RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME2) public void receiverTopic2(String message){ System.out.println("receive queue2 message:"+message); }