RabbitMQ是一个消息代理器:它接受和转发消息。你能够把它看成一个邮局:当你把邮件放在信箱里时,你能够确定邮差先生最终会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ就是这里的邮箱,邮局和邮差。php
RabbitMQ和邮局之间的主要区别是,它不处理纸张,而是接受、存储和转发二进制数据‒消息。html
RabbitMQ,和通常的消息传递,使用专业术语。json
生产者的工做就是发送消息。发送消息的程序是生产者:segmentfault
队列类比一个邮箱,存在于RabbitMQ, 然而信息流经过RabbitMQ和您的应用程序,他们只能存储在一个队列。队列只受主机内存和磁盘限制的约束,它本质上是一个很大的消息缓冲区。会有许多生产者能够发送到一个队列的消息,许多消费者能够尝试从一个队列接收数据。这就是咱们如何表示队列的方式:数组
消费者和生产者有着类似的意义. 消费者无非就是等待消息而后处理的程序:服务器
请注意,生产者、消费者和代理没必要同一主机上;事实上,在大多数应用程序中它们没有这样作。composer
(使用PHP amqplib客户端)异步
在本教程的这一部分中,咱们将用PHP编写两个程序;一个生产者发送一条消息,一个用户接收消息并将它们打印出来。咱们会PHP amqplib API的忽略一些细节,集中在这个很是简单的事情刚刚开始。这是一个“Hello World”的消息传递。socket
在下图中,“p”是咱们的生产商,“C”是咱们的消费者。在中间的框是一个队列的消息缓冲区,RabbitMQ保持表明的消费。ide
RabbitMQ有不少协议。本教程介绍AMQP 0-9-1,这是一个开放的、通用的协议消息。有许多不一样的语言RabbitMQ一批客户。咱们将在本教程中使用PHP amqplib,composer解决依赖管理。
添加composer.json:
{ "require": { "php-amqplib/php-amqplib": ">=2.6.1" } }
composer install # 或者 直接运行包引入 composer require php-amqplib/php-amqplib
生产者(消息发送方)
咱们命令咱们的消息发布者(发送者)send.php和消息接收receive.php。发送者将链接到RabbitMQ,发送一条消息,而后退出。
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage;
如今咱们能建立一个链接服务器的Connection:
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel();
该链接抽象套接字(socket)链接,并为咱们负责协议版本协商和认证等。这里,咱们链接到一个rabbitmq代理器在本地机器上-使用localhost。若是咱们想在不一样的机器上链接到一个代理,咱们只需在这里指定它的名称或IP地址。
接下来,咱们建立一个通道,这是处理事情的大部分API的地方。
发送消息前,咱们必须声明一个队列为咱们发送作准备;而后咱们能够向队列发布消息:
$channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage('Hello World!'); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n";
声明队列是幂等的(原句:Declaring a queue is idempotent,这里的idempotent不知道是什么意思) - 只有在它不存在时才会建立队列。消息内容是一个字节数组,所以您能够在那里编码用你喜欢的方式。
最后,咱们关闭通道和链接;
$channel->close(); $connection->close();
上面咱们完成了send.php.
接下来咱们完成消费方的代码
消费者从RabbitMQ接收推来的消息,咱们会保持运行监听消息并打印出来。
引入lib
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection;
设置与发布程序相同;咱们打开一个链接和一个通道,并声明将要消耗的队列。注意,这与发送发布的队列匹配。
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
注意,咱们也在这里声明队列。由于咱们可能在发布以前启动消费者,咱们但愿在咱们尝试从它那里消费消息以前肯定队列的存在。
咱们将告诉服务器从队列中发送消息。咱们将定义一个PHP可调用,它将接收服务器发送的消息。请记住,消息是从服务器异步发送到客户机的。
$callback = function($msg) { echo " [x] Received ", $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); }
当调用basic_consume,咱们的代码会阻塞。当咱们收到消息时,咱们的回调函数将经过接收到返回的消息传递。
以上是咱们receive.php的代码
php receive.php
php send.php
rabbitmqctl list_queues
<?php return [ 'vendor' => [ 'path' => dirname(dirname(__DIR__)) . '/vendor' ], 'rabbitmq' => [ 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'qkl', 'password' => '123456', 'vhost' => '/' ] ]; ?>
<?php $config = require "../config.php"; require_once $config['vendor']['path'] . '/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection($config['rabbitmq']['host'], $config['rabbitmq']['port'], $config['rabbitmq']['login'], $config['rabbitmq']['password'], $config['rabbitmq']['vhost']); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg) { echo " [x] Received ", $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
<?php $config = require "../config.php"; require_once $config['vendor']['path'] . '/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection($config['rabbitmq']['host'], $config['rabbitmq']['port'], $config['rabbitmq']['login'], $config['rabbitmq']['password'], $config['rabbitmq']['vhost']); $channel = $connection->channel(); //发送方其实不须要设置队列, 不过对于持久化有关,建议执行该行 $channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage('Hello World!'); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close(); ?>
了解如何构建一个简单的工做队列, 你能够阅读下一章节: RabbitMQ+PHP 教程二(Work Queues)