一:Spark SQL的特色
一、支持多种数据源:Hive、RDD、Parquet、JSON、JDBC等。
二、多种性能优化技术:in-memory columnar storage、byte-code generation、cost model动态评估等。
三、组件扩展性:对于SQL的语法解析器、分析器以及优化器,用户均可以本身从新开发,而且动态扩展。html
二:Spark SQL的性能优化技术简介
一、内存列存储(in-memory columnar storage)
内存列存储意味着,Spark SQL的数据,不是使用Java对象的方式来进行存储,而是使用面向列的内存存储的方式来进行存储。也就是说,每一列,做为一个数据存储的单位。从而大大优化了内存使用的效率。采用了内存列存储以后,减小了对内存的消耗,也就避免了gc大量数据的性能开销。
二、字节码生成技术(byte-code generation)
Spark SQL在其catalyst模块的expressions中增长了codegen模块,对于SQL语句中的计算表达式,好比select num + num from t这种的sql,就可使用动态字节码生成技术来优化其性能。
三、Scala代码编写的优化
对于Scala代码编写中,可能会形成较大性能开销的地方,本身重写,使用更加复杂的方式,来获取更好的性能。好比Option样例类、for循环、map/filter/foreach等高阶函数,以及不可变对象,都改为了用null、while循环等来实现,而且重用可变的对象。java
三:spark Sql的通常用法mysql
可参考官方文档:https://spark.apache.org/docs/latest/sql-programming-guide.htmlgit
1- 建立DataFramegithub
JavaSparkContext sc = ...; SQLContext sqlContext = new SQLContext(sc); DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json"); df.show();
2- DataFrame经常使用用法redis
load:主要用于加载数据,建立出DataFrame; DataFrame df = sqlContext.read().load("users.parquet"); save: 主要用于将DataFrame中的数据保存到文件中。 df.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); SaveMode:ErrorIfExists--抛异常,Append--追加,Overwrite--重写,Ignore--忽略,不作操做 format:手动指定数据类型:parquet也是一种数据类型 DataFrame df = sqlContext.read().format("json").load("people.json"); df.select("name", "age").write().format("parquet").save("namesAndAges.parquet"); show: 显示数据 collect: 获取全部数据到数组 collectAsList:获取全部数据到List describe(cols: String*):获取指定字段的统计信息,好比count, mean, stddev, min, max等 first, head, take, takeAsList:获取若干行记录 where筛选条件 filter:根据字段进行筛选 select:获取指定字段值 selectExpr:能够对指定字段进行特殊处理 col/apply:获取指定字段 drop:去除指定字段,保留其余字段 limit:限制行数 orderBy和sort:排序 group by数据分组 distinct数据去重 dropDuplicates:根据指定字段去重 agg方法实现聚合操做 withColumn添加新的一列 join连接 union intersect方法能够计算出两个DataFrame中相同的记录, except获取一个DataFrame中有另外一个DataFrame中没有的记录 withColumnRenamed:重命名DataFrame中的指定字段名 explode根据某个字段内容进行分割,而后生成多行,这时可使用explode方法
示例:sql
DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json"); df.show(); //打印全部数据 df.printSchema(); //打印元数据 df.select("name").show(); //查询某列的数据 df.select(df.col("name"), df.col("age").plus(1)).show(); //查询某列的数据,并对其进行计算 df.filter(df.col("age").gt(21)).show(); //对某列的值进行过滤 df.groupBy("age").count().show(); //排序
3- spark Sql全部的内置函数express
种类 | 函数 |
聚合函数 | approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct |
集合函数 | array_contains, explode, size, sort_array |
日期/时间函数 | 日期时间转换 日期/时间计算 |
数学函数 | abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex |
混合函数 | array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when |
字符串函数 | ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper |
窗口函数 | cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber |
4- Spark SQL支持两种方式来将RDD转换为DataFrame。
第一种方式,是使用反射来推断包含了特定数据类型的RDD的元数据。这种基于反射的方式,代码比较简洁,当你已经知道你的RDD的元数据时,是一种很是不错的方式。
JavaRDD<String> lines = sc.textFile("C:\\Users\\zhang\\Desktop\\students.txt");
JavaRDD<Student> students = lines.map(new Function<String, Student>() {
private static final long serialVersionUID = 1L;
@Override
public Student call(String line) throws Exception {
String[] lineSplited = line.split(",");
Student stu = new Student();
stu.setId(Integer.valueOf(lineSplited[0].trim()));
stu.setName(lineSplited[1]);
stu.setAge(Integer.valueOf(lineSplited[2].trim()));
return stu;
}
});
DataFrame studentDF = sqlContext.createDataFrame(students, Student.class); //Stundent类必须实现接口 Serializable
第二种方式,是经过编程接口来建立DataFrame,你能够在程序运行时动态构建一份元数据,而后将其应用到已经存在的RDD上。这种方式的代码比较冗长,可是若是在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能经过这种动态构建元数据的方式。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> studentRDD = lines.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(String line) throws Exception {
String[] lineSplited = line.split(",");
return RowFactory.create(
Integer.valueOf(lineSplited[0]),
lineSplited[1],
Integer.valueOf(lineSplited[2]));
}
});
List<StructField> structFields = new ArrayList<StructField>(); structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType, true)); structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(structFields); // 第三步,使用动态构造的元数据,将RDD转换为DataFrame DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);// 第一步,建立一个普通的RDDJavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//students.txt");// 第二步,动态构造元数据
可参考:https://www.jianshu.com/p/4df4aa54ad15
四:数据源Parquet
1--列式存储和行式存储相比有哪些优点呢?
能够跳过不符合条件的数据,只读取须要的数据,下降IO数据量。
压缩编码能够下降磁盘存储空间。因为同一列的数据类型是同样的,可使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。
只读取须要的列,支持向量运算,可以获取更好的扫描性能
2--为何要使用parque:相对于txt文件而言,parquet查询性能的提高在某些状况下可能达到 30 倍或更高,存储的节省可高达 75%。
用法:
// 读取Parquet文件中的数据,建立一个DataFrame DataFrame userDf = sqlContext.read().parquet("D:\\文档\\users.parquet");
3--自动分区推断:Spark SQL就会自动根据目录结构,推断出分区信息。此外,分区列的数据类型,也是自动被推断出来的。目前,Spark SQL仅支持自动推断出数字类型和字符串类型。有时,用户也许不但愿Spark SQL自动推断分区列的数据类型。此时只要设置一个配置便可, spark.sql.sources.partitionColumnTypeInference.enabled,默认为true,即自动推断分区列的类型,设置为false,即不会自动推断类型。禁止自动推断分区列的类型时,全部分区列的类型,就统一默认都是String。
可参考:spark从入门到精通--中华石杉,第78讲
4-- 合并元数据:用户能够在一开始就定义一个简单的元数据,而后随着业务须要,逐渐往元数据中添加更多的列。在这种状况下,用户可能会建立多个Parquet文件,有着多个不一样的可是却互相兼容的元数据。Parquet数据源支持自动推断出这种状况,而且进行多个Parquet文件的元数据的合并。
默认状况下是不进行数据元数据的合并:
一、读取Parquet文件时,将数据源的选项,mergeSchema,设置为true 二、使用SQLContext.setConf()方法,将spark.sql.parquet.mergeSchema参数设置为true
五:数据源
1- json:注意的是,这里使用的JSON文件与传统意义上的JSON文件是不同的。每行都必须,也只能包含一个,单独的,自包含的,有效的JSON对象。不能让一个JSON对象分散在多行。不然会报错。
2- hive
使用hive中数据:
JavaSparkContext sparkContext = new JavaSparkContext(conf);
// 建立HiveContext,注意,这里,它接收的是SparkContext做为参数,不是JavaSparkContext
HiveContext hiveContext = new HiveContext(sparkContext.sc());
// 判断是否存在student_infos表,若是存在则删除
hiveContext.sql("drop table if exists student_infos");
将DataFrame数据保存到hive表中:
goodStudentInfoDF.saveAsTable("good_student_info");
2- jdbc
public class JDBCDataSource { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("JDBCDataSourceJava").setMaster("local"); JavaSparkContext sparkContext = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sparkContext); // 分别将mysql中两张表的数据加载为DataFrame Map<String, String> options = new HashMap<String, String>(); options.put("url", "jdbc:mysql://hadoop-100:3306/mytest"); options.put("dbtable", "student_infos"); options.put("user", "root"); options.put("password", "zhaojun2436"); DataFrame infoDF = sqlContext.read().options(options).format("jdbc").load(); options.put("dbtable", "student_scores"); DataFrame scoreDF = sqlContext.read().options(options).format("jdbc").load(); // 将两个DataFrame转换为JavaPairRDD,执行join操做 JavaPairRDD<String, Integer> infoRDD = infoDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() { @Override public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<>(row.getString(0), row.getInt(1)); } }); JavaPairRDD<String, Integer> scoreRDD = scoreDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() { @Override public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<>(row.getString(0), row.getInt(1)); } }); JavaPairRDD<String, Tuple2<Integer, Integer>> infoJoinScore = infoRDD.join(scoreRDD); // 将JavaPairRDD转换为JavaRDD<Row> JavaRDD<Row> infoJoinScoreRDD = infoJoinScore.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() { @Override public Row call(Tuple2<String, Tuple2<Integer, Integer>> v1) throws Exception { return RowFactory.create(v1._1, v1._2._1, v1._2._2); } }); // 过滤出分数大于80分的数据 JavaRDD<Row> goodStudent = infoJoinScoreRDD.filter(new Function<Row, Boolean>() { @Override public Boolean call(Row v1) throws Exception { if (v1.getInt(2) > 80) { return true; } return false; } }); // 转换为DataFrame List<StructField> fieldList = new ArrayList<>(); fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true)); fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); fieldList.add(DataTypes.createStructField("score", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(fieldList); DataFrame df = sqlContext.createDataFrame(goodStudent, structType); Row[] collect = df.collect(); for(Row row : collect) { System.out.println(row); } // 将DataFrame中的数据保存到mysql表中 // 这种方式是在企业里很经常使用的,有多是插入mysql、有多是插入hbase,还有多是插入redis缓存 goodStudent.foreach(new VoidFunction<Row>() { @Override public void call(Row row) throws Exception { String sql = "insert into good_student_infos values(" + "'" + String.valueOf(row.getString(0)) + "'," + Integer.valueOf(String.valueOf(row.get(1))) + "," + Integer.valueOf(String.valueOf(row.get(2))) + ")"; Class.forName("com.mysql.jdbc.Driver"); Connection conn = null; Statement stmt = null; try { conn = DriverManager.getConnection( "jdbc:mysql://hadoop-100:3306/mytest", "root", "zhaojun2436"); stmt = conn.createStatement(); stmt.executeUpdate(sql); } catch (Exception e) { e.printStackTrace(); } finally { if(stmt != null) { stmt.close(); } if(conn != null) { conn.close(); } } } }); } }
扩展阅读:udf自定义函数,https://zhangslob.github.io/2018/10/29/Spark%E5%AE%9E%E6%88%98%EF%BC%88%E4%BA%8C%EF%BC%89%E5%AD%A6%E4%B9%A0UDF/