浪院长 浪尖聊大数据 node
ES也是比较火热,在日志数据分析,规则分析等确实很方便,说实话用es stack 浪尖以为能够解决不少公司的数据分析需求。极客时间下周一要上线新的ES课程,有须要的暂时别购买,到时候还找浪尖返现吧。git
写这篇文章的缘由是前两天星球球友去面试,面试管问了一下,Spark 分析ES的数据,生成的RDD分区数跟什么有关系呢?github
稍微猜想一下就能想到跟分片数有关,可是具体是什么关系呢?面试
可想的具体关系多是如下两种:apache
1).就像KafkaRDD的分区与kafka topic分区数的关系同样,一对一。json
2).ES支持游标查询,那么是否是也能够对比较大的分片进行拆分红多个RDD分区呢?app
那么下面浪尖带着你们翻一下源码看看具体状况。elasticsearch
ES官网直接提供的有elasticsearch-hadoop 插件,对于ES 7.x,hadoop和Spark版本支持以下:ide
hadoop2Version = 2.7.1 hadoop22Version = 2.2.0 spark13Version = 1.6.2 spark20Version = 2.3.0
浪尖这了采用的ES版本是7.1.1,测试用的Spark版本是2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:函数
a,导入整个elasticsearch-hadoop包
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> <version>7.1.1</version> </dependency>
b,只导入spark模块的包
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>7.1.1</version> </dependency>
浪尖这里为了测试方便,只是在本机起了一个单节点的ES实例,简单的测试代码以下:
import org.apache.spark.{SparkConf, SparkContext} import org.elasticsearch.hadoop.cfg.ConfigurationOptions object es2sparkrdd { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName) conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1") conf.set(ConfigurationOptions.ES_PORT, "9200") conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true") conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true") conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false") // conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser) // conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd) conf.set("es.write.rest.error.handlers", "ignoreConflict") conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler") val sc = new SparkContext(conf) import org.elasticsearch.spark._ sc.esRDD("posts").foreach(each=>{ each._2.keys.foreach(println) }) sc.esJsonRDD("posts").foreach(each=>{ println(each._2) }) sc.stop() } }
能够看到Spark Core读取RDD主要有两种形式的API:
a,esRDD。这种返回的是一个tuple2的类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。
RDD[(String, Map[String, AnyRef])]
b,esJsonRDD。这种返回的也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。
RDD[(String, String)]
虽然是两种类型的RDD,可是RDD都是ScalaEsRDD类型。
要分析Spark Core读取ES的并行度,只须要分析ScalaEsRDD的getPartitions函数便可。
首先导入源码https://github.com/elastic/elasticsearch-hadoop这个是gradle工程,能够直接导入idea,而后切换到7.x版本便可。
废话少说直接找到ScalaEsRDD,发现gePartitions是在其父类实现的,方法内容以下:
override def getPartitions: Array[Partition] = { esPartitions.zipWithIndex.map { case(esPartition, idx) => new EsPartition(id, idx, esPartition) }.toArray }
esPartitions是一个lazy型的变量:
@transient private[spark] lazy val esPartitions = { RestService.findPartitions(esCfg, logger) }
这种声明缘由是什么呢?
lazy+transient的缘由你们能够考虑一下。
RestService.findPartitions方法也是仅是建立客户端获取分片等信息,而后调用,分两种状况调用两个方法。
final List<PartitionDefinition> partitions; // 5.x及之后版本 同时没有配置es.input.max.docs.per.partition if (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) { partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log); } else { partitions = findShardPartitions(settings, mapping, nodesMap, shards, log); }
a).findSlicePartitions
这个方法其实就是在5.x及之后的ES版本,同时配置了
es.input.max.docs.per.partition
之后,才会执行,实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,而后计算出拆分的分区数,最后生成分区信息。具体代码以下:
long numDocs; if (readResource.isTyped()) { numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query); } else { numDocs = client.countIndexShard(index, Integer.toString(shardId), query); } int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition); for (int i = 0; i < numPartitions; i++) { PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions); partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations)); }
实际上分片就是用游标的方式,对_doc进行排序,而后按照分片计算获得的分区偏移进行数据的读取,组装过程是SearchRequestBuilder.assemble方法来实现的。
b).findShardPartitions方法
这个方法没啥疑问了就是一个RDD分区对应于ES index的一个分片。
PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId, locationList.toArray(new String[0])); partitions.add(partition);
以上就是Spark Core读取ES数据的时候分片和RDD分区的对应关系分析,默认状况下是一个es 索引分片对应Spark RDD的一个分区。假如分片数过大,且ES版本在5.x及以上,能够配置参数
es.input.max.docs.per.partition
进行拆分。