本文来自OPPO互联网技术团队,是《剖析Spark数据分区》系列文章的第三篇,本篇咱们将分析Spark streaming,TiSpark中的数据分区。segmentfault
系列一:剖析Spark数据分区之Hadoop分片缓存
系列二:剖析Spark数据分区之Spark RDD分区数据结构
系列三:剖析Spark数据分区之Spark streaming & TiSpark分区架构
Spark Streaming从Kafka接收数据,转换为Spark Streaming中的数据结构DStream即离散数据流。并发
数据接收方式有两种:负载均衡
当前spark已经不支持该模式。分布式
receiver模式的并行度由spark.streaming.blockInterval决定,默认是200ms。oop
receiver模式接收block.batch数据后会封装到RDD中,这里的block对应RDD中的partition。源码分析
batchInterval必定的状况下:性能
Spark会建立跟Kafka partition同样多的RDD partition,而且会并行从Kafka中读取数据。因此在Kafka partition和RDD partition之间,有一个一对一的映射关系。
DirectKafkaInputDStream按期生成的RDD的类型是KafkaRDD。
咱们首先看看 KafkaRDD是如何划分分区的:
它会根据从初始化时接收的offset信息参数,生成KafkaRDDPartition分区;每一个分区对应着Kafka的一个topic partition 的一段数据,这段数据的信息OffsetRange表示, 它保存了数据的位置。
下面咱们详细分析DirectKafkaInputDStream的compute方法:
经过源码分析可知:Partition的计算方法是为topic的每个partition建立一个OffsetRange,全部的OffsetRange生成一个KafkaRDD。
下面咱们分析KafkaRDD的getPartitions方法:
每一个OffsetRange生成一个Partition。
如何增大RDD的分区数,让每一个partition处理的数据量增大?
经过源码分析,可经过调小Kafka消息中Topic的分区数目;想要增长RDD的并行度,可经过调大Kafka消息中Topic的分区数目。
TiDB集群主要分为3个组件:
TiDB Server负责接收SQL请求,处理SQL相关的逻辑,并经过PD找到存储计算所须要的TiKV地址,与TiKV交互获取数据,最终返回结构;
TiDB Server并不存储数据,只负责计算,能够无限水平扩展,能够经过负载均衡组件如LVS, HAProxy,F5等对外提供的接入地址。
TiKV负责数据存储,从外部看是一个分布式的提供事物的Key-Value存储引擎。
存储数据的基本单位是Region,每一个Region负责存储一个Key Range (从StartKey到EndKey的左闭右开区间) 区间的数据,每一个TiKV节点会负责多个Region,数据在多个TiKV之间的负载均衡由PD调度,也是以Region为单位进行调度。
TiDB 的数据分布是以 Region 为单位的。一个 Region 包含了一个范围内的数据,一般是 96MB 的大小,Region 的 meta 信息包含了 StartKey 和 EndKey 这两个属性。
当某个 key >= StartKey && key < EndKey 的时候:咱们就知道了这个 key 所在的 Region,而后咱们就能够经过查找该 Region 所在的 TiKV 地址,去这个地址读取这个 key 的数据。
获取 key 所在的 Region, 是经过向 PD 发送请求完成的。
GetRegion(ctx context.Context, key []byte) (metapb.Region, metapb.Peer, error)
经过调用这个接口,咱们就能够定位这个 key 所在的 Region 了。
若是须要获取一个范围内的多个 Region:咱们会从这个范围的 StartKey 开始,屡次调用 GetRegion 这个接口,每次返回的 Region 的 EndKey 作为下次请求的 StartKey,直到返回的 Region 的 EndKey 大于请求范围的 EndKey。
以上执行过程有一个很明显的问题:就是咱们每次读取数据的时候,都须要先去访问 PD,这样会给 PD 带来巨大压力,同时影响请求的性能。
为了解决这个问题:tikv-client 实现了一个 RegionCache 的组件,缓存 Region 信息。
当须要定位 key 所在的 Region 的时候:若是 RegionCache 命中,就不须要访问 PD 了。
RegionCache 内部有两种数据结构保存 Region信息:
严格来讲:PD 上保存的 Region 信息,也是一层 cache;真正最新的 Region 信息是存储在 tikv-server 上的,每一个 tikv-server 会本身决定何时进行 Region 分裂。
在 Region 变化的时候,把信息上报给 PD,PD 用上报上来的 Region 信息,知足 tidb-server 的查询需求。
当咱们从 cache 获取了 Region 信息,并发送请求之后, tikv-server 会对 Region 信息进行校验,确保请求的 Region 信息是正确的。
若是由于 Region 分裂,Region 迁移致使了 Region 信息变化。请求的 Region 信息就会过时,这时 tikv-server 就会返回 Region 错误。
遇到了 Region 错误,咱们就须要清理 RegionCache,从新获取最新的 Region 信息,并从新发送请求。
TiSpark深度整合了Spark Catalyst引擎,使得Spark可以高效的读取TiKV中存储的数据进行分布式计算。
下面分析TiRDD中的getPartitions方法:
经过源码分析:首先经过splitRangeByRegion获取keyWithRegionTasks, 对于每个RegionTask建立一个TiPartition。
可见TiSpark的Partition分区数目与TiKV的region数目一致,若是想要提升TiSpark任务的并行度,可修改以下两个参数值:
经过以上种种状况的分析,只要咱们能正确的认识在各类场景下分区与task的关系,进而加以实际的对影响分区的参数调优,也可让数据量大的任务也能快起来,同时能清楚的解答数据分析师的问题。