应用范围为服务访问量忽然剧增,缘由可能有多种外部的调用或内部的一些问题致使消息积压,对服务的访问超过服务所能处理的最大峰值,致使系统超时负载从而崩溃。node
举一些咱们日常生活中的消费场景,例如:火车票、机票、门票等,一般来讲这些服务在下单以后,后续的出票结果都是异步通知的,若是服务自己只支持每秒1000访问量,因为外部服务的缘由忽然访问量增长到每秒2000并发,这个时候服务接收者由于流量的剧增,超过了本身系统自己所能处理的最大峰值,若是没有对消息作限流措施,系统在这段时间内就会形成不可用,在生产环境这是一个很严重
的问题,实际应用场景不止于这些,本文经过RabbitMQ来说解若是对消费端作限流措施。git
RabbitMQ提供了服务质量保证 (QOS
) 功能,对channel(通道)预先设置必定的消息数目,每次发送的消息条数都是基于预先设置的数目,若是消费端一旦有未确认的消息,这时服务端将不会再发送新的消费消息,直到消费端将消息进行彻底确认,注意:此时消费端不能设置自动签收,不然会无效。github
在 RabbitMQ v3.3.0
以后,放宽了限制,除了对channel设置以外,还能够对每一个消费者进行设置。并发
如下为 Node.js 开发语言 amqplib
库对于限流实现提供的接口方法 prefetch
异步
export interface Channel extends events.EventEmitter {
prefetch(count: number, global?: boolean): Promise<Replies.Empty>;
...
}
复制代码
prefetch 参数说明:async
生产端没什么变化,和正常声明同样,关于源码参见rabbitmq-prefetch(Node.js客户端版Demo)测试
const amqp = require('amqplib');
async function producer() {
// 1. 建立连接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 2. 获取通道
const channel = await connection.createChannel();
// 3. 声明参数
const exchangeName = 'qosEx';
const routingKey = 'qos.test001';
const msg = 'Producer:';
// 4. 声明交换机
await channel.assertExchange(exchangeName, 'topic', { durable: true });
for (let i=0; i<5; i++) {
// 5. 发送消息
await channel.publish(exchangeName, routingKey, Buffer.from(`${msg} 第${i}条消息`));
}
await channel.close();
}
producer();
复制代码
const amqp = require('amqplib');
async function consumer() {
// 1. 建立连接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 2. 获取通道
const channel = await connection.createChannel();
// 3. 声明参数
const exchangeName = 'qosEx';
const queueName = 'qosQueue';
const routingKey = 'qos.#';
// 4. 声明交换机、对列进行绑定
await channel.assertExchange(exchangeName, 'topic', { durable: true });
await channel.assertQueue(queueName);
await channel.bindQueue(queueName, exchangeName, routingKey);
// 5. 限流参数设置
await channel.prefetch(1, false);
// 6. 限流,noAck参数必须设置为false
await channel.consume(queueName, msg => {
console.log('Consumer:', msg.content.toString());
// channel.ack(msg);
}, { noAck: false });
}
consumer();
复制代码
在 consumer 中咱们暂且将 channel.ack(msg)
注释掉,分别启动生产者和消费者,看看是什么状况?fetch
如上图所示,总共5条消息按照预先设置的发送了一条消息,由于我将 channel.ack(msg)
注释掉了,服务端在未获得 ack 确认,将不会在发送剩下已 Ready 消息。ui
修改 consumer 代码,打开确认消息注释,从新启动消费端进行测试spa
await channel.consume(queueName, msg => {
console.log('Consumer:', msg.content.toString());
channel.ack(msg); // 打开注释
}, { noAck: false });
复制代码
如上图所示,Unacked 为0,消息已所有消费成功。
限流在咱们的实际工做中仍是颇有意义的,在使用上生产端没有变化,重点在消费端,着重看如下两点:
{ noAck: false }
channel.prefetch(1, false)
做者:五月君
连接:www.imooc.com/article/287… 来源:慕课网