RabbitMQ是一个消息代理。它的核心原理很是简单:接收和发送消息。你能够把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。通常提到RabbitMQ和消息,都用到一些专有名词。php
咱们的“Hello world”不会很复杂——仅仅发送一个消息,而后获取它并输出到屏幕。这样以来咱们须要两个程序,一个用做发送消息,另外一个接受消息并打印消息内容html
咱们大致的设计是这样的:python
生产者(Producer)把消息发送到一个名为“hello”的队列中。消费者(consumer)从这个队列中获取消息。web
RabbitMQ库
RabbitMQ使用的是AMQP协议。要使用她你就必须须要一个使用一样协议的库。几乎全部的编程语言都有可选择的库。python也是同样,能够从如下几个库中选择:编程
- py-amqplib
- txAMQP
- pika
在这一系列教程中,咱们打算使用PHP 的AMQP扩展。详细教程请查看:服务器
mac os 下RabbitMq 以及 PHP amqp扩展安装记录
咱们第一个程序send.php会发送一个消息到队列中。首先要作的事情就是创建一个到RabbitMQ服务器的链接。网络
$connection = new AMQPConnection(array('host' =>'127.0.0.1', 'port' =>'5672', 'vhost' =>'/', 'login' =>'guest', 'password' => 'guest'));
如今咱们已经链接上服务器了,那么,在发送消息以前咱们须要确认队列是存在的。若是咱们把消息发送到一个不存在的队列,RabbitMQ会丢弃这条消息。我门先建立一个名为hello的队列,而后把消息发送到这个队列中。编程语言
$queue = new AMQPQueue($channel); $queue->setName($queueName);
这时候咱们就能够发送消息了,咱们第一条消息只包含了 Hello World!字符串,咱们打算把它发送到咱们的hello队列。函数
在RabbitMQ中,消息是不能直接发送到队列,它须要发送到交换器(exchange)中。咱们不打算在这里深刻讨论它——你能够经过教程的第三部分了解更多。如今咱们所须要了解的是如何使用默认的交换器(exchange),它使用一个空字符串来标识。交换器容许咱们指定某条消息须要投递到哪一个队列,$$routeKey参数必须指定为队列的名称:工具
$exchange->publish($message, $routeKey); var_dump("[x] Sent 'Hello World!'");
在退出程序以前,咱们须要确认网络缓冲已经被刷写、消息已经投递到RabbitMQ。完成这些事情(正确的关闭链接)是很简单的。
$connection->disconnect();
咱们的第二个程序receive.php,将会从队列中获取消息并打印消息。
此次咱们仍是先要链接到RabbitMQ服务器。链接服务器的代码和以前是同样的。
下一步也和以前同样,咱们须要确认队列是存在的。使用$queue->declare()建立一个队列——咱们能够运行这个命令不少次,可是只有一个队列会建立。
$queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->declare();
你也许要问为何重复声明了队列——咱们已经在前面的代码中声明了它。若是咱们肯定了队列是已经存在的,那么咱们能够不这么作。好比先运行send.php程序。但是咱们并不肯定哪一个程序先运行,这种状况的话再程序中重复声明是好的作法。
列出全部队列
你也许但愿查看RabbitMQ由哪些队列、有多少消息在队列中。你可使用rabbitmqctl工具(使用有权限的用户):
``` $ sudo rabbitmqctl list_queues Listing queues ... hello 0 ...done.
```
(omit sudo on Windows)
(在Windows中不须要sudo命令)
从队列中获取消息相对来讲稍显复杂。须要为队列定义一个回调(callback)函数。当咱们获取到消息的时候,Pika库就会调用这个回调(callback)函数。咱们的这个回调函数将会但因消息的内容到屏幕上。
function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); $queue->nack($envelope->getDeliveryTag()); }
下一步,咱们须要告诉RabbitMQ这个回调函数将会从hello队列中接收消息:
$queue->consume('callback');
要成功运行这些命令,咱们必须保证队列是存在的,咱们已经可以保证——咱们以前已经使用建立了一个队列queue_declare。
$queue->nack()//函数稍后会介绍。
最后,咱们输入一个无限循环来等待消息数据并确运行回调函数。
var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); }
send.php的所有代码:
<?php /** * PHP amqp(RabbitMQ) Demo-1 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'hello'; $routeKey = 'hello'; $message = 'Hello World!'; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); try { $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $queue = new AMQPQueue($channel); $queue->setName($queueName); $exchange->publish($message, $routeKey); var_dump("[x] Sent 'Hello World!'"); } catch (AMQPConnectionException $e) { var_dump($e); exit(); } $connection->disconnect();
receive.py的所有代码:
<?php /** * PHP amqp(RabbitMQ) Demo-1 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'hello'; $routeKey = 'hello'; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->declare(); $queue->bind($exchangeName, $routeKey); var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); } $connection->disconnect(); function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); $queue->nack($envelope->getDeliveryTag()); }
如今就能够在终端中运行咱们的程序了。首先,用send.php重续发送一条消息:
php send.php string(23) "[x] Sent 'Hello World!'"</pre>
生产者(producer)程序send.php每次运行以后就会中止。如今咱们就来接收消息:
php receive.php string(46) "[*] Waiting for messages. To exit press CTRL+C" string(26) " [x] Received:Hello World!"</pre>
成功了!咱们已经经过RabbitMQ发送第一条消息。你也许已经注意到了,receive.py程序并无退出。它一直在准备获取消息,你能够经过Ctrl-C来终端它。
试下在新的终端中再次运行send.php。
咱们已经学会如何发送消息到一个已知队列中并接收消息。是时候移步到第二部分了,咱们将会创建一个简单的工做队列(work queue)。