PHP RabbitMQ

安装php

rabbitmq 在 mac 下能够直接用 brew 安装
默认安装在 /usr/local/Cellar/下
命令被软链接加入到了/usr/local/sbin 下,所以能够把此目录放到环境变量中,建议加入到~/.bash_profile 中
rabbitmq-server start 开启服务
端口5672 默认
端口15672 web 端登陆管理端口127.0.0.1:15672
rabbitmq 默认提供的用户 guest 密码 guesthtml

管理

中止服务
rabbitmqctl stop
开启应用 [服务依旧运行]
rabbitmqctl start_app
中止应用 [服务依旧运行]
rabbitmqctl stop_appweb

用户管理

添加用户
sudo rabbitmqctl add_user username password
删除用户
sudo rabbitmqctl delete_user username
修改密码
sudo rabbitmqctl change_password username newpassword
清除用户密码,禁止用户登陆
sudo rabbitmqctl clear_password <username>
列出全部用户
sudo rabbitmqctl list_users
设置用户角色
rabbitmqctl set_user_tags username tagbash

vhost虚拟主机管理

virtual host只是起到一个命名空间的做用,因此能够多个user共同使用一个virtual host,'/'这个是系统默认的vhost,就是说当咱们建立一个到rabbitmq的connection时候,它的命名空间是'/',须要注意的是不一样的命名空间之间的资源是不能访问的,好比 exchang,queue ,bingding等
建立虚拟主机
sudo rabbitmqctl add_vhost vhostpath
删除虚拟主机
sudo rabbitmqctl delete_vhost vhostpath
列出全部虚拟主机
sudo rabbitmqctl list_vhosts
列出某个 vhost 的全部用户和权限
list_permissions [-p vhostpath]
列出某个用户的全部权限。
list_user_permissions {username}
清除用户对某个 vhost 的权限。
clear_permissions [-p vhostpath] {username}
设置用户对某个 virtual host 的权限,若是不指定 vhost,则默认为“/” vhost。
set_permissions [-p vhostpath] {user}
rabbitmqctl set_permissions -p test_host kang “." "." ".*"服务器

添加一个管理员代替 guest
rabbitmqctl add_user admin 123456
指定用户的角色
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin "." "." ".*”
分配给用户指定虚拟主机的权限,虽然是administrator角色,但不对全部虚拟主机都有权限,同样须要对每一个虚拟主机都受权app

显示信息
rabbitmqctl list_queues [-p <vhostpath>] [<queueinfoitem> ...]
列出某个 vhost 的全部 queue。
rabbitmqctl list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
列出某个 vhost 的全部 exchange。
rabbitmqctl list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
列出某个 vhost 的全部 binding。
rabbitmqctl list_connections [<connectioninfoitem> ...]
列出 RabbitMQ broker 的全部 connection。
rabbitmqctl list_channels [<channelinfoitem> ...]
列出 RabbitMQ broker 的全部 channel
rabbitmqcrl list_consumers [-p <vhostpath>]
列出某个 vhost 的全部 consumer。composer

基本概念点

1.Server(broker): 接受客户端链接,实现AMQP消息队列和路由功能的进程。函数

2.Virtual Host:实际上是一个虚拟概念,相似于权限控制组,一个Virtual Host里面能够有若干个Exchange和Queue,可是权限控制的最小粒度是Virtual Hostui

3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不一样类型的Exchange路由的行为是不同的。spa

4.Message Queue:消息队列,用于存储还未被消费者消费的消息。

5.Message: 由Header和Body组成,Header是由生产者添加的各类属性的集合,包括Message是否被持久化、由哪一个Message Queue接受、优先级是多少等。而Body是真正须要传输的APP数据。

6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header获得Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,二者的匹配方式由Exchange Type决定。

7.Connection:链接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP链接。

8.Channel:信道,仅仅建立了客户端到Broker之间的链接后,客户端仍是不能发送消息的。须要为每个Connection建立Channel,AMQP协议规定只有经过Channel才能执行AMQP的命令。一个Connection能够包含多个Channel。之因此须要Channel,是由于TCP链接的创建和释放都是十分昂贵的,若是一个客户端每个线程都须要与Broker交互,若是每个线程都创建一个TCP链接,暂且不考虑TCP链接是否浪费,就算操做系统也没法承受每秒创建如此多的TCP链接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,可是建议尽可能共用Connection。

9.Command:AMQP的命令,客户端经过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端能够经过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。

客户端管理

php端 rabbitmq 客户端可使用 composer 下的库

{
    "require": {
        "php-amqplib/php-amqplib": "2.5.*"
    }
}

使用时,用到了这两个东西

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

开始发送消息

public function handle()
{
    //链接到 test_host 虚拟主机,每一个虚拟主机有本身的队列,交换机...
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'kang', 'a943434603', 'test_host');
    //建立一个 channel
    $channel = $connection->channel();
    //声明 hello 队列
    $channel->queue_declare('hello', false, false, false, false);
    //建立一个消息
    $msg = new AMQPMessage(time());
    //把消息推送到默认的交换机中,而且告诉交换机要把消息交给 hello 队列
    $channel->basic_publish($msg, '', 'hello');
    echo " [x] Sent ".time()."\n";
}

重要概念,消息是保存在交换机中的,当消息存放时指定的队列存在,交换机会把消息推送到该队列

消息队列发送消息给消费者,一个消息发给一个消费者

public function handle()
{
    //链接
    $connection = new AMQPStreamConnection('localhost', 5672, 'kang', 'a943434603', "test_host");
    //建立一个 channel
    $channel = $connection->channel();
    //能够运行这个命令不少次,可是只有一个队列会被建立, 在程序中重复将队列重复声明一下是种值得推荐的作法,保证队列存在
    $channel->queue_declare('hello', false, false, false, false);

    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

    $callback = function($msg) {
        echo " [x] Received ", $msg->body, "\n";
        sleep($msg->body);
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };

    //默认状况下,队列会把消息公平的分配给各个消费者
    //若是某个消费者脚本处理完成分配给他的消息任务后,会一直空闲
    //另一个消费者脚本处理的消息都很是耗时,这就容易致使消费者脚本得不到合理利用,
    //加入此句话,是告诉队列,取消把消息公平分配到各个脚本,而是那个脚本空闲,就交给它一个消息任务
    //这样,合理利用到每个空闲的消费者脚本
    $channel->basic_qos(null, 1, null);

    /**
     * basic_consume 方法    从队列中读取数据
     * @param string $queue 指定队列
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack    消费者处理完消息后,是否不须要告诉队列已经处理完成,true 不须要 false 须要,
     * true
        默认状况下,队列会把消息公平分配到各个消费者中,而后一次性把消息交给消费者,若是消费者处理了一半挂了,那么消息就丢失了
     * false
        默认状况下,队列会把消息公平的分配给各个消费者,而后一个一个的把消息分配到消费者脚本中,脚本处理完成后,告诉队列,队列会删除这个消息,而且接着给下一个消息,
        当脚本挂掉,不会丢失消息,队列会把未完成的消息分配给其余消费者
        在 callback 函数中须要加入这句话,处理完后通知队列能够删除消息了
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        未加入这句话,队列不会删除已处理完的消息,当脚本挂掉时,会把分配给当前队列的全部消息再次从新分配给其余队列,会致使消息会重复处理
     */
    $channel->basic_consume('hello', '', false, false, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}

发布/订阅

一个消息发送给多个消费者

扇形交换机 fanout
发布订阅模式,科院实现一个消息发送到多个队列中
在发布消息脚本中,建立一个扇形交换机,把消息推送到交换机,不须要推进到指定的队列中,队列在消费者脚本中建立
消费脚本定义个临时队列,并绑定这个临时队列到交换机中,扇形交换机会把接收到的消息推进到每个绑定的队列中

生产者脚本

public function handle()
{
    //链接到 test_host 虚拟主机,每一个虚拟主机有本身的队列,交换机...
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'kang', 'a943434603', 'test_host');
    //建立一个 channel
    $channel = $connection->channel();
    //声明 log 队列
    //$channel->queue_declare('log', false, false, false, false);
    //建立一个fanout类型交换机
    $channel->exchange_declare("logs","fanout",false,false,false);

    //建立一个消息
    $msg = new AMQPMessage( time() );
    $channel->basic_publish ( $msg , 'logs' );

    echo " [x] Sent ".time()."\n";

    $channel->close();
    $connection->close();
}

消费者脚本

public function handle()
{
    //链接
    $connection = new AMQPStreamConnection('localhost', 5672, 'kang', 'a943434603', "test_host");
    //建立一个 channel
    $channel = $connection->channel();
    //能够运行这个命令不少次,可是只有一个队列会被建立, 在程序中重复将队列重复声明一下是种值得推荐的作法,保证队列存在
    //$channel->queue_declare('hello', false, false, false, false);
    //建立一个fanout类型交换机
    $channel->exchange_declare('logs', 'fanout', false, false, false);

    //系统建立一个临时队列
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    //绑定临时队列到交换机上
    $channel->queue_bind($queue_name, 'logs');

    $callback = function($msg){
        echo ' [x] ', $msg->body, "\n";
    };
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}

直连交换机 direct
交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而肯定消息该分发到哪一个队列

clipboard.png

生产者,建立基本上和扇形交换机同样,不一样的是

$channel->exchange_declare("direct_logs","direct",false,false,false);
//建立一个消息
$msg = new AMQPMessage( time() );
//把消息推进到direct_logs交换机,并给消息加上路由 key,让消费者队列来根据 key 接收消息
$channel->basic_publish ( $msg , 'direct_logs', "warning" );

消费者

$channel->exchange_declare("direct_logs","direct",false,false,false);
//系统建立一个临时队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
//绑定临时队列到交换机上,并指定消息的路由 key
$channel->queue_bind($queue_name, 'direct_logs', 'error');
$channel->queue_bind($queue_name, 'direct_logs', 'warning');

主题交换机 topic

直连交换机中路由 key 匹配模式
(星号*) 用来表示一个单词.
(井号#) 用来表示任意数量(零个或多个)单词。

clipboard.png

Q1会接收到 a.orange.b 等key 值中间为 orange 的消息
Q2会接收到 a.b.rabbit, lazy.a, lazy.a.b.c 等消息

当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

生产者,指定路由 key

$channel->exchange_declare("topic_logs","topic",false,false,false);
//建立一个消息
$msg = new AMQPMessage( time() );
//把消息推进到direct_logs交换机,并给消息加上路由 key,让消费者队列来根据 key 接收消息
$channel->basic_publish ( $msg , 'topic_logs', "baidu.warning" );

消费者1

$channel->exchange_declare("topic_logs","topic",false,false,false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'topic_logs', "baidu.#");

消费者2

$channel->exchange_declare("topic_logs","topic",false,false,false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'topic_logs', 'ali.#');

参考

https://www.rabbitmq.com/tutorials/tutorial-one-php.html
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/

相关文章
相关标签/搜索