RabbitMQ是使用erlang语言开发的开源消息队列系统,完整的实现了AMPQ(高级抽象层消息通讯协议)。php
使用Homebrew安装git
$ brew install rabbitmq
修改 ~/.bash_profile 配置环境变量:github
# RabbitMQ Config export PATH=$PATH:/usr/local/sbin
重启配置json
$ source ~/.bash_profile
启动mq服务(后台启动为rabbitmq-server -detached)bash
$ rabbitmq-server
登陆管理界面 http://127.0.0.1:15672 帐号密码为:guestcomposer
RabbitMQ官方提供了三种PHP可用的扩展:php-amqp,php-rabbit,php-amqplibtcp
php的客户端如今经常使用的是php-amqplib函数
$ git clone https://github.com/php-amqplib/php-amqplib.git
将composer.json文件添加到您的项目中ui
{spa
“require”:{ “php-amqplib / php-amqplib”:“> = 2.6.1” }
}
下载依赖
$ composer install
virtual vhosts是一个命名空间,能够存在多个exchange和queue。实现了环境(用户,用户组,exchange,queue)隔离,是权限控制的最小粒度。默认的virtual host为/。
接受producer发送的消息,并根据binding绑定规则转发到对应的队列。默认是无名交换使用空字符串标识。exchange type(交换机类型)包含四种类型:direct,topic,headers,fanout
转发消息到routigKey指定的队列
相似于direct类型,只不过routigKey为一个句点号“.”分隔的字符串
*能够替代一个字。
#能够替换零个或多个单词。
根据发送的消息内容中的headers属性进行匹配。
将全部收到的消息广播到全部已知的队列。
queue是mq内部对象,用于存储未被customer消费的消息。相同属性的queue能够重复定义,每一个消息都会被投入到一个或多个队列。
binding是将exchange和queue按照路由规则绑定起来。能够理解为binding是exchange和queue之间的关系
消息tcp链接
每一个connection里,可创建多个channel,每一个channel表明一个会话任务。作到尽可能共用connection
1.send.php:
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立链接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立channel,多个channel能够共用链接 $channel = $connection->channel(); // 建立交换机以及队列(若是已经存在,不须要从新再次建立而且绑定) // 建立直连的交换机 $channel->exchange_declare('direct_logs', 'direct', false, false, false); // 建立队列 $channel->queue_declare('hello', false, false, false, false); // 交换机跟队列的绑定, $channel->queue_bind('hello', 'direct_logs', 'routigKey'); // 设置消息bady传送字符串logs(消息只能为字符串,建议消息均json格式) $msg = new AMQPMessage('logs'); // 发送数据到对应的交换机direct_logs并设置对应的routigKey $channel->basic_publish($msg, 'direct_logs', 'routigKey');
2.receive.php:
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立链接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立channel,多个channel能够共用链接 $channel = $connection->channel(); // 可能会在数据发布以前启动消费者,因此咱们要确保队列存在,而后再尝试从中消费消息。 // 建立直连的交换机 $channel->exchange_declare('direct_logs', 'direct', false, false, false); // 建立队列 $channel->queue_declare('hello', false, false, false, false); // 交换机跟队列的绑定, $channel->queue_bind('hello', 'direct_logs', 'routigKey'); // 回调函数 $callback = function ($msg) { echo $msg->body; }; // 启动队列消费者 $channel->basic_consume('hello3', '', false, true, false, false, $callback); // 判断是否存在回调函数 while(count($channel->callbacks)) { // 此处为执行回调函数 $channel->wait(); }
非持久化会致使,队列重启,数据丢失
exchange持久化,在声明durable参数时指定为true
queue持久化,在声明durable参数时指定true
消息持久化,实例化AMQPMessage类时指定delivery_mode为2
exchange和queue是否持久化须要一致才能绑定
消费者设置手动ack,在声明no_ack参数时指定false
队列消息异常须要将消息删除并再次发送一样的消息置于末尾并手动记录日志