抢票,商品秒杀
等功能是如何实现的,其实没有多么高大上,看了消息队列就知道了。做者简介:koala,专一完整的 Node.js 技术栈分享,从 JavaScript 到 Node.js,再到后端数据库,祝您成为优秀的高级 Node.js 工程师。【程序员成长指北】做者,Github 博客开源项目 github.com/koala-codin…javascript
“消息队列”是在消息的传输过程当中保存消息的容器。html
我的理解:我把它分红两个词消息
和队列
。当一大批客户端同时产生大量的网络请求(消息
)时候,服务器的承受能力确定是有一个限制的。这时候要是有个容器,先让这些消息排队就行了,还好有个叫队列
的数据结构,经过有队列属性的容器
排队(先进先出),把消息再传到咱们的服务器,压力减少了好多,这个很棒的容器
就是消息队列前端
这段理解中还包含这个两个概念: 客户端->生产者
服务器->消费者
当有消息队列
出现,生产者
和消费者
是必不可少的两个概念,上面的理解是多个生产者
对应一个消费者
,固然现实开发中还有许多消费者
的状况哦。接下来的文章也会屡次提到生产-消费模型
。java
应用解耦node
消息队列可使消费者和生产者直接互不干涉,互不影响,只须要把消息发送到队列便可,并且可独立的扩展或修改两边的处理过程,只要能确保它们遵照一样的接口约定,能够生产者用Node.js实现,消费者用phython实现。git
灵活性和峰值处理能力程序员
当客户端访问量忽然剧增,对服务器的访问已经超过服务所能处理的最大峰值,甚至致使服务器超时负载崩溃,使用消息队列能够解决这个问题,能够经过控制消费者的处理速度
和生产者可进入消息队列的数量
等来避免峰值问题github
排序保证面试
消息队列能够控制数据处理的顺序,由于消息队列自己使用的是队列这个数据结构,FIFO
(先进选出),在一些场景数据处理的顺序很重要,好比商品下单顺序等。redis
异步通讯
消息队列中的有些消息,并不须要当即处理,消息队列提供了异步处理机制,能够把消息放在队列中并不当即处理,须要的时候处理,或者异步慢慢处理,一些不重要的发送短信和邮箱功能可使用。
可扩展性
前面提到了消息队列能够作到解耦
,若是咱们想加强消息入队和出队的处理频率,很简单,并不须要改变代码中任何内容,能够直接对消息队列修改一些配置便可,好比咱们想限制每次发送给消费者的消息条数等。
有优点定有它现实的应用场景,文章后面会针对优点讲它们对应的应用场景。
处理日志
的,能够看作是一个日志(消息)系统
一个重要组件,针对性很强。0.8 版本开始支持复制,不支持事物,所以对消息的重复、丢失、错误没有严格的要求。高并发
的特性,毋庸置疑,RabbitMQ 最高,缘由是它的实现语言是天生具有高并发高可用的erlang 语言,天生的分布式
优点。说明: 本文主要以RabbitMQ讲解,较为常见。 我的认为这几种消息队列中间件能实现的功能,经过 redis 也都能实现,思想。
直接经过 HomeBrew 安装,执行如下命令
brew install rabbitmq
复制代码
进入安装目录
$ /usr/local/Cellar/rabbitmq/3.7.8
启动
$ sbin/rabbitmq-server
复制代码
浏览器输入 http://localhost:15672/#/ 默认用户名密码 guest
可视化界面可模块功能介绍:
其余系统安装请自行网上搜索
5672:通讯默认端口号 15672:管理控制台默认端口号 25672:集群通讯端口号 注意: 阿里云 ECS 服务器若是出现 RabbitMQ 安装成功,外网不能访问是由于安全组的问题没有开放端口 解决方案
如下列举一些在终端经常使用的操做命令
注意:以上终端全部命令,须要进入到rabbitmqctl的sbin目录下执行rabbitmqctl命令才有用,不然会报错:
![]()
画一张基本的图,HelloWorld 消息队列的图片,把下面几个概念都画进去。
看这段代码前先说几个概念
推荐一个 npm 模块amqplib
。
Github: github.com/squaremo/am…
$ npm install amqplib
复制代码
const amqp =require('amqplib');
async function product(params) {
// 1.建立连接对象
const connect =await amqp.connect('amqp://localhost:5672');
// 1. 建立连接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 2. 获取通道
const channel = await connection.createChannel();
// 3. 声明参数
const routingKey = 'helloKoalaQueue';
const msg = 'hello koala';
for (let i=0; i<10000; i++) {
// 4. 发送消息
await channel.publish('', routingKey, Buffer.from(`${msg} 第${i}条消息`));
}
// 5. 关闭通道
await channel.close();
// 6. 关闭链接
await connect.close();
}
product();
复制代码
执行 node product.js
复制代码
代码注释中已经把基本的流程讲解了,可是我刚开始看的时候还有疑问,我想不少小伙伴也会有疑问,说明下:
疑问1
前面提到过交换机这个名词,生产者
发消息的时候必需要指定一个 exchange,若不指定 exchange(为空)会默认指向 AMQP default 交换机,AMQP default 路由规则是根据 routingKey 和 mq 上有没有相同名字的队列进行匹配路由。上面这段代码就是默认指定的交换机。不一样类型交换机详细讲解请往下看。
疑问2
生产者发送消息后,消息是发送到交换机exchange,可是这时候会建立队列吗?
答案:代码中咱们声明的是路由是routingKey,可是它并无建立helloKoalaQueue 消息队列,消息只会发送到交exchange交换机。 运行代码后看队列截图能够证实这一点:
说明1
生产者发送消息后,注意关闭通道和链接,只要消息发送成功后,链接就能够关闭了,消费者用任何语言去获取消息均可以,这也证实了消息队列优秀解耦
的特性
说明2
能够屡次执行node product.js
生产者代码,消息会堆积到交换机exchange
中,并不会覆盖,若是已执行过消费者而且确认了对应的消息队列
,消息会从exchange交换机
发送到消息队列
,并存入到消息队列
,等待消费者
消费
// 构建消费者
const amqp = require('amqplib');
async function consumer() {
// 1. 建立连接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 2. 获取通道
const channel = await connection.createChannel();
// 3. 声明参数
const queueName = 'helloKoalaQueue';
// 4. 声明队列,交换机默认为 AMQP default
await channel.assertQueue(queueName);
// 5. 消费
await channel.consume(queueName, msg => {
console.log('Consumer:', msg.content.toString());
channel.ack(msg);
});
}
consumer();
复制代码
执行 node consumer.js
复制代码
运行后的执行结果
说明1
这时候我改变代码中的队列名称为helloKoalaQueueHaHa
,这时候去看Rabbitmq可视化界面中,队列模块,建立了这个队列
看到这里再次证实了消息队列优秀的解耦特性
,消费者和生产者模型
之间没有任何联系,再次建立这个helloKoalaQueueHaHa
路由名称的生产者,消费者
也会正常消费,而且会打印消息,你们能够实际操做试一下。
说明2
这时候我改变代码中的队列名称为helloKoalaQueueHaHa
,这时候去看Rabbitmq可视化界面中,队列模块,建立了这个队列
看到这里又再次证实了消息队列优秀的解耦特性
,消费者和生产者模型
之间没有任何联系,再次建立这个helloKoalaQueueHaHa
路由名称的生产者,消费者
也会正常消费,而且会打印消息,你们能够实际操做试一下。
弊端: 这样只能一个队列一个队列的删除,若是队列中的消息过多就会特别慢。
先记住一句话
生产者发消息的时候必须指定一个 exchange,不然消息没法直接到达消息队列,Exchange将消息路由到一个或多个Queue中(或者丢弃)
而后开始本章节交换机的讲解
若不指定 exchange(为空)会默认指向 AMQP default 交换机,AMQP default 路由规则是根据 routingKey 和 mq 上有没有相同名字的队列进行匹配路由。
经常使用的四种类型
fanout
direct
topic
headers
无论是哪种类型的交换机,都有一个绑定binding的操做,只不过根据不一样的交换机类型有不一样的路由绑定策略。不一样类型作的下图红色框框中的事。
fanout类型的Exchange路由规则很是简单,它会把全部发送到该Exchange的消息路由到全部与它绑定的Queue中,不须要设置路由键。
说明:全部消息都会路由到两个Queue中,是两个消费者均可以收到所有的彻底相同的消息吗? 答案是的,两个消费者收到的队列消息正常应该是彻底相同的。这种类型经常使用于广播类型的需求,或者也能够消费者1记录日志 ,消费者2打印日志
对应代码实现:
生产者:
const amqp = require('amqplib');
async function producer() {
// 建立连接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 获取通道
const channel = await connection.createChannel();
// 声明参数
const exchangeName = 'fanout_koala_exchange';
const routingKey = '';
const msg = 'hello koala';
// 交换机
await channel.assertExchange(exchangeName, 'fanout', {
durable: true,
});
// 发送消息
await channel.publish(exchangeName, routingKey, Buffer.from(msg));
// 关闭连接
await channel.close();
await connection.close();
}
producer();
复制代码
消费者:
const amqp = require('amqplib');
async function consumer() {
// 建立连接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 获取通道
const channel = await connection.createChannel();
// 声明参数
const exchangeName = 'fanout_koala_exchange';
const queueName = 'fanout_kaola_queue';
const routingKey = '';
// 声明一个交换机
await channel.assertExchange(exchangeName, 'fanout', { durable: true });
// 声明一个队列
await channel.assertQueue(queueName);
// 绑定关系(队列、交换机、路由键)
await channel.bindQueue(queueName, exchangeName, routingKey);
// 消费
await channel.consume(queueName, msg => {
console.log('Consumer:', msg.content.toString());
channel.ack(msg);
});
console.log('消费端启动成功!');
}
consumer();
复制代码
注意:其余类型代码已经放到 github,地址:github.com/koala-codin… 欢迎 star 交流。
direct 把消息路由到那些 binding key与 routing key 彻底匹配的 Queue中。
生产者指定 RoutingKey 消息根据消费端指定的队列经过模糊匹配的方式进行相应转发,两种通配符模式: #:可匹配一个或多个关键字 *:只能匹配一个关键字
header exchange(头交换机)和主题交换机有点类似,可是不一样于主题交换机的路由是基于路由键,头交换机的路由值基于消息的 header 数据。 主题交换机路由键只有是字符串,而头交换机能够是整型和哈希值 header Exchange 类型用的比较少,能够自行 google 了解。
(本小段内容来源网上,参考文章说明)
(注意,这里只是提一下 RPC 这个知识,由于单单一个RPC一篇文章都不必定说说完,有兴趣的能够用队列尝试一下RPC)
消息队列
是存在内存中的,若是出现问题挂掉,消息队列中的消息会丢失。因此对于一些需求很是有持久化的必要!RabbitMQ 能够开启持久化。不一样开发语言均可以设置持久化参数。
这里以Node.js为例子,其余语言能够自行搜索
await channel.assertExchange(exchangeName, 'direct', { durable: true });
// 注意其中的{ durable: true },这事对交换机持久化,还有其余的几种持久化方式
复制代码
同时推荐一篇不错的写持久化的文章: juejin.im/post/5d6f6b…
消息应答简单的解释就是消费者
完成了消费后,通知一下消息队列。
我以为这个配置是有必要打开的,消费者完成消息队列中的任务,消费者可能中途失败或者挂掉,一旦 RabbitMQ 发送一个消息给消费者而后便迅速将该消息从消息队列内存
中移除,这种状况下,消费者对应工做进程失败或者挂掉后,那该进程正在处理的消息也将丢失。并且,也将丢失全部发送给该进程的未被处理的消息。
为了确保消息永不丢失,RabbitMQ 支持消息应答机制。当消息被接受,处理以后一条应答便会从消费者回传至发送方,而后RabbitMQ将其删除。
若是某个消费者挂掉(信道、连接关闭或者 tcp 连接丢失)且没有发送 ack 应答,RabbitMQ 会认为该消息没有被处理彻底而后会将其从新放置到队列中。经过这种方式你就能够确保消息永不丢失,甚至某个工做进程偶然挂掉的状况。
默认状况下消息应答是关闭的。是时候使用 false(auto-ack配置项)参数将其开启了
这里以 Node.js 为例子,其余语言能够自行搜索
// 消费者消费时候的代码
await channel.consume(queueName, msg => {
console.log('koala:', msg.content.toString());
//... 这里能够放业务逻辑处理的代码,消费者完成后发送回执应答
channel.ack(msg);// 消息应答
}, { noAck: false });
复制代码
能够将prefetch count
项的值配置为1,这将会指示 RabbitMQ 在同一时间不要发送超过一条消息给每一个消费者。换句话说,直到消息被处理和应答以前都不会发送给该消费者任何消息。取而代之的是,它将会发送消息至下一个比较闲的消费者或工做进程。
这里以 Node.js 为例子,amqplib 库对于限流实现提供的接口方法 prefetch。
prefetch 参数说明:
// 建立消费者的时候 限流参数设置
await channel.prefetch(1, false);
复制代码
若是一个生产者,两个消费者,发放消息,我想要的队列先给消费者1发,发完消费者1发消费者2,这样有顺序的交互发送,应该如今哪种交换机呢?注意是交互,看完以后想一下?还有消费者完成后有没有手动回调消息队列完成的必要?消息持久化有必要没,持久化有什么好处?
(看完消息队列的消息传递,你会有疑问管道中的消息(生产者)是怎么被消费者消费的 放入队列,而后从队列被取出)
双十一商品秒杀/抢票功能实现
咱们在双11的时候,当咱们凌晨大量的秒杀和抢购商品,而后去结算的时候,就会发现,界面会提醒咱们,让咱们稍等,以及一些友好的图片文字提醒。而不是像前几年的时代,动不动就页面卡死,报错等来呈现给用户。
用一张图来解释消息队列在秒杀抢票等场景的使用: (说明:往下看以前,若是你作过电商类秒杀,能够想一想你是怎么实现的,咱们能够一块儿讨论哦。这里只是想说下消息队列的做用,并非最终优化的结果,好比用redis控制总缓存等)
消费队列
,这样响应给用户的速度就会快不少;并且还要保证很多卖,用户拿到了订单,不支付怎么办?咱们都知道如今订单都有有效期,再使用一个消息队列
,用于判断订单支付超时,好比说用户五分钟内不支付,订单就失效了,订单一旦失效,就会加入新的库存。这也是如今不少网上零售企业保证商品很多卖采用的方案。订单量比较少的状况下,生成订单很是快,用户几乎不用排队。 积分兑换(积分可用于多平台)
积分兑换模块,有一个公司多个部门都要用到这个模块,这时候就能够经过消息队列解耦这个特性来实现。 各部门系统作各部门的事,可是他们均可以用这个积分系统进行商品的兑换等。其余模块与积分模块彻底解耦。
发送邮件,用户大数据分析等 同步变异步功能实现
这个功能要说的比较多,从一个平台的用户注册开始。
正常状况注册,不出现高并发:
对于用户来讲,他就是想注册用一下这个软件,只要服务端将他的帐户信息存到数据库中他即可以登陆上去作他想作的事情了。用户并不care这些事,服务端就能够把其余的操做放入对应的消息队列
中而后立刻返回用户结果,由消息队列异步
的进行这些操做。
假若有大量的用户注册,发生了高并发:
邮件接口承受不住,或是分析信息时的大量计算使 cpu 满载,这将会出现虽然用户数据记录很快的添加到数据库中了,可是却卡在发邮件或分析信息时的状况,致使请求的响应时间大幅增加,甚至出现超时,这就有点不划算了。面对这种状况通常也是将这些操做放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时能够很快的完成注册请求,不会影响用户使用其余功能。
基于RabbitMQ的Node.js与Phython或其余语言实现通讯
这里也是利用了 RabbitMQ 的解耦特性,不只仅能够与 Phython,还能够与其余不少语言通讯,就不具体说了。
亲,别只看,你试试呀!直接开启服务,装个 RabbitMQ,挺有意思的,就算一个 HelloWorld 也能尝试出不少内容。并且本文说的不少内容均可以用 redis 来实现,也能够去看下个人 redis 文章。顺便说一句设计模式和数据结构是两个好东西,愈来愈能感受到。
www.cnblogs.com/baidawei/p/… www.sojson.com/blog/48.htm… www.zhihu.com/question/34… bbs.csdn.net/topics/3921… www.imooc.com/article/293… mp.weixin.qq.com/s/wTkwJXlNr…
require时,exports和module.exports的区别你真的懂吗