This test was run against a local, standalone Elasticsearch instance and a local Spark installation.
Environment: Spark 2.2.0, Elasticsearch 1.7.2 (cluster) or Elasticsearch 6.6.1 (standalone), Scala SDK 2.11.1.
POM dependencies:
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- elasticsearch-hadoop -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.1.1</version>
</dependency>
Note: the build environment must be kept consistent across all of these artifacts, because Scala versions are not backward compatible.
Example 1: reading data as an RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._   // brings sc.esRDD into scope

val conf = new SparkConf().setMaster("local").setAppName("ScalaSparkElasticSearch")
/**
 * According to the Elasticsearch documentation, the integration requires:
 *   es.index.auto.create ---> true
 * To connect to the ES cluster we also have to set its location (host, port).
 */
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "master") // required when connecting to a remote ES node
conf.set("es.port", "9200")
val sc = new SparkContext(conf)

// read the entire index
val esrdd: RDD[(String, collection.Map[String, AnyRef])] = sc.esRDD("kc22k2_detail/cz")
esrdd.collect().foreach(s => print(s._2))
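Each element of the RDD is a (document id, field map) pair, so individual fields can be pulled straight out of the map. A minimal sketch, assuming the index contains the name_depa field that the SQL example below queries:

// extract a single field from every document; missing fields fall back to a default value
val names: RDD[String] = esrdd.map { case (_, doc) =>
  doc.getOrElse("name_depa", "unknown").toString
}
names.distinct().collect().foreach(println)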
Example 2: reading data as a DataFrame
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("spark_es").getOrCreate()
val options = Map(
  "es.index.auto.create" -> "true",
  "pushdown" -> "true",
  "es.nodes" -> "10.111.121.115",
  "es.port" -> "9200")
val sparkDF = spark.sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("kc22k2_detail/cz")
sparkDF.show(10)
sparkDF.createOrReplaceTempView("t_sn_gongshi")
spark.sql("select count(distinct name_depa) from t_sn_gongshi").show()
Reading ES through Spark SQL makes it very convenient to work with the data using SQL, but it adds a lot of interaction overhead. For everyday queries it is usually better to call the ES API directly; Spark is the right tool when the data has to be processed together with Hadoop.
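Before falling back to the raw ES API, note that the same aggregation can also be expressed with the DataFrame API, which avoids registering a temporary view but still pulls the documents into Spark. A minimal sketch against the sparkDF defined above:

import org.apache.spark.sql.functions.countDistinct

// equivalent to the SQL query above, without a temporary view
sparkDF.agg(countDistinct("name_depa")).show()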
Example 3: reading with a filter query, which reduces the amount of data and the network transfer time.
Query DSL reference: https://www.yiibai.com/elasticsearch/elasticsearch_query_dsl.html
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

val conf = new SparkConf()
conf.set("es.index.auto.create", "true")
conf.set("pushdown", "true")
conf.set("es.nodes", "192.168.x.xxx")
conf.set("es.port", "9200")
val spark = SparkSession.builder().master("local[*]").appName("spark_es").config(conf).getOrCreate()
val query: String = s"""{
    "query" : {
      "terms": {
        "name": ["北京", "成都"]
      }
    }
  }"""
val df = EsSparkSQL.esDF(spark.sqlContext, "area_map_0302_3/cz", query)
df.show()
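The same filter can also be handed to the standard DataFrame reader through the es.query setting of elasticsearch-hadoop, so it composes with the options map from Example 2. A minimal sketch reusing the query string defined above:

// push the same DSL query down to Elasticsearch via the reader options
val filteredDF = spark.sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("es.query", query)
  .load("area_map_0302_3/cz")
filteredDF.show()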
Example 4: processing data with Spark and then writing it to Elasticsearch
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL

// Trip is assumed to be a simple case class matching the three tab-separated
// columns of the test file; the field names here are placeholders
case class Trip(field1: String, field2: String, field3: String)

def sparkWriteEs(): Unit = {
  val conf = new SparkConf()
  conf.set("es.index.auto.create", "true")
  conf.set("pushdown", "true")
  conf.set("es.nodes", "10.111.121.115")
  conf.set("es.port", "9200")
  // several configuration objects can be passed to the SparkSession builder
  val spark = SparkSession.builder()
    .appName("Spark-ES")
    .master("local[*]")
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()
  val sc = spark.sparkContext

  /**
   * Write RDD data
   */
  val data: RDD[String] = sc.textFile("file:///C:\\Users\\91BGJK2\\Desktop\\es测试数据.txt")
  val rdds: RDD[Trip] = data.map { line =>
    val s: Array[String] = line.split("\t")
    Trip(s(0), s(1), s(2))
  }
  EsSpark.saveToEs(rdds, "kc22k2_test/cz")

  /**
   * Read from the JDBC source and write DataFrame data
   */
  val gongshi: DataFrame = spark.read.format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url", "jdbc:mysql://10.111.121.111:3306/test?useUnicode=true&characterEncoding=UTF-8")
    .option("dbtable", "t_test_es")
    .option("user", "root")
    .option("password", "root")
    .load()
  // DataFrame data is written with EsSparkSQL; RDDs are written with EsSpark
  EsSparkSQL.saveToEs(gongshi, "kc22k2_detail/cz")
}
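When writing, elasticsearch-hadoop generates document ids automatically; if a field of the data should be used as the _id instead, it can be passed through the es.mapping.id setting. A minimal sketch, assuming a hypothetical doc_id column exists in the DataFrame:

// use the (hypothetical) doc_id column as the Elasticsearch document id,
// so re-running the job updates documents instead of duplicating them
EsSparkSQL.saveToEs(gongshi, "kc22k2_detail/cz", Map("es.mapping.id" -> "doc_id"))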