PHP使用RabbitMQ实例

相关博文:
CentOS6.9安装RabbitMQ和源码编译安装php的RabbitMQ扩展
RabbitMQ入门基础
CentOS7源码编译安装nginx+php7.2+mysql5.7并使用systemctl管理
RabbitMQ的安装过程,工做流程,和一些基础概念已经在前面的笔记中提到了,今天在本地实现了php链接RabbitMQ,以及消息的生产和消费的过程,首先看下没有生产者和消费者的默认RabbitMQ管理界面截图:
Connections:

尚未任何链接(Connections)
Channels:

尚未任何通道(Channels)
Exchanges:

交换机只有系统默认的
Queues:

尚未任何队列
先上消费者代码consumer.phpphp

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/15
 * Time: 13:16
 */
//声明链接参数
$config = array(
    'host' => '192.168.75.132',
    'vhost' => '/',
    'port' => 5672,
    'login' => 'test',
    'password' => 'test'
);
//链接broker
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
    echo "Cannot connect to the broker";
    exit();
}
//在链接内建立一个通道
$ch = new AMQPChannel($cnn);
//建立一个交换机
$ex = new AMQPExchange($ch);
//声明路由键
$routingKey = 'key_1';
//声明交换机名称
$exchangeName = 'exchange_1';
//设置交换机名称
$ex->setName($exchangeName);
//设置交换机类型
//AMQP_EX_TYPE_DIRECT:直连交换机
//AMQP_EX_TYPE_FANOUT:扇形交换机
//AMQP_EX_TYPE_HEADERS:头交换机
//AMQP_EX_TYPE_TOPIC:主题交换机
$ex->setType(AMQP_EX_TYPE_DIRECT);
//设置交换机持久
$ex->setFlags(AMQP_DURABLE);
//声明交换机
$ex->declareExchange();
//建立一个消息队列
$q = new AMQPQueue($ch);
//设置队列名称
$q->setName('queue_1');
//设置队列持久
$q->setFlags(AMQP_DURABLE);
//声明消息队列
$q->declareQueue();
//交换机和队列经过$routingKey进行绑定
$q->bind($ex->getName(), $routingKey);
//接收消息并进行处理的回调方法
function receive($envelope, $queue) {
    //休眠两秒,
    sleep(2);
    //echo消息内容
    echo $envelope->getBody()."\n";
    //显式确认,队列收到消费者显式确认后,会删除该消息
    $queue->ack($envelope->getDeliveryTag());
}
//设置消息队列消费者回调方法,并进行阻塞
$q->consume("receive");
//$q->consume("receive", AMQP_AUTOACK);//隐式确认,不推荐

 

以上是消费者代码,打开两个命令行/终端
输入php consumer.php,消费者开始阻塞获取消息,以下图

此时再看RabbitMQ管理界面:
Connections

出现两个链接,这两个就是消费者,由于他们在阻塞着等待消息
Channels

消费者在各自的链接里都打开了一个通道
Exchanges

其中一个消费者建立了一个持久的直连交换机
Queues

消息队列已经建立,但消息数是0,由于此时尚未生产者
生产者代码publisher.phphtml

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/15
 * Time: 13:15
 */

$config = array(
    'host' => '192.168.75.132',
    'vhost' => '/',
    'port' => 5672,
    'login' => 'test',
    'password' => 'test'
);
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
    echo "Cannot connect to the broker";
    exit();
}
$ch = new AMQPChannel($cnn);
$ex = new AMQPExchange($ch);
//消息的路由键,必定要和消费者端一致
$routingKey = 'key_1';
//交换机名称,必定要和消费者端一致,
$exchangeName = 'exchange_1';
$ex->setName($exchangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
//建立10个消息
for ($i=1;$i<=10;$i++){
    //消息内容
    $msg = array(
        'data'  => 'message_'.$i,
        'hello' => 'world',
    );
    //发送消息到交换机,并返回发送结果
    //delivery_mode:2声明消息持久,持久的队列+持久的消息在RabbitMQ重启后才不会丢失
    echo "Send Message:".$ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2))."\n";
    //代码执行完毕后进程会自动退出
}

 

 

以上是生产者代码
在执行以前,先关掉前面的两个消费者,打开一个命令行/终端,输入php publisher.php,因为生产者不须要阻塞,执行完进程便退出,因此如今RabbitMQ管理界面中既没有Connections也没有Channels,可是Queues已经被Exchanges投递过去了10条消息,以下图:

由于咱们执行生产者以前已经关掉了所有消费者,因此此时消息在队列中等待获取;
由于在发送消息时设置了delivery_mode:2来声明消息持久化,此时若是重启RabbitMQ,消息还会恢复;此时从新执行消费者,假设仍是两个,打开两个命令行/终端,输入php consumer.php,咱们能够看到消息被消费,以下图:

提醒:生产者在生产消息时,若是不存在指定队列,而且没有建立队列,或者队列存在但消息路由键和交换机与队列绑定的键(路由规则)不一致(直连交换机必须一致),则消息会被交换机丢弃。
原文地址:PHP使用RabbitMQ实例mysql

相关文章
相关标签/搜索