Spark operations on Elasticsearch

This test was run against a local single-node Elasticsearch and Spark.

Configuration: Spark 2.2.0, Elasticsearch 1.7.2 (cluster) or Elasticsearch 6.6.1 (single node), Scala SDK 2.11.1.

POM dependencies:

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

<!--elasticsearch-hadoop-->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.1.1</version>
</dependency>

Note: the build environment must be consistent everywhere; Scala versions are not backward compatible.
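A quick way to confirm which Scala version the job actually runs on (and compare it with the scala-library artifact declared above) is to print it from the standard library; a minimal sketch:

// Prints e.g. "version 2.11.1"; it should match the scala-library version in the POM.
println(scala.util.Properties.versionString)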

Code 1: reading from Elasticsearch with an RDD

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._ // brings sc.esRDD into scope

val conf = new SparkConf().setMaster("local").setAppName("ScalaSparkElasticSearch")
/**
  * According to the es-hadoop documentation, the integration needs:
  * es.index.auto.create ---> true
  * To connect to an ES cluster, the cluster location (host, port) must also be set.
  */
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "master")
// ----> required when connecting to a remote ES node
conf.set("es.port", "9200")
val sc = new SparkContext(conf)
// read the whole index as (documentId, fieldMap) tuples
val esrdd: RDD[(String, collection.Map[String, AnyRef])] = sc.esRDD("kc22k2_detail/cz")
esrdd.collect().foreach(s => print(s._2))

Code 2: reading with a DataFrame

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("spark_es").getOrCreate()
val options = Map("es.index.auto.create" -> "true", "pushdown" -> "true", "es.nodes" -> "10.111.121.115", "es.port" -> "9200")
val sparkDF = spark.sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("kc22k2_detail/cz")
sparkDF.show(10)
sparkDF.createOrReplaceTempView("t_sn_gongshi")
spark.sql("select count(distinct name_depa) from t_sn_gongshi").show()

Reading ES through Spark SQL makes it very convenient to query the data with SQL, but the round trips are expensive. For everyday queries it is usually better to use the ES API directly, and to use Spark for the interactive processing when combining ES with Hadoop.
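One way to reduce that cost while staying on the DataFrame path: with "pushdown" -> "true" (as in the options map above), elasticsearch-hadoop translates DataFrame filters and projections into ES queries, so only matching documents leave the cluster. A minimal sketch reusing sparkDF from code 2 (the filter value "北京" is only an illustrative placeholder):

// The equality filter is pushed down to ES as a query instead of being applied in Spark.
val beijingOnly = sparkDF.filter(sparkDF("name_depa") === "北京")
beijingOnly.select("name_depa").show(10)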

Code 3: reading with a query filter -- this reduces the data volume and the network transfer time.

Query DSL reference: https://www.yiibai.com/elasticsearch/elasticsearch_query_dsl.html

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

val conf = new SparkConf()
conf.set("es.index.auto.create", "true")
conf.set("pushdown", "true")
conf.set("es.nodes", "192.168.x.xxx")
conf.set("es.port", "9200")
val spark = SparkSession.builder().master("local[*]").appName("spark_es").config(conf).getOrCreate()
// terms query: only documents whose name is 北京 or 成都 are fetched from ES
val query: String =
  s"""{
        "query" : {
            "terms" : {
              "name" : ["北京", "成都"]
            }
        }
  }"""
val df = EsSparkSQL.esDF(spark.sqlContext, "area_map_0302_3/cz", query)
df.show()
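The same DSL string also works on the RDD path: sc.esRDD accepts an optional query argument, so the filtering happens inside ES before anything reaches Spark. A minimal sketch reusing the spark session and query defined above:

import org.elasticsearch.spark._

// Only documents matching the terms query come back, as (documentId, fieldMap) tuples.
val filteredRdd = spark.sparkContext.esRDD("area_map_0302_3/cz", query)
filteredRdd.take(5).foreach(println)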

Code 4: writing to Elasticsearch after processing data with Spark

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL

// Trip holds one parsed line of the test file; the field names are placeholders.
case class Trip(col1: String, col2: String, col3: String)

def sparkWriteEs(): Unit = {
    val conf = new SparkConf()
    conf.set("es.index.auto.create", "true")
    conf.set("pushdown", "true")
    conf.set("es.nodes", "10.111.121.115")
    conf.set("es.port", "9200")
    // multiple settings can be passed to the SparkSession through one SparkConf
    val spark = SparkSession.builder()
      .appName("Spark-ES")
      .master("local[*]")
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    /**
      * Write RDD data.
      */
    val data: RDD[String] = sc.textFile("file:///C:\\Users\\91BGJK2\\Desktop\\es测试数据.txt")
    val rdds: RDD[Trip] = data.map { line =>
      val s: Array[String] = line.split("\t")
      Trip(s(0), s(1), s(2))
    }
    EsSpark.saveToEs(rdds, "kc22k2_test/cz")

    /**
      * Read from a JDBC source and write the DataFrame.
      */
    val gongshi: DataFrame = spark.read.format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://10.111.121.111:3306/test?useUnicode=true&characterEncoding=UTF-8")
      .option("dbtable", "t_test_es")
      .option("user", "root")
      .option("password", "root")
      .load()
    // DataFrames are written with EsSparkSQL; RDDs with EsSpark
    EsSparkSQL.saveToEs(gongshi, "kc22k2_detail/cz")
  }
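One extra write-side option worth knowing: by default elasticsearch-hadoop lets ES generate document IDs, so re-running a job inserts duplicates. Setting es.mapping.id to a field of the record makes that field the document _id, turning re-runs into updates. A minimal sketch under assumed names (the Station case class, its id field, and the sample values are illustrative, not from the original code):

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark

// Illustrative record type; the "id" field is mapped to the ES document _id.
case class Station(id: String, name: String, city: String)

val spark = SparkSession.builder().master("local[*]").appName("spark_es_write_id")
  .config("es.nodes", "10.111.121.115").config("es.port", "9200").getOrCreate()
val stations = spark.sparkContext.parallelize(Seq(
  Station("1", "站点A", "北京"),
  Station("2", "站点B", "成都")))
// es.mapping.id makes the write idempotent: the same id overwrites the same document.
EsSpark.saveToEs(stations, "kc22k2_test/cz", Map("es.mapping.id" -> "id"))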