(交换器)
(队列)
(消息)php
普通发送git
$ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params);
事务模式:单个发送github
$ch->startTransaction(); try { $ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params); $ch->commitTransaction(); } catch(AMQPConnectionException $e) { $ch->rollbackTransaction(); }
事务模式:批量发送网络
$loop_times = 10; $ch->startTransaction(); for($i=0;$i<$loop_times;$i++) { try { $ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params); $ch->commitTransaction(); } catch(AMQPConnectionException $e) { $ch->rollbackTransaction(); } }
单个消息异步
$ch->confirmSelect(); $ex->publish($message, $routing_key,AMQP_MANDATORY, $params); $ack_callback = function ($delivery_tag, $multiple) { // ack处理 echo 'Message acked', PHP_EOL; var_dump(func_get_args()); return true; }; $nack_callback = function ($delivery_tag, $multiple, $requeue) use ($message) { // nack处理: 从新发送消息,或记录日志 echo 'Message nacked', PHP_EOL; var_dump(func_get_args()); return false; }; $ch->setConfirmCallback($ack_callback, $nack_callback); // 设置回调 $ch->waitForConfirm(1); // 在setConfirmCallback()后调用
来自broker的ack确认svn
批量消息函数
$ch->confirmSelect(); // 批量发布,一次确认 $messages = []; for($i=0;$i<10;$i++) { $messages[$i] = $i.$message; $ex->publish($messages[$i], $routing_key, AMQP_MANDATORY, $params); } $ack_callback = function ($delivery_tag, $multiple) { // ack处理 echo 'Message acked', PHP_EOL; var_dump(func_get_args()); return true; }; $nack_callback = function ($delivery_tag, $multiple, $requeue) use ($messages) { // nack处理: 从新发送消息该批次消息,或者记录日志 echo 'Message nacked', PHP_EOL; var_dump(func_get_args()); return false; }; $ch->setConfirmCallback($ack_callback, $nack_callback); // 设置回调 $ch->waitForConfirm(1); // 在setConfirmCallback()后调用
broker发送了两个ack确认(完整的网络包中producer发送了10条消息);
若是收到nack信令,须要从新发送整个批次消息。
小问题:为何broker回复了两个ack?broker回复ack数量和机制是什么?oop
自动ack测试
$conn->setReadTimeout(3); // 无数据时,超时时间设置 $consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true); // consume try { $data = $q->consume($callback, AMQP_AUTOACK, $consumer_tag); } catch (Exception $exception) { // 无数据,超时处理 // 显式取消消费 $q->cancel($consumer_tag); }
3s后超时,cancel掉消费fetch
完整例子(consume过程)
$conn->setReadTimeout(3); $consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true); $num = 3; $callback = function (AMQPEnvelope $envelope, AMQPQueue $queue) use(&$num){ $tag = $envelope->getDeliveryTag(); var_dump($tag); var_dump($num); if($num <= 0){ // MARK:false显式退出callback,会致使丢数据 // 例如队列中有30条数据,条件判断$num<=0退出,会致使业务逻辑只处理了4条数据 // 剩余的数据未处理而丢掉 // return false; } $num --; return true; }; // consume try { $data = $q->consume($callback, AMQP_AUTOACK, $consumer_tag); } catch (Exception $exception) { // 显式取消消费 $q->cancel($consumer_tag); }
(consume数据,到无数据)
另外:
qos()/setPrefetchCount()/setPrefetchSize()对于autoack无效
autoack小结:
回调超时经过timeout实现,且catch异常后,须要cancel消费者;
callback回调函数,不该该return false(形成队列丢数据),如遇异常能够记录日志等;
qos()等限流设置对autoack无效。
manual ack
$conn->setReadTimeout(3); $ch->qos(0,1); // prefetchCount:最多1条unacked消息 $consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true); $num = 3; $callback = function (AMQPEnvelope $envelope, AMQPQueue $queue) use(&$num){ $tag = $envelope->getDeliveryTag(); $msg = $envelope->getBody(); var_dump(implode(',',[$tag, $num, $msg]).PHP_EOL); // 手动ack $queue->ack($tag); sleep(1); if($num <= 0){ // 显式退出callback return false; } $num --; return true; }; // consume try { $data = $q->consume($callback, AMQP_NOPARAM, $consumer_tag); } catch (Exception $exception) { // 显式取消消费 $q->cancel($consumer_tag); }
callback()回调函数中,只处理4条;
尽管queue推送了5条消息,可是consumer只确认了4条,所以队列里只是减小了4条消息
manual ack小结:
timeout实现回调超时,同autoack;
回调函数内能够按逻辑return false,不会丢失消息(消息短暂处于unacked状态后,恢复至ready状态);
qos()等限流函数,有效,能够避免客户端内存溢出
no ack
消息须要显式ack(),可是没有执行ack()
结论:
消息处于unacked状态,consumer channel断开后,消息恢复至ready状态
拉模式:非阻塞消费
auto ack
$conn->setReadTimeout(3); // 拉模式:非阻塞 $ch->qos(0,10); // 自动ack,qos设置无效 $num = 3; for($i=0;$i<$num;$i++) { $envelope = $q->get(AMQP_AUTOACK); var_dump($envelope->getDeliveryTag()."||".$envelope->getBody()); echo "<br>"; }
有数据时,当即返回数据;无数据时,当即返回false
manual ack
$conn->setReadTimeout(3); // 拉模式:非阻塞 $ch->qos(0,10); // 自动ack,qos设置无效 $num = 3; for($i=0;$i<$num;$i++) { $envelope = $q->get(); var_dump($envelope->getDeliveryTag()."||".$envelope->getBody()); echo "<br>"; if($i!=$num-1) { $q->ack($envelope->getDeliveryTag()); } }
拉取了3条消息,只ack了两条
最终,只消费了队列内的两条数据,第3条数据短暂变成unacked状态后,恢复至ready状态
no ack
结论:
未ack的消息,会恢复至ready状态
Refer:
RabbitMQ专栏:https://blog.csdn.net/u013256...
php-amqp测试用例:https://github.com/pdezwart/p...