本套技术专栏是做者(秦凯新)平时工做的总结和升华,并深度整理大量网上资源和专业书籍。经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,若有任何学术交流,可随时联系。java
这里的偏移量是指 kafka consumer offset,在 Kafka 0.9 版本以前消费者偏移量默认被保存在 zookeeper 中(/consumers/<group.id>/offsets//),所以在初始化消费者的时候须要指定 zookeeper.hosts。apache
随着 Kafka consumer 在实际场景的不断应用,社区发现旧版本 consumer 把位移提交到 ZooKeeper 的作法并不合适。ZooKeeper 本质上只是一个协调服务组件,它并不适合做为位移信息的存储组件,毕竟频繁高并发的读/写操做并非 ZooKeeper 擅长的事情。所以在 0.9 版本开始 consumer 将位移提交到 Kafka 的一个内部 topic(__consumer_offsets)中,该主题默认有 50 个分区,每一个分区 3 个副本。bootstrap
默认状况下,consumer 是自动提交位移的,自动提交间隔是 5 秒,能够经过设置 auto.commit.interval.ms 参数能够控制自动提交的间隔。并发
自动位移提交的优点是下降了用户的开发成本使得用户没必要亲自处理位移提交;劣势是用户不能细粒度地处理位移的提交,特别是在有较强的精确一次处理语义时(在这种状况下,用户可使用手动位移提交)。app
手动位移提交就是用户自行肯定消息什么时候被真正处理完并能够提交位移,用户能够确保只有消息被真正处理完成后再提交位移。若是使用自动位移提交则没法保证这种时序性,所以在这种状况下必须使用手动提交位移。异步
设置使用手动提交位移很是简单,仅仅须要在构建 KafkaConsumer 时设置 enable.auto.commit=false,而后调用 commitSync 或 commitAsync 方法便可。高并发
对于 auto.offset.reset 我的推荐设置为 earliest,初次运行的时候,因为 __consumer_offsets 没有相关偏移量信息,所以消息会从最开始的地方读取;当第二次运行时,因为 __consumer_offsets 已经存在消费的 offset 信息,所以会根据 __consumer_offsets 中记录的偏移信息继续读取数据。工具
此外,对于使用 zookeeper 管理偏移量而言,只须要删除对应的节点,数据便可从头读取,也是很是方便。不过若是你但愿从最新的地方读取数据,不须要读取旧消息,则能够设置为 latest。oop
earilist:提交过度区,从Offset处读取,若是没有提交过offset,从头读取
latest:提交过度区,从Offset处读取,没有从最新的数据开始读取
None:若是没有提交offset,就会报错,提交过offset,就从offset处读取
复制代码
基于正则订阅主题,有如下好处:fetch
无需罗列主题名,一两个主题还好,若是有几十个,罗列过于麻烦了;
可实现动态订阅的效果(新增的符合正则的主题也会被读取)。
stream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset))
复制代码
LocationStrategies 分配分区策略,LocationStrategies:根据给定的主题和集群地址建立consumer
建立DStream,返回接收到的输入数据
LocationStrategies.PreferConsistent:持续的在全部Executor之间匀分配分区 (均匀分配,选中的每个Executor都会分配 partition)
LocationStrategies.PreferBrokers: 若是executor和kafka brokers 在同一台机器上,选择该executor。
LocationStrategies.PreferFixed: 若是机器不是均匀的状况下,能够指定特殊的hosts。固然若是不指定,采用 LocationStrategies.PreferConsistent模式
复制代码
SparkStreaming 序列化问题
在 driver 中使用到的变量或者对象无需序列化,传递到 exector 中的变量或者对象须要序列化。所以推荐的作法是,在 exector 中最好只处理数据的转换,在 driver 中对处理的结果进行存储等操做。
stream.foreachRDD(rdd => {
// driver 代码运行区域
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
kafkaOffset.updateOffset(offsetRanges)
// exector 代码运行区域
val resultRDD = rdd.map(xxxxxxxx)
//endregion
//对结果进行存储
resultRDD.saveToES(xxxxxx)
kafkaOffset.commitOffset(offsetRanges)
})
复制代码
Zookeeper 偏移量管理ZkKafkaOffset实现,借助 zookeeper 管理工具能够对任何一个节点的信息进行修改、删除,若是但愿从最开始读取消息,则只须要删除 zk 某个节点的数据便可。
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.OffsetRange
import scala.collection.JavaConverters._
class ZkKafkaOffset(getClient: () => ZkClient, getZkRoot : () => String) {
// 定义为 lazy 实现了懒汉式的单例模式,解决了序列化问题,方便使用 broadcast
lazy val zkClient: ZkClient = getClient()
lazy val zkRoot: String = getZkRoot()
// offsetId = md5(groupId+join(topics))
// 初始化偏移量的 zk 存储路径 zkRoot
def initOffset(offsetId: String) : Unit = {
if(!zkClient.exists(zkRoot)){
zkClient.createPersistent(zkRoot, true)
}
}
// 从 zkRoot 读取偏移量信息
def getOffset(): Map[TopicPartition, Long] = {
val keys = zkClient.getChildren(zkRoot)
var initOffsetMap: Map[TopicPartition, Long] = Map()
if(!keys.isEmpty){
for (k:String <- keys.asScala) {
val ks = k.split("!")
val value:Long = zkClient.readData(zkRoot + "/" + k)
initOffsetMap += (new TopicPartition(ks(0), Integer.parseInt(ks(1))) -> value)
}
}
initOffsetMap
}
// 根据单条消息,更新偏移量信息
def updateOffset(consumeRecord: ConsumerRecord[String, String]): Boolean = {
val path = zkRoot + "/" + consumeRecord.topic + "!" + consumeRecord.partition
zkClient.writeData(path, consumeRecord.offset())
true
}
// 消费消息前,批量更新偏移量信息
def updateOffset(offsetRanges: Array[OffsetRange]): Boolean = {
for (offset: OffsetRange <- offsetRanges) {
val path = zkRoot + "/" + offset.topic + "!" + offset.partition
if(!zkClient.exists(path)){
zkClient.createPersistent(path, offset.fromOffset)
}
else{
zkClient.writeData(path, offset.fromOffset)
}
}
true
}
// 消费消息后,批量提交偏移量信息
def commitOffset(offsetRanges: Array[OffsetRange]): Boolean = {
for (offset: OffsetRange <- offsetRanges) {
val path = zkRoot + "/" + offset.topic + "!" + offset.partition
if(!zkClient.exists(path)){
zkClient.createPersistent(path, offset.untilOffset)
}
else{
zkClient.writeData(path, offset.untilOffset)
}
}
true
}
def finalize(): Unit = {
zkClient.close()
}
}
object ZkKafkaOffset{
def apply(cong: SparkConf, offsetId: String): ZkKafkaOffset = {
val getClient = () =>{
val zkHost = cong.get("kafka.zk.hosts", "127.0.0.1:2181")
new ZkClient(zkHost, 30000)
}
val getZkRoot = () =>{
val zkRoot = "/kafka/ss/offset/" + offsetId
zkRoot
}
new ZkKafkaOffset(getClient, getZkRoot)
}
}
复制代码
Spark Streaming 消费 Kafka 消息
第一步:val customOffset: Map[TopicPartition, Long] = kafkaOffset.getOffset(ssc)
第二步:stream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaConf, customOffset))
第三步:处理后,kafkaOffset.commitOffset(offsetRanges)
import scala.collection.JavaConverters._
object RtDataLoader {
def main(args: Array[String]): Unit = {
// 从配置文件读取 kafka 配置信息
val props = new Props("xxx.properties")
val groupId = props.getStr("groupId", "")
if(StrUtil.isBlank(groupId)){
StaticLog.error("groupId is empty")
return
}
val kfkServers = props.getStr("kfk_servers")
if(StrUtil.isBlank(kfkServers)){
StaticLog.error("bootstrap.servers is empty")
return
}
val topicStr = props.getStr("topics")
if(StrUtil.isBlank(kfkServers)){
StaticLog.error("topics is empty")
return
}
// KAFKA 配置设定
val topics = topicStr.split(",")
val kafkaConf = Map[String, Object](
"bootstrap.servers" -> kfkServers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"receive.buffer.bytes" -> (102400: java.lang.Integer),
"max.partition.fetch.bytes" -> (5252880: java.lang.Integer),
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val conf = new SparkConf().setAppName("ss-kafka").setIfMissing("spark.master", "local[2]")
// streaming 相关配置
conf.set("spark.streaming.stopGracefullyOnShutdown","true")
conf.set("spark.streaming.backpressure.enabled","true")
conf.set("spark.streaming.backpressure.initialRate","1000")
// 设置 zookeeper 链接信息
conf.set("kafka.zk.hosts", props.getStr("zk_hosts", "sky-01:2181"))
// 建立 StreamingContext
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
// 根据 groupId 和 topics 获取 offset
val offsetId = SecureUtil.md5(groupId + topics.mkString(","))
val kafkaOffset = ZkKafkaOffset(ssc.sparkContext.getConf, offsetId)
kafkaOffset.initOffset(ssc, offsetId)
val customOffset: Map[TopicPartition, Long] = kafkaOffset.getOffset(ssc)
// 建立数据流
var stream:InputDStream[ConsumerRecord[String, String]] = null
if(topicStr.contains("*")) {
StaticLog.warn("使用正则匹配读取 kafka 主题:" + topicStr)
stream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset))
}
else {
StaticLog.warn("待读取的 kafka 主题:" + topicStr)
stream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaConf, customOffset))
}
// 消费数据
stream.foreachRDD(rdd => {
// 消息消费前,更新 offset 信息
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
kafkaOffset.updateOffset(offsetRanges)
//region 处理详情数据
StaticLog.info("开始处理 RDD 数据!")
//endregion
// 消息消费结束,提交 offset 信息
kafkaOffset.commitOffset(offsetRanges)
})
ssc.start()
ssc.awaitTermination()
}
}
复制代码
setStartFromGroupOffsets()【默认消费策略】 默认读取上次保存的offset信息 若是是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据
setStartFromEarliest() 从最先的数据开始进行消费,忽略存储的offset信息
setStartFromLatest() 从最新的数据进行消费,忽略存储的offset信息
setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>) 从指定位置进行消费。
当checkpoint机制开启的时候,KafkaConsumer会按期把kafka的offset信息还有其余operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,从新消费kafka中的数据。
为了可以使用支持容错的kafka Consumer,须要开启checkpoint env.enableCheckpointing(5000); // 每5s checkpoint一次
Kafka Consumers Offset 自动提交有如下两种方法来设置,能够根据job是否开启checkpoint来区分:
(1) Flink Checkpoint关闭时: 能够经过Kafka下面两个Properties参数配置
enable.auto.commit
auto.commit.interval.ms
复制代码
(2) Checkpoint开启时:当执行checkpoint的时候才会保存offset,这样保证了kafka的offset和checkpoint的状态偏移量保持一致。能够经过这个参数设置
setCommitOffsetsOnCheckpoints(boolean)
复制代码
这个参数默认就是true。表示在checkpoint的时候提交offset, 此时,kafka中的自动提交机制就会被忽略。
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//checkpoint配置
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//设置statebackend
env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
String topic = "kafkaConsumer";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","SparkMaster:9092");
prop.setProperty("group.id","kafkaConsumerGroup");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);
myConsumer.setStartFromGroupOffsets();//默认消费策略
myConsumer.setCommitOffsetsOnCheckpoints(true);
DataStreamSource<String> text = env.addSource(myConsumer);
text.print().setParallelism(1);
env.execute("StreamingFromCollection");
复制代码
Flink KafkaConsumer容许配置向 Kafka brokers(或者向Zookeeper)提交offset的行为。须要注意的是,Flink Kafka Consumer并不依赖于这些提交回Kafka或Zookeeper的offset来保证容错。这些被提交的offset只是意味着Flink将消费的状态暴露在外以便于监控。
FlinkKafkaConsumer提供了一套健壮的机制保证了在高吞吐量的状况下exactly-once的消费Kafka的数据,它的API的使用与配置也比较简单,同时也便于监控。
barrier能够理解为checkpoint之间的分隔符,在它以前的data属于前一个checkpoint,而在它以后的data属于另外一个checkpoint。同时,barrier会由source(如FlinkKafkaConsumer)发起,并混在数据中,同数据同样传输给下一级的operator,直到sink为止。若是barrier已经被sink收到,那么说明checkpoint已经完成了(这个checkpoint的状态为completed并被存到了state backend中),它以前的数据已经被处理完毕并sink。
Flink异步记录checkpoint的行为是由咱们的来配置的,只有当咱们设置了enableCheckpointing()时,Flink才会在checkpoint完成时(整个job的全部的operator都收到了这个checkpoint的barrier才意味这checkpoint完成,具体参考咱们对Flink checkpoint的介绍)将offset记录起来并提交,这时候才可以保证exactly-once。
Kafka Producer的容错-Kafka 0.9 and 0.10
若是Flink开启了checkpoint,针对FlinkKafkaProducer09和FlinkKafkaProducer010 能够提供 at-least-once的语义,还须要配置下面两个参数:
setLogFailuresOnly(false)
setFlushOnCheckpoint(true)
注意:建议修改kafka 生产者的重试次数retries【这个参数的值默认是0】
复制代码
Kafka Producer的容错-Kafka 0.11,若是Flink开启了checkpoint,针对FlinkKafkaProducer011 就能够提供 exactly-once的语义,可是须要选择具体的语义
具体的语义设置方式
Semantic.NONE
Semantic.AT_LEAST_ONCE【默认】
Semantic.EXACTLY_ONCE
checkpoint配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//第一种解决方案,设置FlinkKafkaProducer011里面的事务超时时间
//设置事务超时时间
//prop.setProperty("transaction.timeout.ms",60000*15+"");
//第二种解决方案,设置kafka的最大事务超时时间,主要是kafka的配置文件设置。
//FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());
//使用仅一次语义的kafkaProducer
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
text.addSink(myProducer);
复制代码
本套技术专栏是做者(秦凯新)平时工做的总结和升华,并深度整理大量网上资源和专业书籍。经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,若有任何学术交流,可随时联系。
本文在这里一直困扰我如何Kafka 偏移量管理实现精确一次语义,若是你有幸阅读到本篇内容,且有最好的解决方案,望可以给我留言,期待更好的解决方案。
秦凯新 于浪潮