pom文件都是相同的html
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.bjsxt</groupId> <artifactId>spring-boot-direct-consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-boot-direct-consumer</name> <description>spring-boot-direct-consumer</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Direct 交换器(发布与订阅 彻底匹配)java
需求web
server.port=8081 spring.rabbitmq.host=192.168.181.133 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=111111 #设置交换器的名称 mq.config.exchange=log.direct #设置info队列名称 mq.config.queue.info=log.info #设置info的路由键 mq.config.queue.info.routing.key=log.info.routing.key #设置error队列名称 mq.config.queue.error=log.error #设置error的路由键 mq.config.queue.error.routing.key=log.error.routing.key
server.port=8080 spring.rabbitmq.host=192.168.181.133 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=111111 #设置交换器的名称 mq.config.exchange=log.direct #设置info的路由键 mq.config.queue.info.routing.key=log.info.routing.key #设置error队列名称 mq.config.queue.error=log.error #设置error的路由键 mq.config.queue.error.routing.key=log.error.routing.key
package com.bjsxt.receive; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /*** * 消息的接收者 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT), key="${mq.config.queue.info.routing.key}" ) ) public class InfoReceiver { /** * 接收消息的方法 * 采用消息队列监听机制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("info--receiver=:"+msg); } }
package com.bjsxt.receive; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /*** * 消息的接收者 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT), key="${mq.config.queue.error.routing.key}" ) ) public class ErrorReceiver { /** * 接收消息的方法 * 采用消息队列监听机制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("Error--receiver=:"+msg); } }
Sender
package com.bjsxt.send; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * 消息发送者 */ @Component public class Sender { @Autowired private AmqpTemplate amqpTemplate; @Value("${mq.config.exchange}") private String exchange; @Value("${mq.config.queue.error.routing.key}") private String routingkey; /** * 发送消息的方法 * @param msg */ public void sendMsg(String msg){ /*向消息队列发送消息*/ /* * 参数一:队列的名称 * 参数二:发送的消息 * */ amqpTemplate.convertAndSend(exchange,routingkey,msg); } }
package com.bjsxt.test; import com.bjsxt.SpringBootDirectProviderApplication; import com.bjsxt.send.Sender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = SpringBootDirectProviderApplication.class) public class RabbitMQTest { @Autowired private Sender sender; @Test public void queueTest() throws InterruptedException { while (true){ Thread.sleep(1000); sender.sendMsg("你好RabbitMQ"); } } }
server.port=8083 spring.rabbitmq.host=192.168.181.133 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=111111 #设置交换器的名称 mq.config.exchange=log.topic #设置info队列名称 mq.config.queue.info=log.info #设置info队列名称 mq.config.queue.error=log.error #log 队列名称 mq.config.queue.logs=log.all
server.port=8084 spring.rabbitmq.host=192.168.181.133 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=111111 #设置交换器的名称 mq.config.exchange=log.topic
package com.bjsxt.send; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * 消息发送者 */ @Component public class UserSender { @Autowired private AmqpTemplate amqpTemplate; @Value("${mq.config.exchange}") private String exchange; /** * 发送消息的方法 * @param msg */ public void sendMsg(String msg){ /*向消息队列发送消息*/ /* * 参数一:队列的名称 * 参数二:发送的消息 * */ amqpTemplate.convertAndSend(exchange,"user.log.bug", "user.log.debug....."+msg); amqpTemplate.convertAndSend(exchange,"user.log.info", "user.log.info....."+msg); amqpTemplate.convertAndSend(exchange,"user.log.warn", "user.log.warn....."+msg); amqpTemplate.convertAndSend(exchange,"user.log.error", "user.log.error....."+msg); } }
package com.bjsxt.receive; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /*** * 消息的接收者 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC), key="*.log.info" ) ) public class InfoReceiver { /** * 接收消息的方法 * 采用消息队列监听机制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("info--receiver=:"+msg); } }
package com.bjsxt.receive; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /*** * 消息的接收者 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.logs}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC), key="*.log.*" ) ) public class LogReceiver { /** * 接收消息的方法 * 采用消息队列监听机制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("log--receiver=:"+msg); } }
spring.application.name=springcloud-mq spring.rabbitmq.host=192.168.181.133 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=111111 #设置交换器的名称 mq.config.exchange=order.fanout #短信服务队列名称 mq.config.queue.sms=order.sms #push 服务队列名称 mq.config.queue.push=order.push
spring.application.name=springcloud-mq spring.rabbitmq.host=192.168.181.133 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=111111 #设置交换器的名称 mq.config.exchange=order.fanout
rabbitAmqpTemplate.convertAndSend(this.exchange,"", msg);