Spark读写ES

本文主要介绍spark sql读写es、structured streaming写入es以及一些参数的配置html

ES官方提供了对spark的支持,能够直接经过spark读写es,具体能够参考ES Spark Support文档(文末有地址)。node

如下是pom依赖,具体版本能够根据本身的es和spark版本进行选择:sql

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-20_2.11</artifactId>
  <version>6.0.0</version>
</dependency>

Spark SQL - ES

主要提供了两种读写方式:一种是经过DataFrameReader/Writer传入ES Source实现;另外一种是直接读写DataFrame实现。在实现前,还要列一些相关的配置:bootstrap

配置

参数 描述
es.nodes.wan.only true or false,在此模式下,链接器禁用发现,而且全部操做经过声明的es.nodes链接
es.nodes ES节点
es.port ES端口
es.index.auto.create true or false,是否自动建立index
es.resource 资源路径
es.mapping.id es会为每一个文档分配一个全局id。若是不指定此参数将随机生成;若是指定的话按指定的来
es.batch.size.bytes es批量API的批量写入的大小(以字节为单位)
es.batch.write.refresh 批量更新完成后是否调用索引刷新
es.read.field.as.array.include 读es的时候,指定将哪些字段做为数组类型

列了一些经常使用的配置,更多配置查看ES Spark Configuration文档数组

DataFrameReader读ES

import org.elasticsearch.spark.sql._
val options = Map(
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.read.field.as.array.include" -> "arr1, arr2"
)
val df = spark
    .read
    .format("es")
    .options(options)
    .load("index1/info")
df.show()

DataFrameWriter写ES

import org.elasticsearch.spark.sql._
val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.mapping.id" -> "id"
)

val sourceDF = spark.table("hive_table")
sourceDF
  .write
  .format("org.elasticsearch.spark.sql")
  .options(options)
  .mode(SaveMode.Append)
  .save("hive_table/docs")

读DataFrame

jar包中提供了esDF()方法能够直接读es数据为DataFrame,如下是源码截图。
在这里插入图片描述
简单说一下各个参数:app

resource:资源路径,例如hive_table/docselasticsearch

cfg:一些es的配置,和上面代码中的options差很少ide

query:指定DSL查询语句来过滤要读的数据,例如"?q=user_group_id:3"表示读user_group_id为3的数据oop

val options = Map(
  "pushdown" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200"
)

val df = spark.esDF("hive_table/docs", "?q=user_group_id:3", options)
df.show()

写DataFrame

jar包中提供了saveToEs()方法能够将DataFrame写入ES,如下是源码截图。
在这里插入图片描述
resource:资源路径,例如hive_table/docsui

cfg:一些es的配置,和上面代码中的options差很少

import org.elasticsearch.spark.sql._ 
val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.mapping.id" -> "zip_record_id"
)
val df = spark.table("hive_table")
df.saveToEs("hive_table/docs", options)

Structured Streaming - ES

es也提供了对Structured Streaming的集成,使用Structured Streaming能够实时的写入ES。

import org.elasticsearch.spark.sql._
val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.mapping.id" -> "zip_record_id"
)
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "a:9092,b:9092,c:9092")
  .option("subscribe", "test")
  .option("failOnDataLoss", "false")
  .load()
df
  .writeStream
  .outputMode(OutputMode.Append())
  .format("es")
  .option("checkpointLocation", s"hdfs://hadoop:8020/checkpoint/test01")
  .options(options)
  .start("test_streaming/docs")
  .awaitTermination()

可能遇到的问题

数组类型转换错误

报错信息:type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type

由于es的mapping只会记录字段的类型,不会记录是不是数组,也就是说若是是int数组,es的mapping只是记录成int。

能够在option中加一个es.read.field.as.array.include,标明数组字段

es.read.field.as.array.include" -> "数组字段的名字"

若是是object里的某个字段,写成"object名字.数组字段名字",若是是多个字段,字段名之间用逗号分隔

Timestamp被转为Long

DataFrame的Timestamp类型数据写入ES后,就变成了Number类型。

这可能不算个问题,时间戳本质上就是Long类型的毫秒值;可是在Hive中Timestamp是"yyyy-MM-dd HH:mm:ss"的类型,我的以为很别扭。

尝试将Timestamp类型字段转成Date类型,写入ES后仍是Number类型。网上搜了一圈也没有什么好的办法,你们有什么解决办法欢迎交流。

References

ES Spark Support文档:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark

ES Spark Configuration: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

end.


我的公众号:码农峰,定时推送行业资讯,持续发布原创技术文章,欢迎你们关注。

相关文章
相关标签/搜索