虽然 Kafka 是用 Java/Scala 语言编写的,但这不妨碍它对多语言的支持。能够在 Kafka 官网的 CLIENTS 查看 Kafka 支持的语言,其中包括 C/C++、Python、Go 等语言。php
PHP 操做 Kafka 须要安装 librdkafka 库和 kafka 的 PHP 扩展。
1.安装 librdkafka 库git
git clone https://github.com/edenhill/librdkafka.git ./configure make sudo make install
2.安装 php-kafka 扩展github
$ git clone https://github.com/arnaud-lb/php-rdkafka.git $ cd librdkafka/ $ phpize $ ./configure $ make $ sudo make install #在php.ini 文件中配置 rdkafka扩展 extension=rdkafka.so #查看扩展是否生效 php -m | grep kafka
demo 来源于 https://github.com/arnaud-lb/php-rdkafka#examplesapache
正常的生产逻辑以下:
1.配置生产者客户端参数及建立相应的生产者实例;bootstrap
/** * Create a producer */ $conf = new RdKafka\Conf(); $conf->set('log_level', LOG_DEBUG); //$conf->set('debug', 'all'); $rk = new RdKafka\Producer($conf); $rk->addBrokers("127.0.0.1");
2.构建主题;debug
/** * Create a topic instance from the producer */ $topic = $rk->newTopic("test");
3.发送消息;code
/** * Producing messages * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition. * 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,而且由 librdkafka 选择分区。 * The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue. * 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。 * The message payload can be anything. * 消息能够是任何内容。 */ $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
4.关闭生产者实例。orm
/** * Proper shutdown * This should be done prior to destroying a producer instance * to make sure all queued and in-flight produce requests are completed before terminating. * 关闭生产者实例前需确保全部在队列中和正在生产的生产请求都已完成。 * Not calling flush can lead to message loss! * 不调用flush会致使消息丢失! */ $timeout_ms = 60000; // 1 minute $rk->flush($timeout_ms);
检验消息是否发送成功
终端开启一个消费者:server
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
在另外一个窗口执行 php producer.php,
blog
可看到消费者终端接收到消息。
完整代码以下:
<?php /** * Created by PhpStorm. * User: liulu * Date: 2020/1/1 * Time: 18:38 */ /** * Create a producer */ $conf = new RdKafka\Conf(); $conf->set('log_level', LOG_DEBUG); //$conf->set('debug', 'all'); $rk = new RdKafka\Producer($conf); $rk->addBrokers("127.0.0.1"); /** * Create a topic instance from the producer */ $topic = $rk->newTopic("test"); /** * Producing messages * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition. * 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,而且由 librdkafka 选择分区。 * The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue. * 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。 * The message payload can be anything. * 消息能够是任何内容。 */ $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload"); /** * Proper shutdown * This should be done prior to destroying a producer instance * to make sure all queued and in-flight produce requests are completed before terminating. * 关闭生产者实例前需确保全部在队列中和正在生产的生产请求都已完成。 * Not calling flush can lead to message loss! * 不调用flush会致使消息丢失! */ $timeout_ms = 60000; // 1 minute $rk->flush($timeout_ms); echo 'finished';exit;