接上一篇文章,上次没有解决的一个问题就是在作一个队列的时候,存在多消费者消费到同一个消息的状况,今天终于解决了这个问题,问题的本质是由于运维给我建立的topic是有问题的,他建立的分区数量是0,我今天上容器看了一下,终于发现了,而后删了本身从新建了一个,具体容器操做kafka的topic教程能够看我另外一个文档基于kafka容器操做topic。php
在这里,咱们从头开始介绍一下topic(主题),partition(分区),group(分组),consumer(消费者),producer(生产者)的关系app
下面上一下下代码,修改以后topic的partition是3个,依旧是基于 enqueue/rdkafka 这个包负载均衡
生产者,不指定partition,kafka会自动分配运维
$connFactory = new RdKafkaConnectionFactory([
'global' => [
'metadata.broker.list' => '127.0.0.1:9092',
'socket.timeout.ms' => '50'
]
]);
$context = $connFactory->createContext();
$topic = $context->createQueue('app');
for ($i = 0; $i <= 5; $i++) {
$message = $context->createMessage('hello world!' . $i);
$context->createProducer()->send($topic, $message);
}
复制代码
消费者socket
$config = [
'global' => [
'group.id' => date('Ymd'), // 指定一个分区,分区名自定义,作队列分区名必须同样
'metadata.broker.list' => '127.0.0.1:9092',
'enable.auto.commit' => 'false',
],
'topic' => [
'auto.offset.reset' => 'latest',
],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$queue = $context->createQueue('app');
$consumer = $context->createConsumer($queue);
while (true) {
$message = $consumer->receive(30 * 1000);
if (!is_object($message)) {
continue;
}
var_dump($message->getBody());
$consumer->acknowledge($message);
$consumer->reject($message);
}
复制代码
启动三个消费者,运行一次生产者,能够获得如下结果spa
消费者1.net
string(13) "hello world!1"
string(13) "hello world!3"
string(13) "hello world!4"
string(13) "hello world!5"
复制代码
消费者2code
string(13) "hello world!2"
复制代码
消费者3blog
string(13) "hello world!0"
复制代码
到此就实现了kafka作队列的需求了,本文内容就到这里,相关kafka知识能够看这篇文章。教程