在前一篇文章中可伸缩架构简短系列中提到过关于异步的问题。当时推荐使用RabbitMQ来作任务队列的实现方案。本篇文章以Node.js为例子,来实际操做如何和RabbitMQ进行交互。javascript
RabbitMQ是一个消息代理。它最初的思想特别简单:接受而且转发消息。你能够将它想象为邮局:当你将邮件放到信箱中,你能够很是确定快件员最终会将邮件交到接受人手中。你能够把RabbitMQ比喻为信箱、邮局和快递员。RabbitMQ和邮局之间主要的区别是它不处理纸张,而是接受、存储和转发二进制数据‒消息。html
在RabbitMQ中,有一些基本术语:前端
打开RabbitMQ的下载页面(https://www.rabbitmq.com/download.html),下载安装,这里以Mac OSX平台安装为例:
RabbitMQ依赖Erlang,因为Mac OSX自己已经安装了Erlang,因此能够直接经过Homebrew来进行安装。java
$ brew update $ brew install rabbitmq
安装完后,须要将/usr/local/sbin添加到$PATH,添加到./.bash_profile文件,而后node
$ source ./.bash_profile $ echo $PATH // 检查环境变量是否已经成功加入
安装完成后就能够启动RabbitMQ server了。
git
至此,安装就完成了。运行rabbitmq-server命令时可能会报错误:ERROR: epmd error for host localhost: timeout (timed out),若是遇到这种状况,能够打开/etc/hosts文件,在文件末尾加上 127.0.0.1 localhost便可解决问题。github
在这个部分,我会使用Javascript编写两个小程序。一个发送单个消息的生产者和接收消息并将其打印出来的消费者。咱们将跳过在amqp.node API的一些细节,集中在这个很是简单的事情。npm
在下图中,P表明生产者,C表明消费者,中间红色表明的是任务队列-消息缓冲区
小程序
首先,使用npm安装amqp.node前端工程化
$ npm install amqplib
这里我将消息的发送者称做send.js,消息接受者称做receive.js,消息发送者会链接到RabbitMQ,发送一个消息,最后退出。
首先引入amqplib这个模块:
var amqp = require('amqplib/callback_api');
链接到 RabbitMQ server
amqp.connect('amqp://localhost', function(err, conn) {});
接下来建立一个通道,
amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) {}); });
为了发送消息,咱们须要定义一个队列,咱们能够将消息发送到这个队列中:
amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'hello'; ch.assertQueue(q, {durable: false}); // Note: on Node 6 Buffer.from(msg) should be used ch.sendToQueue(q, new Buffer('Hello World!')); console.log(" [x] Sent 'Hello World!'"); }); });
最后,咱们关闭链接,而且退出:
setTimeout(function() { conn.close(); process.exit(0) }, 500);
最终代码参考:send.js
创建一个接受者的方式和发送者是相同的。打开一个链接和通道,而且申明一个须要处理的队列。注意:这里的队列和发送者里面定义的队列须要匹配。
amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'hello'; ch.assertQueue(q, {durable: false}); }); });
这里也定义队列的缘由:接受者可能比发送者先开始执行。咱们须要确保当接受者处理queue的时候,queue是存在的。
因为消息的发送是异步的,咱们须要提供一个回调,这样,当RabbitMQ发送消息给咱们的消费者时,回调会执行。这个也是Channel.consume作的事情。
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); ch.consume(q, function(msg) { console.log(" [x] Received %s", msg.content.toString()); }, {noAck: true});
最终代码参考:receive.js
// 先执行send.js $ ./send.js // 后执行receive.js $ ./receive.js
最终结果如图: