最近项目上有一个须要用到消息队列的功能,从网上找了一些php相关的kafka使用的教程和博客,大抵都是安装php的拓展librdkafka(这里就不讲这个拓展的安装方法了,搜一下仍是有不少教程的),而后直接用这个拓展进行开发,可是我直接用这个拓展开发的时候,不知道为啥运行不起来,一直报错(应该是我太菜了,哈哈哈哈哈哈)......我从github上找了一些相关的包想直接用一下,可是发现不少包都是几年前的了,基本上都是kafka 0.x 版本的。php
我看了一下最多star的一个包,连接是:github.com/weiboad/kaf…git
后面我用了enqueue/rdkafka 这个包,连接:github.com/php-enqueue…github
下面展现一下我测试的代码,只是能运行起来,可是还没达到我想要的预期app
生产者composer
$connFactory = new RdKafkaConnectionFactory([
'global' => [
'metadata.broker.list' => '127.0.0.1:9092',
'socket.timeout.ms' => '50'
]
]);
$context = $connFactory->createContext();
$message = $context->createMessage('hello world!');
$topic = $context->createTopic('app');
$context->createProducer()->send($topic, $message);
复制代码
消费者socket
$config = [
'global' => [
'group.id' => uniqid('', true),
'metadata.broker.list' => '127.0.0.1:9092',
'enable.auto.commit' => 'false',
],
'topic' => [
// 设置从最后一个offset开始读取消息,不会读取到以前的消息
'auto.offset.reset' => 'latest',
],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$topic = $context->createTopic('app');
$consumer = $context->createConsumer($topic);
while (true) {
$message = $consumer->receive(30 * 1000);
if (!$message instanceof RdKafkaMessage && !$message instanceof Message) {
var_dump($message);
continue;
}
$consumer->acknowledge($message);
var_dump($message->getBody());
}
复制代码
这样是能运行起来的,能正常发送和接受数据,我实际状况是想作一个队列,生产者有多个,往一个topic进行数据生产,而后有多个消费者在消费,可是这样写有个问题,我在启动了多个消费者的时候,每一个消费者都会接受到生产者发送过来的消息,更像是群体发布群体订阅的形式,不是我想要的结果,我去网上找了其余的教程,有人说只要设置group_id不一样就能够了,但个人group_id所有都是随机的,不太可能同样,按理来讲是能实现的,可是就是不行,也试过用Queue去操做,可是仍是会全部消费者都收到消息。测试
目前就只了解到了这个地方,还须要花点时间再看看,若是有大神看到的话,求指点一下!!spa