Exchange : 接收消息, 并根据路由键转发消息所绑定的队列
Direct Exchange : 所有发送到Direct Exchange的消息被转发到RoutingKey中指定的Queue
注意 : Direct模式可以使用RabbitMQ自带的Exchange(default Exchange), 所以不需要将Exchange进行任何绑定(binding)操作, 消息传递时, RoutingKey必须完全匹配才会被队列接收, 否则该消息会被抛弃
package com.qiyexue.exchange.direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Direct模式的生产者 * * @author 七夜雪 * @create 2018-12-13 22:00 */ public class ProducerByDirect { public static void main(String[] args) throws IOException, TimeoutException { // 1. 创建连接工厂, 设置属性 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.72.138"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 获取连接 Connection connection = factory.newConnection(); // 3. 创建channel Channel channel = connection.createChannel(); // 4. 声明 String exchangeName = "test_direct_exchange"; // Direct模式必须和消费者保持一致才能发送消息, 不然消息会被丢弃 String routingKey = "test.direct"; // 5. 发送消息 String msg = "Hello RabbitMQ By Direct"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); // 6. 关闭连接 channel.close(); connection.close(); } }
package com.qiyexue.exchange.direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Direct模式消费者 * * @author 七夜雪 * @create 2018-12-13 22:01 */ public class ConsumerByDirect { public static void main(String[] args) throws Exception { // 1. 创建工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.72.138"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 获取连接 Connection connection = factory.newConnection(); // 3. 创建channel Channel channel = connection.createChannel(); // 4. 声明 // 交换机名称 String exchangeName = "test_direct_exchange"; // 交换机类型 String exchangeType = "direct"; String queueName = "test_direct_queue"; // Direct模式RoutingKey必须和生产者保持一致才能消费 String routingKey = "test.direct"; // 表示声明了一个交换机, 后面几个参数分别为durable, autoDelete, internal, arguments channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 声明一个队列, 后面四个参数分别为durable, exclusive, autoDelete, arguments // durable : 是否持久化消息 channel.queueDeclare(queueName, false, false, false, null); // 建立一个绑定关系 channel.queueBind(queueName, exchangeName, routingKey); QueueingConsumer consumer = new QueueingConsumer(channel); // 参数 : 队列名称, autoAck:是否自动确认, consumer channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息 : " + msg); } } }
所有发送到Topic Exchange的消息将被转发到所有关心RoutingKey中指定Topic的Queue上
Exchange将RoutingKey和某个Topic进行模糊匹配, 此时队列需要绑定一个Topic
- 可以使用通配符进行模糊匹配
- “#” : 匹配一个或多个词
- “*” : 匹配一个词
package com.qiyexue.exchange.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Topic模式生产者 * * @author 七夜雪 * @create 2018-12-14 8:07 */ public class ProducerByTopic { public static void main(String[] args) throws IOException, TimeoutException { // 创建工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.72.138"); factory.setPort(5672); factory.setVirtualHost("/"); // 创建连接 Connection connection = factory.newConnection(); // 创建channel Channel channel = connection.createChannel(); // 声明 String exchangeName = "test_topic_exchange"; String routingKey1 = "tingxuelou.biluo"; String routingKey2 = "tingxuelou.hongchen"; String routingKey3 = "tingxuelou.hufa.zimo"; String msg = "test topic By routingKey : "; channel.basicPublish(exchangeName, routingKey1, null, (msg + routingKey1).getBytes()); channel.basicPublish(exchangeName, routingKey2, null, (msg + routingKey2).getBytes()); channel.basicPublish(exchangeName, routingKey3, null, (msg + routingKey3).getBytes()); // 关闭连接 channel.close(); connection.close(); } }
package com.qiyexue.exchange.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * topic模式消费者 * * @author 七夜雪 * @create 2018-12-14 20:10 */ public class ConsumerByTopic { public static void main(String[] args) throws Exception { // 创建工厂 // 创建工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.72.138"); factory.setPort(5672); factory.setVirtualHost("/"); // 创建连接 Connection connection = factory.newConnection(); // 创建channel Channel channel = connection.createChannel(); // 声明Exchange String exchangeName = "test_topic_exchange"; String exchangetype = "topic"; // tingxuelou.# String routingKey = "tingxuelou.*"; channel.exchangeDeclare(exchangeName, exchangetype); // 声明队列 String queueName = "test_topic_queue"; channel.queueDeclare(queueName, false, false, false, null); // 绑定队列 channel.queueBind(queueName, exchangeName, routingKey); // 创建消费者 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); System.out.println(new String(delivery.getBody())); } } }
不处理路由键, 只需要简单的将队列绑定到交换机上
发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
Fanout交换机转发消息是最快的
package com.qiyexue.exchange.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * fanout模式生产者 * * @author 七夜雪 * @create 2018-12-14 20:36 */ public class ProducerByFanout { public static void main(String[] args) throws Exception { // 1. 创建连接工厂, 设置属性 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.72.138"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 获取连接 Connection connection = factory.newConnection(); // 3. 创建channel Channel channel = connection.createChannel(); String exchangeName = "test_fanout_exchange"; String routingKey = "无所谓"; for (int i = 0; i < 5; i++) { String msg = "Fanout 模式消息.."; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); } // 关闭连接 channel.close(); connection.close(); } }
package com.qiyexue.exchange.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * Fanout模式消费者 * * @author 七夜雪 * @create 2018-12-14 20:40 */ public class ConsumerByFanout { public static void main(String[] args) throws Exception { // 1. 创建连接工厂, 设置属性 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.72.138"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 获取连接 Connection connection = factory.newConnection(); // 3. 创建channel Channel channel = connection.createChannel(); // 4. 声明Exchange String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; channel.exchangeDeclare(exchangeName, exchangeType); // 5. 声明消息队列 String routingKey = ""; String queueName = "test_fanout_queue"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); // 6. 创建消费者 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息 : " + msg); } } }
消息队列, 实际存储消息数据
Durability : 是否持久化
Durable : 是
Transient : 否
Auto delete : 如选yes,代表当最后一个监听被移除之后, 该Queue会自动被删除
package com.qiyexue.message; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; /** * 生产者 * * @author 七夜雪 * @create 2018-12-13 20:43 */ public class Producer { public static void main(String[] args) throws Exception { // 1. 创建连接工厂, 设置属性 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.72.138"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 创建连接 Connection connection = factory.newConnection(); // 3. 使用connection创建channel Channel channel = connection.createChannel(); // 4. 通过channel发送消息 String msg = "hello rabbitmq!"; AMQP.BasicProperties properties = new AMQP.BasicProperties(); Map<String,Object> headers = new HashMap<String, Object>(); headers.put("name", "七夜雪"); properties = properties.builder() // 设置编码为UTF8 .contentEncoding("UTF-8") // 设置自定义Header .headers(headers) // 设置消息失效时间 .expiration("5000").build(); for (int i = 0; i < 5; i++) { // 不指定exchange的情况下, 使用默认的exchange, routingKey与队列名相等 channel.basicPublish("", "test01", properties, msg.getBytes()); } // 5. 关闭连接 channel.close(); connection.close(); } }
package com.qiyexue.message; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; /** * 消费者 * * @author 七夜雪 * @create 2018-12-13 20:57 */ public class Consumer { public static void main(String[] args) throws Exception { // 1. 创建连接工厂, 设置属性 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.72.138"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 创建连接 Connection connection = factory.newConnection(); // 3. 使用connection创建channel Channel channel = connection.createChannel(); // 4. 声明(创建)一个队列 String queueName = "test01"; channel.queueDeclare(queueName,true, false, false, null); // 5. 创建消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 6. 设置channel channel.basicConsume(queueName, true, consumer); while (true) { // 7. 获取消息 Delivery delivery = consumer.nextDelivery(); System.out.println(new String(delivery.getBody())); // 获取head中内容 System.out.println(delivery.getProperties().getHeaders().get("name")); } } }