librdkafka是用c语言实现的一个高性能的kafka客户端,由于性能强大,开发者们基于librdkafka开发了各类语言的kafka客户端,好比librdkafkad(c++),, node-rdkafka(Node.js), confulent-kafka-python(Python)等。
librdkafka的高性能主要体如今其多线程的设计以及尽量的下降内存拷贝。node
librdkafka github地址:https://github.com/edenhill/l... ,
其中,C语言API能够参考src/rdkafka.h头文件,简要介绍几个关键的对象python
建立这几个对象所使用的函数:c++
librdkafka支持多种协议以控制kafka服务器的访问权限,如SASL_PALIN, PLAINTEXT, SASL_SSL等,在使用librdkafka时须要经过security.protocol参数指定协议类型,再辅以相应协议所需的其它参数完成权限认证。git
若是使用SASL协议进行权限认证,须要对librdkafka添加SASL库依赖并从新编译。例如:在CentOS下安装以下依赖包:github
yum -y install cyrus-sasl cyrus-sasl-devel
通过从新编译librdkafka后,进入examples目录下,执行bootstrap
./rdkafka_example -X builtin.features
结果为:api
builtin.features = gzip,snappy,ssl,sasl,regex
能够看到librdkafka已经有了sasl特性,后续能够经过sasl协议进行访问认证。安全
初始化producer服务器
int KafkaApi::init_producer(const std::string &brokers, const std::string &username, const std::string &password) { char errstr[512]; /* Kafka configuration */ if (NULL == conf_) { conf_ = rd_kafka_conf_new(); } rd_kafka_conf_set(conf_, "queued.min.messages", "20", NULL, 0); rd_kafka_conf_set(conf_, "bootstrap.servers", brokers.c_str(), errstr, sizeof(errstr)); rd_kafka_conf_set(conf_, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)); rd_kafka_conf_set(conf_, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)); rd_kafka_conf_set(conf_, "sasl.username", username.c_str(), errstr, sizeof(errstr)); rd_kafka_conf_set(conf_, "sasl.password", password.c_str(), errstr, sizeof(errstr)); rd_kafka_conf_set(conf_, "api.version.request", "true", errstr, sizeof(errstr)); rd_kafka_conf_set_dr_msg_cb(conf_, dr_msg_cb_trampoline); /* Create Kafka handle */ if (!(rk_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr)))) { fprintf(stderr, "%% Failed to init producer: %s\n", errstr); exit(1); } return 0; }
初始化过程介绍:多线程
一、首先经过rd_kafka_conf_new()函数建立rd_kafka_conf_t对象
二、设置rd_kafka_conf_t对象,设置kafka客户端参数,示例参数为:
三、调用rd_kafka_new()函数建立rd_kafka_t对象
发送消息
int KafkaApi::send_message(const std::string &topic, const char *data, const int &data_len) { rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk_, topic.c_str(), NULL); if (!rkt) { COMMLIB_LOG_ERR("kafka: create topic failed, err:%s", rd_kafka_err2str(rd_kafka_errno2err(errno))); return rt::KDFKA_PRODUCE_ERR; } int ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, const_cast<char *>(data), data_len, NULL, 0, NULL); if (ret == -1) { COMMLIB_LOG_ERR("kafka: send message failed, err:%s", rd_kafka_err2str(rd_kafka_errno2err(errno))); return rt::KDFKA_PRODUCE_ERR; } COMMLIB_LOG_DEBUG("produce message [%s]", data); rd_kafka_poll(rk_, 0); return rt::SUCCESS; }
发送消息过程介绍:
一、经过rd_kafka_topic_new()方法建立rd_kafka_topic_t对象,注意topic是自动建立的(须要broker端设置可否自动建立topic的参数:auto.create.topics.enable=true), 除此以外,topic可否建立成功还与认证用户的权限有关,若是认证用户在broker端为super.users,则topic可以自动建立成功,不然则会报错: 用户无权限,须要先给用户添加ACL权限才行;最后一点,对于已经存在的topic, rd_kafka_topic_new()方法仍然返回的是旧的对象
二、发送消息经过调用rd_kafka_produce()函数完成,该函数的参数为:
三、调用rd_kafka_poll()函数,使得消息发送的回调函数可以触发, 该函数第一个参数为rd_kafka_t对象,第二个参数为timeout_ms,设置为0表示为非阻塞
注意事项
在使用librdkafka带鉴权认证访问kafka服务器的过程当中,解决消息发送失败问题的关键点有: