AMQP消息路由必须包含三部分,交换器、队列、绑定。以下图所示,生产者把消息发送给交换器,交换器再路由到符合条件的队列上,最终被消费者接收。绑定决定了消息如何从路由器路由到相应的队列。这一篇,主要是了解一下队列。web
当新增队列的时候,须要定义一下4中属性,分布是Name、Durability、Auto delete、Arguments。服务器
amq.
开头命名。定义一个队列的方法以下,exclusive的参数,下面的临时队列会说明。负载均衡
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
咱们在使用队列以前,须要先声明队列。若是队列不存在,则建立队列。若是已存在则不会建立,可是和已存在的队列属性不一致,则会有406 (PRECONDITION_FAILED)的通道级异常。异步
参数的设置有两种,一种是经过分组,一个是一个个队列设置。分组的方式更加灵活、非侵入性,不须要修改和从新部署应用程序,是官方推荐的方式。参数的描述以下:spa
参数 | 描述 |
---|---|
x-message-ttl | 消息的存活时间,单位为毫秒 |
x-expires | 队列的存活时间,单位为毫秒 |
x-max-length | 队列的最大消息数 |
x-max-length-bytes | 消息的最大字节数 |
x-overflow | 消息达到最大数的策略,drop-head或者reject-publish |
x-dead-letter-exchange | 死信队列的交换器 |
x-dead-letter-routing-key | 死信队列的路由键 |
x-max-priority | 消息的优先级设置 |
x-queue-mode | 消息的延迟 |
x-queue-master-locator | 用于主从 |
当咱们须要一个临时队列的时候,咱们能够先定义队列,使用完再删除,或者直接定义Durability的属性为transient,等broker重启的时候就消失,可是感受没有很方便。特别是使用后删除,若是客户端失败,这个队列就一直存在。咱们能够用如下方法来自动删除:code
public static void main(String[] args) throws IOException, TimeoutException { // 声明一个链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 建立一个与rabbitmq服务器的链接 Connection connection = factory.newConnection(); // 建立一个Channel Channel channel = connection.createChannel(); // 经过Channel定义队列 channel.queueDeclare("queue1", false, true, false, null); channel.queueDeclare("queue2", false, false, true, null); channel.basicConsume("queue2", true, null, consumerTag -> { }); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-expires",5000); channel.queueDeclare("queue3", false, false, false, arguments); }
queue1是独占队列,queue2是自动删除,queue3设置了5秒的过时时间。
运行后以下图,五秒后queue3消失,中止程序运行,queue1和queue2消失。
须要注意的是,若是把queue2的basicConsume方法调用注释掉,因为没有消费者,队列并不会消失。
独占队列只能由其声明链接使用(从声明链接使用、清除、删除等)。其余队列若是想使用独占队列将致使通道级异常RESOURCE_LOCKED,该异常带有一条错误消息,代表没法得到对锁定队列的独占访问。rabbitmq
消费者经过两种方式来接收消息:队列
咱们先往队列里发送消息,其中queue1发送4条,queue2发送3条。这边还没讲到如何发送消息,能够经过http://127.0.0.1:15672/
的web控制台来发送消息。
queue1是拉模式,queue2是推模式。
queue1的代码:路由
public static void main(String[] args) throws IOException, TimeoutException { // 声明一个链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 建立一个与rabbitmq服务器的链接 Connection connection = factory.newConnection(); // 建立一个Channel Channel channel = connection.createChannel(); // 经过Channel定义队列 channel.queueDeclare("queue1", false, false, false, null); GetResponse response = channel.basicGet("queue1", true); System.out.println("queue1 Received '" + new String(response.getBody()) + "'"); GetResponse response2 = channel.basicGet("queue1", true); System.out.println("queue1 Received '" + new String(response.getBody()) + "'"); }
运行结果以下,调用get两次。
queue2的代码:部署
public static void main(String[] args) throws IOException, TimeoutException { // 声明一个链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 建立一个与rabbitmq服务器的链接 Connection connection = factory.newConnection(); // 建立一个Channel Channel channel = connection.createChannel(); // 经过Channel定义队列 channel.queueDeclare("queue2", false, false, false, null); // 异步回调处理 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("queue2 Received '" + message + "'"); }; // 接收消息 channel.basicConsume("queue2", true, deliverCallback, consumerTag -> { }); }
调用consume一次。
最终结果以下:queue1由于get两次,因此还有2条消息。queue2的3条消息都消费完。
在上面的例子中,咱们先把消息发送给队列,此时没有消费者,消息就会在队列里一直等。那若是有消费者,且有多个消费者,消息是如何发布的呢?
咱们启动上面的queue2应用程序两次,再发送消息,能够看到,两个应用程序是交替消费数据的。整个步骤以下:
队列的建立究竟是消费者仍是生产者呢?咱们上面的例子都是消费者建立的,可是若是队列还没建立,生产者就开始往不存在的队列发送消息,消息就会丢失。因此为了消息可以正确的到达队列,须要生产者和消费者都要尝试去建立队列,除非消息不那么重要,能够消费者来建立。
队列是AMQP是消息通讯的基础模块: