RabbitMQ入门教程

摘要: 使用RabbitMQ的消息队列,能够有效提升系统的峰值处理能力。javascript

clipboard.png

RabbitMQ简介

RabbitMQ消息代理(Message Broker),它支持多种异步消息处理方式,最多见的有:html

  • Work Queue:将消息缓存到一个队列,默认状况下,多个worker按照Round Robin的方式处理队列中的消息。每一个消息只会分配给单个worker。
  • Publish/Subscribe:每一个订阅消息的消费者都会收到消息,所以每一个消息一般会分配给多个worker,每一个worker对消息进行不一样的处理。

RabbitMQ还支持RoutingTopics、以及Remote procedure calls (RPC)等方式。java

对于不一样的消息处理方式,有一点是相同的,RabbitMQ是介于消息的生产者和消费者的中间节点,负责缓存和分发消息。RabbitMQ接收来自生产者的消息,缓存到内存中,按照不一样的方式分发给消费者。RabbitMQ还能够将消息写入磁盘,保证持久化,这样即便RabbitMQ意外崩溃了,消息数据不至于彻底丢失。node

为何使用RabbitMQ?

最简单的一点在于,它支持Work Queue等不一样的消息处理方式,能够用于不一样的业务场景。对于咱们Fundebug来讲,目前只用过RabbitMQ的Work Queue,即消息队列。git

使用消息队列,能够将不算紧急、可是很是消耗资源的计算任务,以消息的方式插入到RabbitMQ的队列中,而后使用多个处理模块处理这些消息。github

这样作最大的好处在于:提升了系统峰值处理能力。由于,来不及处理的消息缓存在RabbitMQ中,避免了同时进行大量计算致使系统因超负荷运行而崩溃。而那些来不及处理的消息,会在峰值过去以后慢慢处理掉。docker

另外一个好处在于解耦。消息的生产者只须要将消息发送给RabbitMQ,这些消息何时处理完,不会影响生产者的响应性能。api

广告:欢迎免费试用Fundebug,为您监控线上代码的BUG,提升用户体验~promise

安装并运行RabbitMQ

使用Docker运行RabbitMQ很是简单,只须要执行一条简单的命令:缓存

sudo docker run -d --name rabbitmq -h rabbitmq -p 5672:5672 -v /var/lib/rabbitmq:/var/lib/rabbitmq registry.docker-cn.com/library/rabbitmq:3.7

对于不熟悉Docker的朋友,我解释一下docker的命令选项:

  • -d : 后台运行容器
  • --name rabbitmq : 将容器的名字设为rabbitmq
  • -h rabbitmq : 将容器的主机名设为rabbitmq,但愿RabbitMQ消息数据持久化保存到本地磁盘是须要设置主机名,由于RabbitMQ保存数据的目录为主机名
  • -p 5672:5672 : 将容器的5672端口映射为本地主机的5672端口,这样能够经过本地的5672端口访问rabbitmq
  • -v /var/lib/rabbitmq:/var/lib/rabbitmq:将容器的/var/lib/rabbitmq目录映射为本地主机的/var/lib/rabbitmq目录,这样能够将RabbitMQ消息数据持久化保存到本地磁盘,即便RabbitMQ容器被删除,数据依然还在。

Docker为官方镜像提供了加速服务,所以命令中Rabbit的Docker镜像名为registry.docker-cn.com/library/rabbitmq:3.7

若是你不会Docker,建议你学习一下。若是你不想学,Ubuntu 14.04下安装RabbitMQ的命令是这样的:

sudo echo "deb http://www.rabbitmq.com/debian testing main" | sudo tee -a /etc/apt/sources.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server

启动RabbitMQ:

sudo service rabbitmq-server start

消息队列代码示例

下面,咱们使用Node.js实现一个简单消息队列。

clipboard.png

消息的生产者:sender.js

const amqp = require("amqplib");

const queue = "demo";

async function sendMessage(message)
{
    const connection = await amqp.connect("amqp://localhost");
    const channel = await connection.createChannel();
    await channel.assertQueue(queue);
    await channel.sendToQueue(queue, new Buffer(message),
    {
        // RabbitMQ关闭时,消息会被保存到磁盘
        persistent: true
    });
}


setInterval(function()
{
    sendMessage("Hello, Fundebug!");
}, 1000)
  • 在sender中,不断地往消息队列中发送"Hello, Fundebug!"。

消息的消费者:receiver.js

const amqp = require("amqplib");

const queue = "demo";

async function receiveMessage()
{
    const connection = await amqp.connect("amqp://localhost");
    const channel = await connection.createChannel();
    await channel.assertQueue(queue);
    await channel.consume(queue, function(message)
    {
        console.log(message.content.toString());
        channel.ack(message);
    });
}

receiveMessage();
  • 在receiver中,从消息队列中读出message并打印。

咱们用到了amqplib模块,用于与RabbitMQ进行通讯,对于具体接口的细节,能够查看文档

在调用sendToQueue时,将persistent属性设为true,这样RabbitMQ关闭时,消息会被保存到磁盘。测试这一点很简单:

  • 关闭receiver
  • 启动sender,发送消息给RabbitMQ
  • 重启RabbitMQ(sudo docker restart rabbitmq)
  • 启动receiver,会发现它能够接收sender在RabbitMQ重启以前发送的消息

因为RabbitMQ容器将保存数据的目录(/var/lib/rabbitmq)以数据卷的形式保存在本地主机,所以即便将RabbitMQ容器删除(sudo docker rm -f rabbitmq)后从新运行,效果也是同样的。

另外,这段代码采用了Node.js最新的异步代码编写方式:Async/Await,所以很是简洁,感兴趣的同窗能够了解一下。

这个Demo的运行方式很是简单:

  • 运行RabbitMQ容器
sudo ./start_rabbitmq.sh
  • 发送消息
node ./sender.js
  • 接收消息
node ./receiver.js

在receiver端,能够看到不停地打印"Hello, Fundebug!"。

代码仓库地址为:Fundebug/rabbitmq-demo

自动重连代码示例

在生产环境中,RabbitMQ不免会出现重启的状况,好比更换磁盘或者服务器、负载太高致使崩溃。由于RabbitMQ能够将消息写入磁盘,因此数据是"安全"的。可是,代码中必须实现自动重连机制,不然RabbitMQ中止时会致使Node.js应用崩溃。这里提供一个自动重连的代码示例,给你们参考:

消息生产者:sender_reconnect.js

const amqp = require("amqplib");

const queue = "demo";

var connection;

// 链接RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        await channel.sendToQueue(queue, new Buffer("Hello, Fundebug!"),
        {
            // RabbitMQ重启时,消息会被保存到磁盘
            persistent: true
        });

        connection.on("error", function(err)
        {
            console.log(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}


connectRabbitMQ();

消息消费者:receiver_reconnect.js

const amqp = require("amqplib");

const queue = "demo";

var connection;

// 链接RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        await channel.consume(queue, async function(message)
        {
            console.log(message.content.toString());
            channel.ack(message);
        });

        connection.on("error", function(err)
        {
            console.log(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}


connectRabbitMQ();

这样的话,即便RabbitMQ重启,sender和receiver也能够自动从新链接RabbitMQ。若是你但愿监控RabbitMQ是否出错,不妨使用咱们Fundebug的Node.js错误监控服务,在链接触发"error"或者"close"事件时,第一时间发送报警,这样开发者能够及时定位和处理BUG。

参考

相关文章
相关标签/搜索