消费者须要本身保留一个offset,从kafka 获取消息时,只拉去当前offset 之后的消息。 kafka offset的管理方式分为两种保存offset和不保存offset,通常保存offset采用的是外部存储保护,这都要根据具体的业务状况来定。使用外部存储保存,咱们可把offset保存到Checkpoint, Hbase, Zookeeper, Kafka,接下来咱们就来offset保存的方式,各类方式使用的场景,关于至少一次语义,最多一次语义以及一次仅一次语义的一些相关概念,以及解决至少一次语义存在的数据重复和之多一次语义存在的数据丢失问题的方法。
Kafka Offset管理-Checkpoint
一、启用Spark Streaming的checkpoint是存储偏移量最简单的方法mysql
二、流式checkpoint专门用于保存应用程序的状态,好比保存在HDFS上,在故障时能恢复sql
三、Spark Streaming的checkpoint没法跨越应用程序进行恢复(这个机器上保存的offset,想在另一台机器上恢复这个offset)服务器
四、Spark升级也将致使没法恢复(升级(API、版本迭代、逻辑修改)后会致使之前的checkpoint没法使用)socket
五、在关键生产应用,不建议使用spark检查点管理offsetspa
receiver会自动将offset维护到zookeeper中。这里的主要讲的是用Direct方式,手动地将offset的值维护到zookeeper中。设计
一、路径 val zkPath = s"{kafkaOffsetRootPath}/{groupName}/{o.partition}/{o.partition}"对象
二、若是Zookeeper中未保存offset,根据kafkaParam的配置使用最新或者最旧的offsetblog
三、若是Zookeeper中有保存offset,咱们会利用这个offset做为kafka的起始位置接口
一、基于Hbase的通用设计,使用同一张表保存能够跨越多个spark Streaming程序的topic的offset事务
二、rowkey = topic名称 + groupID + Streaming的batchtime.milliSeconds.尽管batchtime.milliSeconds不是必须的,可是它能够看到历史的批处理任务对offset的管理状况。
三、kafka的offset保存在下面的表中,列簇为offsets, 30天后自动过时Hbase表结构 create spark_kafka_offsets,{NAME => offset, TTL => 2592000}
四、offset的获取场景
场景1:Streaming做业首次启动。经过zookeeper来查找给定topic中分区的数量,而后返回“0”做为全部topic分区的offset
场景2:长时间运行的Streaming做业已经中止,新的分区被添加到kafka的topic中。经过zookeeper来查找给定topic中分区数量,对于全部旧的topic分区,将offset设置为Hbase中的最新偏移量。对于全部新的topic分区,它将返回“0”做为offset
场景3:长时间运行的Streaming做业已经中止,topic的分区没有任何更改。在这种状况下,Hbase中发现的最新偏移量做为每一个topic分区的offset返回。
Spark Streaming消费数据反写kafka
实现流程:
一、flume将socket流数据采集到kafka
二、Streaming读取kafka的数据进行清洗
三、将清洗后的数据再次写到kafka
一、将kafkaProducer对象广播到全部executor节点,这样就能够在每一个executor节点将数据插入到kafka
二、用partition的方式,一个rdd的partition对应一个kafkaProducer
kafka的保存offset过时问题(也称offset越界问题)
缘由:segment过时致使offset在实际数据的offset以前(segment过时致使数据不存在了,可是在其余地方还存在offset,当再次消费这个数据取出offset的时候就会出现数据找不到问题)
解决方法:实现手动解决offset越界问题,须要把kafkaCluster类的内容拿过来,而且把包访问权限去掉,具体实现查看MyCluster
数据峰值期间如何限速
场景:Streaming宕机一段时间或数据峰值期间都会形成kafka数据积压,若是不对Streaming的批次间隔作限速处理,在批次数据中会拉取不少的数据,这样会影响处理效率。
解决办法:进行限速。限速参数:spark.streaming.kafka.maxRatePartition 每秒每一个分区获取的记录数
kafka的消息传递语义
消息传递语义有:至少一次语义(at-least-once)、最多一次语义(at-most-once)、一次仅一次语义(exactly-once)。其中at-least-once和at-most-once,它们的使用会存在数据重复和数据丢失问题,可能出现这种状况的缘由图解以下:
因为,至少一次语义会致使数据重复,最多一次语义会致使数据的丢失,因此提出了一次仅一次语义,就能够很好地解决了数据重复和数据丢失问题。那么怎么来实现一次仅一次语义?小编总结总结以下:
一、幂等写入:当获取到数据后,先写到MySQL,再保存offset,若是在写到MySQL数据后,在保存offset以前宕机,重启做业后也不会影响一次语义,由于会在MySQL重复更新须要设置好惟一的主键。好比Redis、MySQL,再好比每次往一个目录覆写数据,这样主键不容易获取
注:在软件开发领域,幂等写入即为一样的请求被执行一次与连续执行屡次的效果是同样的,服务器的状态也是同样的,实际上就是接口的可重复调用(包括时间和空间的两个维度)
二、事物控制:保证数据和offset在同一个事务里面,好比用mysql这样须要事务存储的支持。
三、自定义实现:offset和数据绑定保存等
在offset的管理当中,使用checkepoint的方式来管理offset简单易实现,可是若是进行程序迭代后其余应用程序是获取不到的,所以在实际生产环境中若是Streaming流式处理的复杂度不高,处理的数据比较小可使用checkpoint来进行offset的管理。通常状况下,推荐使用zookeeper来进行offset的管理,尽管使用起来比checkpoint复杂,可是这种方式适用于比较复杂的Streaming,当机器升级或者应用程序改变时,其余程序任然能够获取到zookeeper中的offset值。
更新时间
第一次更新时间:2018-12-09 增长了总结