初次尝试用 Spark+scala 完成项目的重构,因为二者以前都没接触过,因此边学边用的过程大多艰难。首先面临的是如何快速上手,而后是代码调优、性能调优。本章主要记录本身在项目中遇到的问题以及解决方式,下篇会尝试调优方法。末尾会分享本身的学习资料,也供大多菜鸟第一次使用做为参考。因为本身项目中大量使用spark sql,因此下面的经验大可能是和spark sql有关。一样下面也列出做为菜鸟在学习过程当中的困惑以及踩的坑,还请大牛勿笑 ~_~ 若是有更好的方式解决,欢迎留言,一块儿学习。html
例1:配置文件中有n个sql语句,每一个sql以分号----分隔。你须要读取sql,分别从hdfs中拉取数据。可能会采起:java
//conf_sql_map_file 是sql配置文件 val sql_rdd = sc.textFile(conf_sql_map_file) var sqls = sql_rdd.collect().mkString(" ").split("----")
分析:因为rdd以每行为单位,自动去掉结尾的 换行符。但sql配置文件须要以指定分隔符分隔,而不是每行。因此使用 mkString(" ") 将读取的每行数据以空格分隔,整合为一个长字符串,最后以分隔符分隔。node
但若是 sql 语句中有使用 with 之类的关键词时,上面那种方式读取配置文件会由于格式问题会出错,with语句须要和 select 语句空行分隔,为保险起见,以 “\n” 分隔,还原配置文件的原始格式。python
var sqls = sql_rdd.collect().mkString("\n").split("----")
例2:文件file1内容以下es6
key1,value1sql
key2,value2shell
var file_rdd = sc.textFile(file1).map(e=> (e.split(',')(0),e.split(',')(1))).collectAsMap
或者 不从文件读取,直接使用List类型数据演示apache
scala> var line_rdd = sc.parallelize(List[String]("k,v","key,value")).map(e=>(e.split(',')(0),e.split(',')(1))).collectAsMap
line_rdd: scala.collection.Map[String,String] = Map(k -> v, key -> value)
分析:collectAsMap 是行动操做的一种,能够将数据类型转换为Map类型,而collect是直接转为Array类型。json
scala> import org.apache.spark.{SparkConf, SparkContext} scala> import org.apache.spark.sql.SparkSession scala> val conf = new SparkConf().setAppName("graph_spark@zky") //设置本程序名称 scala> val hiveCtx: SparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
//使用rdd函数转换格式 scala> var sql_file_result = hiveCtx.sql("select * from city limit 10").rdd
scala> sql_file_result
res10: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1187] at rdd at <console>:29
scala> sql_file_result.first
res11: org.apache.spark.sql.Row = [110000,北京市,110000,1,-911,2015-10-10 12:09:47,-911,2018-01-09 18:27:28,20181001000000]
分析:因为spark2.0版本丢弃了SQLContext(HiveContext),取而代之的是SparkSession。hdfs拉取的数据格式为 org.apache.spark.sql.Row,须要调用mkString("\t") 对其转换为String类型的rdd ,而后再转换为其余类型。api
但当你的数据以制表符分隔,就像下面代码里同样,末尾字段值若是存在字符串""空时,建议在首尾加上 [ ] 标识符,由于制表符和末尾的空值都会被rdd 自动过滤掉。另外,不建议分隔符使用制表符分隔,在选用分隔符时确保数据中不会出现你指定的分隔符。
scala> var lines = sql_file_result.map(line => "["+line.mkString("\t")+"]") lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1189] at map at <console>:33 scala> lines.collect res14: Array[String] = Array([110000 北京市 110000 1 -911 2015-10-10 12:09:47 -911 2018-01-09 18:27:28 20180123000000], [120000 天津市 120000 1 -911 2015-10-10 12:09:47 -911 2018-01-09 18:27:28 20180123000000],。。。
解析带[ ]的字符串转成list格式,split()函数中的-1是为确保空值不被过滤。
scala> var items = lines.map(line => line.substring(1,line.length-1).split("\t",-1).toList) items: org.apache.spark.rdd.RDD[List[String]] = MapPartitionsRDD[1190] at map at <console>:35 scala> items.collect res15: Array[List[String]] = Array(List(370101, 济南市, 370000, 1, -911, 1000-01-01 00:00:00, -911, 1000-01-01 00:00:00, 20180916000000), List(110000, 北京市, 110000, 1, -911, 2015-10-10 12:09:47, -911, 2018-01-09 18:27:28, 20180916000000),
scala> var mid_data_rdd = hiveCtx.sql("select city_code,city_name from city limit 10").rdd scala> mid_data_rdd.collect res16: Array[org.apache.spark.sql.Row] = Array([110000,北京市], [120000,天津市], [130100,石家庄市], [130200,唐山市], [130300,秦皇岛市], [130400,邯郸市], [130500,邢台市], [130600,保定市], [130700,张家口市], [130800,承德市]) scala> var mid_data_map = mid_data_rdd.map(x => (x(0)->x(1).toString)).collectAsMap mid_data_map: scala.collection.Map[Any,String] = Map(110000 -> 北京市, 130100 -> 石家庄市, 130300 -> 秦皇岛市, 120000 -> 天津市, 130500 -> 邢台市, 130700 -> 张家口市, 130200 -> 唐山市, 130400 -> 邯郸市, 130600 -> 保定市, 130800 -> 承德市) scala> var mid_data_map = mid_data_rdd.map(x => (x(0).toString->x(1).toString)).collectAsMap mid_data_map: scala.collection.Map[String,String] = Map(130300 -> 秦皇岛市, 130600 -> 保定市, 130500 -> 邢台市, 130800 -> 承德市, 130200 -> 唐山市, 110000 -> 北京市, 130400 -> 邯郸市, 130700 -> 张家口市, 130100 -> 石家庄市, 120000 -> 天津市)
//若是想转换为array数组,试一下collect~
scala> var mid_data_map = mid_data_rdd.map(x => (x(0).toString->x(1).toString)).collect mid_data_map: Array[(String, String)] = Array((110000,北京市), (120000,天津市), (130100,石家庄市), (130200,唐山市), (130300,秦皇岛市), (130400,邯郸市), (130500,邢台市), (130600,保定市), (130700,张家口市), (130800,承德市))
分析:能够关注下 toString函数~
scala> val people = sc.parallelize(List(("1","mary"),("2","rose"),("3","jack"))) people: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[1] at parallelize at <console>:24 scala> case class Person(id:String,name:String) defined class Person scala> var people_trans = people.map(item => Person(item._1,item._2)) people_trans: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[2] at map at <console>:28 scala> val people_frame = people_trans.toDF() people_frame: org.apache.spark.sql.DataFrame = [id: string, name: string] scala> people_frame.createOrReplaceTempView("person") scala> import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession scala> import org.apache.spark.SparkConf import org.apache.spark.SparkConf scala> val conf = new SparkConf().setAppName("graph_spark@zhengkaiyu") conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@534df4b scala> val hiveCtx: SparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate() 18/11/19 21:47:11 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect. hiveCtx: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@18de437d scala> import hiveCtx.sql import hiveCtx.sql scala> import hiveCtx.implicits._ import hiveCtx.implicits._ scala> sql("select * from person").collect res6: Array[org.apache.spark.sql.Row] = Array([1,mary], [2,rose], [3,jack])
scala> sql("insert into 库名.hive表名 select * from person")
分析:此例是基于case class来建立SchemaRDD,经过写入临时表,最后再插入到hive表中。除了这种方式还能够基于json格式来建临时表,见下例。其中spark2.1创建临时表时,将registerTempTable() 改成createOrReplaceTempView(),注意版本,要不会引发没必要要的麻烦。
scala> import org.apache.spark.sql.SparkSession scala> import org.apache.spark.SparkConf scala> val conf = new SparkConf().setAppName("graph_spark@zhengkaiyu") scala> val spark: SparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate() scala> val df = spark.read.json("examples/src/main/resources/people.json") scala> df.show() scala> df.createOrReplaceTempView("people") scala> val sqlDF = spark.sql("SELECT * FROM people") scala> sqlDF.show()
(1)当启动交互环境 spark-shell 时,会出现较为诡异的事情,刚定义好的变量会被以前的同名变量所覆盖,猜测缘由多是内存不足致使。
(2)在scala代码中,建议if-else语句格式规范书写,不然会编译不正确。
if(条件){
}
(3)启动 spark-shell 时,注意指定的模式local、yarn。
不可序列化:org.apache.spark.SparkException: Task not serializable
解决方案1:继承java可序列化类
object Process extends java.io.Serializable{ }
经过从临时表中读取数据写入hive表时,会遇到错误:org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
解决方案:执行下面语句后再执行insert语句。
//val spark: SparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
spark.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
《Spark快速大数据分析》王道远译,推荐理由:快速上手,实例代码有python、scala、java三种语言
《快学scala》
https://spark.apache.org/docs/2.1.0/sql-programming-guide.html#datasets-and-dataframes
https://tech.meituan.com/spark_tuning_pro.html
http://dblab.xmu.edu.cn/blog/spark-quick-start-guide/#more-577
spark.sql数据类型:http://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.sql.Row