本文将详细介绍利用 ES-Hadoop 将 Spark 处理的数据写入到 ES 中。html
<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<!-- spark 基础依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- spark-streaming 相关依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- spark-streaming-kafka 相关依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- zookeeper 相关依赖 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5-cdh6.0.1</version>
</dependency>
<!-- Spark-ES 相关依赖 -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.5.4</version>
</dependency>
<!-- Spark-ES 依赖的 HTTP 传输组件 -->
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
复制代码
若是使用 CDH 版本的 Spark,则在调试及实际部署运行的时候会出现下面的错误:java
java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol
复制代码
很显然是缺乏 httpclient 相关依赖形成的,对比开源版本与 CDH 版本的 Spark,发现开源版本多出了 commons-httpclient-3.1.jar
,所以上述 Maven 的 pom 文件添加上对其依赖便可。node
ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件能够将 Hadoop 生态的数据写入到 ES 中,而后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步能够经过 Kibana 来实现数据的可视化。apache
同时,也能够借助 ES 做为数据存储层(相似数仓的 Stage 层或者 ODS 层),而后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。编程
使用 ES 作为原始数据的存储层,能够很好的进行数据去重、数据质量分析,还能够提供一些即时的数据服务,例如趋势展现、汇总分析等。json
ES-Hadoop 是一个整合性质的组件,它封装了 Hadoop 生态的多种组件与 ES 交互的 API,若是你只须要部分功能,可使用细分的组件:bash
es-hadoop 核心是经过 es 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明:服务器
es.nodes
:须要链接的 es 节点(不须要配置所有节点,默认会自动发现其余可用节点);es.port
:节点 http 通信端口;es.nodes.discovery
:默认为 true,表示自动发现集群可用节点;es.nodes.wan.only
:默认为 false,设置为 true 以后,会关闭节点的自动 discovery,只使用 es.nodes
声明的节点进行数据读写操做;若是你须要经过域名进行数据访问,则设置该选项为 true,不然请务必设置为 false;es.index.auto.create
:是否自动建立不存在的索引,默认为 true;es.net.http.auth.user
:Basic 认证的用户名;es.net.http.auth.pass
:Basic 认证的密码。val conf = new SparkConf().setIfMissing("spark.app.name","rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")
复制代码
特别须要注意的配置项为 es.nodes.wan.only
,因为在云服务器环境中,配置文件使用的通常为内网地址,而本地调试的时候通常使用外网地址,这样将 es.nodes
配置为外网地址后,最后会出现节点找不到的问题(因为会使用节点配置的内网地址去进行链接):restful
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available;
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]
复制代码
此时将 es.nodes.wan.only
设置为 true 便可。推荐开发测试时使用域名,集群部署的时候将该选项置为 false。app
若是数据存在重复,写入 ES 时每每会出现数据写入冲突的错误,此时有两种解决方法。
方法一:设置 es.write.operation
为 upsert,这样达到的效果为若是存在则更新,不存在则进行插入,该配置项默认值为 index。
方法二:自定义冲突处理类,相似上述配置中设置了自定义的 error.handlers
,经过自定义类来处理相关错误,例如忽略冲突等:
public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
if (entry.getResponseCode() == 409) {
StaticLog.warn("Encountered conflict response. Ignoring old data.");
return HandlerResult.HANDLED;
}
return collector.pass("Not a conflict response code.");
}
}
复制代码
方法二能够屏蔽写入版本比预期的小之类的版本冲突问题。
EsSpark 提供了两种主要方法来实现数据写入:
saveToEs
:RDD 内容为 Seq[Map]
,即一个 Map 对象集合,每一个 Map 对应一个文档;saveJsonToEs
:RDD 内容为 Seq[String]
,即一个 String 集合,每一个 String 是一个 JSON 字符串,表明一条记录(对应 ES 的 _source)。数据写入能够指定不少配置信息,例如:
es.resource
:设置写入的索引和类型,索引和类型名均支持动态变量;es.mapping.id
:设置文档 _id 对应的字段名;es.mapping.exclude
:设置写入时忽略的字段,支持通配符。val itemRdd = rdd.flatMap(line => {
val topic = line.topic()
println("正在处理:" + topic + " - " + line.partition() + " : " + line.offset())
val jsonArray = JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScala
val resultMap = jsonArray.map(jsonObj =>{
var tmpId = "xxx"
var tmpIndex = "xxxxxx"
jsonObj.put("myTmpId", tmpId)
jsonObj.put("myTmpIndex", tmpIndex)
jsonObj.getInnerMap
})
resultMap
})
val mapConf = Map(
("es.resource" , "{myTmpIndex}/doc"),
("es.write.operation" , "upsert"),
("es.mapping.id" , "myTmpId"),
("es.mapping.exclude" , "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)
复制代码
es.mapping.exclude
只支持 RDD 为 Map 集合(saveToEs),当为 Json 字符串集合时(saveJsonToEs)会提示不支持的错误信息;这个配置项很是有用,例如 myTmpId 做为文档 id,所以没有必要重复存储到 _source 里面了,能够配置到这个配置项,将其从 _source 中排除。
Any Code,Code Any!
扫码关注『AnyCode』,编程路上,一块儿前行。