本文主要介绍spark sql读写es、structured streaming写入es以及一些参数的配置html
ES官方提供了对spark的支持,能够直接经过spark读写es,具体能够参考ES Spark Support文档(文末有地址)。node
如下是pom依赖,具体版本能够根据本身的es和spark版本进行选择:sql
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.0.0</version> </dependency>
主要提供了两种读写方式:一种是经过DataFrameReader/Writer传入ES Source实现;另外一种是直接读写DataFrame实现。在实现前,还要列一些相关的配置:bootstrap
参数 | 描述 |
---|---|
es.nodes.wan.only | true or false,在此模式下,链接器禁用发现,而且全部操做经过声明的es.nodes链接 |
es.nodes | ES节点 |
es.port | ES端口 |
es.index.auto.create | true or false,是否自动建立index |
es.resource | 资源路径 |
es.mapping.id | es会为每一个文档分配一个全局id。若是不指定此参数将随机生成;若是指定的话按指定的来 |
es.batch.size.bytes | es批量API的批量写入的大小(以字节为单位) |
es.batch.write.refresh | 批量更新完成后是否调用索引刷新 |
es.read.field.as.array.include | 读es的时候,指定将哪些字段做为数组类型 |
列了一些经常使用的配置,更多配置查看ES Spark Configuration文档数组
import org.elasticsearch.spark.sql._ val options = Map( "es.nodes.wan.only" -> "true", "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009", "es.port" -> "9200", "es.read.field.as.array.include" -> "arr1, arr2" ) val df = spark .read .format("es") .options(options) .load("index1/info") df.show()
import org.elasticsearch.spark.sql._ val options = Map( "es.index.auto.create" -> "true", "es.nodes.wan.only" -> "true", "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009", "es.port" -> "9200", "es.mapping.id" -> "id" ) val sourceDF = spark.table("hive_table") sourceDF .write .format("org.elasticsearch.spark.sql") .options(options) .mode(SaveMode.Append) .save("hive_table/docs")
jar包中提供了esDF()方法能够直接读es数据为DataFrame,如下是源码截图。
简单说一下各个参数:app
resource:资源路径,例如hive_table/docselasticsearch
cfg:一些es的配置,和上面代码中的options差很少ide
query:指定DSL查询语句来过滤要读的数据,例如"?q=user_group_id:3"表示读user_group_id为3的数据oop
val options = Map( "pushdown" -> "true", "es.nodes.wan.only" -> "true", "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009", "es.port" -> "9200" ) val df = spark.esDF("hive_table/docs", "?q=user_group_id:3", options) df.show()
jar包中提供了saveToEs()方法能够将DataFrame写入ES,如下是源码截图。
resource:资源路径,例如hive_table/docsui
cfg:一些es的配置,和上面代码中的options差很少
import org.elasticsearch.spark.sql._ val options = Map( "es.index.auto.create" -> "true", "es.nodes.wan.only" -> "true", "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009", "es.port" -> "9200", "es.mapping.id" -> "zip_record_id" ) val df = spark.table("hive_table") df.saveToEs("hive_table/docs", options)
es也提供了对Structured Streaming的集成,使用Structured Streaming能够实时的写入ES。
import org.elasticsearch.spark.sql._ val options = Map( "es.index.auto.create" -> "true", "es.nodes.wan.only" -> "true", "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009", "es.port" -> "9200", "es.mapping.id" -> "zip_record_id" ) val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "a:9092,b:9092,c:9092") .option("subscribe", "test") .option("failOnDataLoss", "false") .load() df .writeStream .outputMode(OutputMode.Append()) .format("es") .option("checkpointLocation", s"hdfs://hadoop:8020/checkpoint/test01") .options(options) .start("test_streaming/docs") .awaitTermination()
报错信息:type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
由于es的mapping只会记录字段的类型,不会记录是不是数组,也就是说若是是int数组,es的mapping只是记录成int。
能够在option中加一个es.read.field.as.array.include,标明数组字段
es.read.field.as.array.include" -> "数组字段的名字"
若是是object里的某个字段,写成"object名字.数组字段名字",若是是多个字段,字段名之间用逗号分隔
DataFrame的Timestamp类型数据写入ES后,就变成了Number类型。
这可能不算个问题,时间戳本质上就是Long类型的毫秒值;可是在Hive中Timestamp是"yyyy-MM-dd HH:mm:ss"的类型,我的以为很别扭。
尝试将Timestamp类型字段转成Date类型,写入ES后仍是Number类型。网上搜了一圈也没有什么好的办法,你们有什么解决办法欢迎交流。
ES Spark Support文档:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark
ES Spark Configuration: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
end.
我的公众号:码农峰,定时推送行业资讯,持续发布原创技术文章,欢迎你们关注。