KafkaBridge 封装了对Kafka集群的读写操做,接口极少,简单易用,稳定可靠,支持c++/c、php、python、golang等多种语言,并特别针对php-fpm场景中做了长链接复用的优化,已在360公司内部普遍使用。php
前言python
众所周知,Kafka是近几年来大数据领域最流行的分布式流处理平台。它最初由LinkedIn公司开发, 已于2010年贡献给了Apache基金会并成为顶级开源项目, 本质上是一种低延时的、可扩展的、设计内在就是分布式的,分区的和可复制的消息系统;c++
Kafka在360公司内部也有至关普遍的使用,业务覆盖搜索,商业广告,IOT, 视频,安全, 游戏等几乎全部核心业务,天天的写入流量近1.2PB,读取流量近2.4PB;git
Kafka官方提供了Java版本的客户端SDK, 但因360公司内部产品线众多,语言几乎囊括目前全部主流语言,因此咱们研发了Kafka客户端SDK —— KafkaBridge;github
简介golang
KafkaBridge 底层基于 librdkafka, 与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的Kafka系统细节,只需调用极少许的接口,就可完成消息的生产和消费;安全
针对使用者比较关心的消息生产的可靠性,做了近一步的提高;异步
开源地址:分布式
https://github.com/Qihoo360/kafkabridgeide
特色
支持多种语言:c++/c、php、python、golang, 且各语言接口彻底统一;
接口少,简单易用;
针对高级用户,支持经过配置文件调整全部的librdkafka的配置;
在非按key写入数据的状况下,尽最大努力将消息成功写入;
支持同步和异步两种数据写入方式;
在消费时,除默认自动提交offset外,容许用户经过配置手动提交offset;
在php-fpm场景中,复用长链接生产消息,避免频繁建立断开链接的开销;
编译
编译依赖于librdkafka, liblog4cplus, boost
(仅依赖于若干个头文件);
对于C++/C使用 CMake 编译;
对于Python, Php, Golang使用 swig 编译;
每种语言都提供了自动编译脚本,方便使用者自行编译。
使用
数据写入
在非按key写入的状况下,sdk尽最大努力提交每一条消息,只要Kafka集群存有一台broker正常,就会重试发送;
每次写入数据只须要调用produce接口,在异步发送的场景下,经过返回值能够判断发送队列是否填满,发送队列可经过配置文件调整;
在同步发送的场景中,produce接口返回当前消息是否写入成功,可是写入性能会有所降低,CPU使用率会有所上升,推荐仍是使用异步写入方式;
咱们来简单看一下写入kafka所涉及到的全部接口:
//初始化接口
bool QbusProducer::init(const string& broker_list, const string& log_path, const string& config_path, const string& topic)
//写入数据接口
bool QbusProducer::produce(const char* data, size_t data_len, const std::string& key)
//再也不须要写入数据时,须要调用的清理接口,必须调用
void QbusProducer::uninit()
具体使用能够参考源码中的实例;
数据消费
消费只需调用subscribeOne订阅topic(也支持同时订阅多个topic),而后执行start就开始消费,当前进程非阻塞,每条消息经过callback接口回调给使用者;
sdk还支持用户手动提交offset方式,用户能够经过callback中返回的消息体,在代码其余逻辑中进行提交。
下面是消费接口,以c++为例:
//初始化接口
bool QbusConsumer::init(const string& string broker_list, const string& string log_path, const string& string config_path, QbusConsumerCallback& callback)
//订阅须要消费的消息
bool QbusConsumer::subscribeOne(const string& string group, const string& string topic)
//开始消费
bool QbusConsumer::start()
//中止消费
void QbusConsumer::stop()
性能测试
kafka 集群三台broker, 除测试用topic外,无其余topic的读写操做;
测试用topic有3个partition;
Producer单实例,单线程;
Topic无复本下测试:
单条消息 100 byte, 发送 1百万 条消息,耗时 1.7 秒;
单条消息 1024 byte, 发送 1百万 条消息,耗时 13 秒;
Topic有2复本下测试:
单条消息 100 byte, 发送 1百万 条消息,耗时 1.7 秒;
单条消息 1024 byte, 发送 1百万 条消息,耗时 14 秒;
写在最后
KafkaBridge 一直在360公司内部使用,如今已经开源,有疏漏之处,欢迎广大使用者批评指正,也欢迎更多的使用者加入到 KafkaBridge 的持续改进中。
开源地址:KafkaBridge:
https://github.com/Qihoo360/kafkabridge