php的kafka踩坑(二)

接上一篇文章,上次没有解决的一个问题就是在作一个队列的时候,存在多消费者消费到同一个消息的状况,今天终于解决了这个问题,问题的本质是由于运维给我建立的topic是有问题的,他建立的分区数量是0,我今天上容器看了一下,终于发现了,而后删了本身从新建了一个,具体容器操做kafka的topic教程能够看我另外一个文档基于kafka容器操做topicphp

在这里,咱们从头开始介绍一下topic(主题),partition(分区),group(分组),consumer(消费者),producer(生产者)的关系app

  • producer,生产者,生产数据
  • consumer,消费者,消费数据
  • topic,简单点说就是一个队列,生产者生产数据和消费者消费数据都必须指定一个Topic,就是生产的数据要放到哪一个队列去给消费者消费
  • partition和group,一个topic能够配置多个partition,consumer消费数据时是按照group来消费的,kafka确保每一个partition只能由同一个group中的同一个consumer消费,若是想要重复消费,那么须要其余的组来消费,因此同一个group的消费者数量应当小于等于partition数量。Zookeerper中保存这每一个topic下的每一个partition在每一个group中消费的offset。(此段介绍引用这篇文章
    • consumer读取时,会指定读取的group,同一个消息在同一个group下只会读取到一次,若是要重复消费数据,须要新建group
    • 若是group只有一个,而且有多个partition,一个consumer时,全部partition里的消息都会发往该consumer,若是consumer不止一个时,可能会存在有的consumer里面数据消费的多,有的消费的少,作多消费者的队列就是用这个特性,当partition数量=consuerm时,消息能够达到负载均衡。

下面上一下下代码,修改以后topic的partition是3个,依旧是基于 enqueue/rdkafka 这个包负载均衡

生产者,不指定partition,kafka会自动分配运维

$connFactory = new RdKafkaConnectionFactory([
    'global' => [
        'metadata.broker.list' => '127.0.0.1:9092',
        'socket.timeout.ms' => '50'
    ]
]);
$context = $connFactory->createContext();
$topic = $context->createQueue('app');
for ($i = 0; $i <= 5; $i++) {
    $message = $context->createMessage('hello world!' . $i);
    $context->createProducer()->send($topic, $message);
}
复制代码

消费者socket

$config = [
    'global' => [
        'group.id' => date('Ymd'), // 指定一个分区,分区名自定义,作队列分区名必须同样
        'metadata.broker.list' => '127.0.0.1:9092',
        'enable.auto.commit' => 'false',
    ],
    'topic' => [
        'auto.offset.reset' => 'latest',
    ],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$queue = $context->createQueue('app');
$consumer = $context->createConsumer($queue);
while (true) {
    $message = $consumer->receive(30 * 1000);
    if (!is_object($message)) {
        continue;
    }
    var_dump($message->getBody());
    $consumer->acknowledge($message);
    $consumer->reject($message);
}
复制代码

启动三个消费者,运行一次生产者,能够获得如下结果spa

消费者1.net

string(13) "hello world!1"
string(13) "hello world!3"
string(13) "hello world!4"
string(13) "hello world!5"
复制代码

消费者2code

string(13) "hello world!2"
复制代码

消费者3blog

string(13) "hello world!0"
复制代码

到此就实现了kafka作队列的需求了,本文内容就到这里,相关kafka知识能够看这篇文章教程

相关文章
相关标签/搜索