<?phpphp
/*
* 发布-订阅
* create by superid
*/json
$queueName = 'superid';
$exchangeName = 'superid';
$routeKey = 'superid';函数
$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$connection = new AMQPConnection($conn_args); //建立链接
$connection->connect() or die("Cannot connect to the broker!\n");
try {
$channel = new AMQPChannel($connection); // 创建一个 Channel信道fetch
$exchange = new AMQPExchange($channel); //建立交换机对象
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exchange->setFlags(AMQP_DURABLE); //持久化
$exchange->declareExchange();spa
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE); //持久化队列,并不表示消息也会持久化
$queue->declareQueue();code
$queue->bind($exchangeName, $routeKey); //绑定交换机与队列,并指定路由键 对象
for($i=0; $i<50000; ++$i){ blog
$message = json_encode(array(
'id' => $i,
'name' => 'liming',
'note' => '这是一个备注',
));队列
//Direct类型是使用最多的,使用肯定的routingkey。
//这种模型下,接收消息时绑定'routeKey'则只接收routeKey的消息路由
$result = $exchange->publish($message,$routeKey);
var_dump("[{$result}-{$i}]");
}
} catch (AMQPConnectionException $e) {
var_dump($e);
exit();
}
$connection->disconnect();
?>
<?php
/*
* 发布-订阅
* create by superid
*/
$queueName = 'superid';
$exchangeName = 'superid';
$routeKey = 'superid';
$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$connection = new AMQPConnection($conn_args);
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection); // 创建一个 Channel信道
//若是Consumer数量不少或者但愿每一个Consumer同时只处理一个任务
//能够经过在Consumer中设置PrefetchCount来实现更加均匀的任务分发。,收到ack应答,才会发生消息,若是有一条没有收到ack,则不会发送消息了
$channel->setPrefetchCount(1);
$exchange = new AMQPExchange($channel); //建立交换机
$exchange->setName($exchangeName); //设置交换机的名字
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 Exchange 类型: direct fanout topic
$exchange->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$exchange->declareExchange()."\n";
$queue = new AMQPQueue($channel); //建立队列
$queue->setName($queueName); //设置队列的名称
$queue->setFlags(AMQP_DURABLE); //设置持久化队列
echo "Message Total:".$queue->declareQueue()."\n"; //队列中消息的总量
$queue->bind($exchangeName, $routeKey);
//阻塞模式接收消息
echo "Message:\n";
while(True){
$queue->consume('processMessage');
//自动ACK应答
//$queue->consume('processMessage', AMQP_AUTOACK);
}
$conn->disconnect();
/**
* 消费回调函数
* 处理消息
* *须要注意的地方是:queue对象有两个方法可用于取消息: consume 和 get 。
*前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,
*取消息时有则取,无则返回false。
*/
function processMessage($envelope, $queue) {
$msg = $envelope->getBody();
echo $msg."\n"; //处理消息
//手动发送ACK应答, 若是不进行确认,消息会积累愈来愈多,产生严重bug
$queue->ack($envelope->getDeliveryTag());
}
?>
备注:
发送消息时,只要有“交换机”就够了。至于交换机后面有没有对应的处理队列,发送方是不用管的。
对于交换机,有两个重要的概念:
A,类型。有三种类型: Fanout类型最简单,这种模型忽略routingkey;Direct类型是使用最多的,使用肯定的routingkey。这种模型下,接收消息时绑定'key_1'则只接收key_1的消息;最后一种是Topic,这种模式与Direct相似,可是支持通配符进行匹配,好比: 'key_*',就会接受key_1和key_2。Topic貌似美好,可是有可能致使不严谨,因此仍是推荐使用Direct。
B,持久化。指定了持久化的交换机,在从新启动时才能重建,不然须要客户端从新声明生成才行。
须要特别明确的概念:交换机的持久化,并不等于消息的持久化。只有在持久化队列中的消息,才能持久化;若是没有队列,消息是没有地方存储的;消息自己在投递时也有一个持久化标志的,PHP中默认投递到持久化交换机就是持久的消息,不用特别指定
queue: 队列
事实上,队列仅是针对接收方(consumer)的,由接收方根据需求建立的。只有队列建立了,交换机才会将新接受到的消息送到队列中,交换机是不会在队列建立以前的消息放进来的。换句话说,在创建队列以前,发出的全部消息都被丢弃了。
下面这个图比RabbitMQ官方的图更清楚——Queue是属于ReceiveMessage的一部分。