spark-sql基础

一:Spark SQL的特色
一、支持多种数据源:Hive、RDD、Parquet、JSON、JDBC等。
二、多种性能优化技术:in-memory columnar storage、byte-code generation、cost model动态评估等。
三、组件扩展性:对于SQL的语法解析器、分析器以及优化器,用户均可以本身从新开发,而且动态扩展。html

 

二:Spark SQL的性能优化技术简介
一、内存列存储(in-memory columnar storage)
  内存列存储意味着,Spark SQL的数据,不是使用Java对象的方式来进行存储,而是使用面向列的内存存储的方式来进行存储。也就是说,每一列,做为一个数据存储的单位。从而大大优化了内存使用的效率。采用了内存列存储以后,减小了对内存的消耗,也就避免了gc大量数据的性能开销。
二、字节码生成技术(byte-code generation)
  Spark SQL在其catalyst模块的expressions中增长了codegen模块,对于SQL语句中的计算表达式,好比select num + num from t这种的sql,就可使用动态字节码生成技术来优化其性能。
三、Scala代码编写的优化
  对于Scala代码编写中,可能会形成较大性能开销的地方,本身重写,使用更加复杂的方式,来获取更好的性能。好比Option样例类、for循环、map/filter/foreach等高阶函数,以及不可变对象,都改为了用null、while循环等来实现,而且重用可变的对象。java

 

三:spark Sql的通常用法mysql

可参考官方文档:https://spark.apache.org/docs/latest/sql-programming-guide.htmlgit

1- 建立DataFramegithub

JavaSparkContext sc = ...; 
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");
df.show();

 2- DataFrame经常使用用法redis

load:主要用于加载数据,建立出DataFrame;                       DataFrame df = sqlContext.read().load("users.parquet");
save: 主要用于将DataFrame中的数据保存到文件中。
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
SaveMode:ErrorIfExists--抛异常,Append--追加,Overwrite--重写,Ignore--忽略,不作操做
format:手动指定数据类型:parquet也是一种数据类型
DataFrame df = sqlContext.read().format("json").load("people.json");
df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
show:  显示数据
collect: 获取全部数据到数组
collectAsList:获取全部数据到List
describe(cols: String*):获取指定字段的统计信息,好比count, mean, stddev, min, max等
first, head, take, takeAsList:获取若干行记录
where筛选条件
filter:根据字段进行筛选
select:获取指定字段值
selectExpr:能够对指定字段进行特殊处理
col/apply:获取指定字段
drop:去除指定字段,保留其余字段
limit:限制行数
orderBy和sort:排序
group by数据分组
distinct数据去重
dropDuplicates:根据指定字段去重
agg方法实现聚合操做
withColumn添加新的一列
join连接
union
intersect方法能够计算出两个DataFrame中相同的记录,
except获取一个DataFrame中有另外一个DataFrame中没有的记录
withColumnRenamed:重命名DataFrame中的指定字段名
explode根据某个字段内容进行分割,而后生成多行,这时可使用explode方法

 示例:sql

DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");
df.show();   //打印全部数据
df.printSchema();    //打印元数据
df.select("name").show();  //查询某列的数据
df.select(df.col("name"), df.col("age").plus(1)).show();  //查询某列的数据,并对其进行计算
df.filter(df.col("age").gt(21)).show();    //对某列的值进行过滤
df.groupBy("age").count().show();       //排序

3- spark Sql全部的内置函数express

种类 函数
聚合函数 approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
集合函数 array_contains, explode, size, sort_array
日期/时间函数

日期时间转换
unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
从日期时间中提取字段
year, month, dayofmonth, hour, minute, secondapache

日期/时间计算
datediff, date_add, date_sub, add_months, last_day, next_day, months_between
获取当前时间等
current_date, current_timestamp, trunc, date_format编程

数学函数 abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex
混合函数 array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when
字符串函数 ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper
窗口函数 cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber

4- Spark SQL支持两种方式来将RDD转换为DataFrame。
第一种方式,是使用反射来推断包含了特定数据类型的RDD的元数据。这种基于反射的方式,代码比较简洁,当你已经知道你的RDD的元数据时,是一种很是不错的方式。

JavaRDD<String> lines = sc.textFile("C:\\Users\\zhang\\Desktop\\students.txt");
​JavaRDD<Student> students = lines.map(new Function<String, Student>() {
  ​​private static final long serialVersionUID = 1L;
  ​​@Override
  public Student call(String line) throws Exception {
    ​​​String[] lineSplited = line.split(",");  
​​​    Student stu = new Student();
    ​​​stu.setId(Integer.valueOf(lineSplited[0].trim()));  
    ​​​stu.setName(lineSplited[1]);  
    ​​​stu.setAge(Integer.valueOf(lineSplited[2].trim()));
​​​    return stu;
​​  }
​});
DataFrame studentDF = sqlContext.createDataFrame(students, Student.class); //Stundent类必须实现接口 Serializable

第二种方式,是经过编程接口来建立DataFrame,你能够在程序运行时动态构建一份元数据,而后将其应用到已经存在的RDD上。这种方式的代码比较冗长,可是若是在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能经过这种动态构建元数据的方式。

​​import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;


JavaRDD<Row> studentRDD = lines.map(new Function<String, Row>() {
  ​​​private static final long serialVersionUID = 1L;
   @Override
​​​  public Row call(String line) throws Exception {
​​​​    String[] lineSplited = line.split(",");
    return RowFactory.create(
      ​​​​​​Integer.valueOf(lineSplited[0]),
      ​​​​​​lineSplited[1],
      ​​​​​​Integer.valueOf(lineSplited[2]));      
   ​​​}
​​});

List<StructField> structFields = new ArrayList<StructField>(); ​​structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType, true)); ​​structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); ​​structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); ​​StructType structType = DataTypes.createStructType(structFields); ​​// 第三步,使用动态构造的元数据,将RDD转换为DataFrame ​​DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);// 第一步,建立一个普通的RDDJavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//students.txt");​​// 第二步,动态构造元数据

 可参考:https://www.jianshu.com/p/4df4aa54ad15

 

四:数据源Parquet

1--列式存储和行式存储相比有哪些优点呢?

能够跳过不符合条件的数据,只读取须要的数据,下降IO数据量。

压缩编码能够下降磁盘存储空间。因为同一列的数据类型是同样的,可使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。

只读取须要的列,支持向量运算,可以获取更好的扫描性能

 

2--为何要使用parque:相对于txt文件而言,parquet查询性能的提高在某些状况下可能达到 30 倍或更高,存储的节省可高达 75%。

用法:

        // 读取Parquet文件中的数据,建立一个DataFrame
        DataFrame userDf = sqlContext.read().parquet("D:\\文档\\users.parquet");

 

3--自动分区推断:Spark SQL就会自动根据目录结构,推断出分区信息。此外,分区列的数据类型,也是自动被推断出来的。目前,Spark SQL仅支持自动推断出数字类型和字符串类型。有时,用户也许不但愿Spark SQL自动推断分区列的数据类型。此时只要设置一个配置便可, spark.sql.sources.partitionColumnTypeInference.enabled,默认为true,即自动推断分区列的类型,设置为false,即不会自动推断类型。禁止自动推断分区列的类型时,全部分区列的类型,就统一默认都是String。

可参考:spark从入门到精通--中华石杉,第78讲

 

4-- 合并元数据:用户能够在一开始就定义一个简单的元数据,而后随着业务须要,逐渐往元数据中添加更多的列。在这种状况下,用户可能会建立多个Parquet文件,有着多个不一样的可是却互相兼容的元数据。Parquet数据源支持自动推断出这种状况,而且进行多个Parquet文件的元数据的合并。

默认状况下是不进行数据元数据的合并:

一、读取Parquet文件时,将数据源的选项,mergeSchema,设置为true
二、使用SQLContext.setConf()方法,将spark.sql.parquet.mergeSchema参数设置为true

 

五:数据源

1- json:注意的是,这里使用的JSON文件与传统意义上的JSON文件是不同的。每行都必须,也只能包含一个,单独的,自包含的,有效的JSON对象。不能让一个JSON对象分散在多行。不然会报错。

2- hive

使用hive中数据:
JavaSparkContext sparkContext = new JavaSparkContext(conf); // 建立HiveContext,注意,这里,它接收的是SparkContext做为参数,不是JavaSparkContext HiveContext hiveContext = new HiveContext(sparkContext.sc()); // 判断是否存在student_infos表,若是存在则删除 hiveContext.sql("drop table if exists student_infos");
将DataFrame数据保存到hive表中:
goodStudentInfoDF.saveAsTable("good_student_info");

  2- jdbc

public class JDBCDataSource {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JDBCDataSourceJava").setMaster("local");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sparkContext);

        // 分别将mysql中两张表的数据加载为DataFrame
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://hadoop-100:3306/mytest");
        options.put("dbtable", "student_infos");
        options.put("user", "root");
        options.put("password", "zhaojun2436");

        DataFrame infoDF = sqlContext.read().options(options).format("jdbc").load();

        options.put("dbtable", "student_scores");
        DataFrame scoreDF = sqlContext.read().options(options).format("jdbc").load();

        // 将两个DataFrame转换为JavaPairRDD,执行join操做
        JavaPairRDD<String, Integer> infoRDD = infoDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<>(row.getString(0), row.getInt(1));
            }
        });

        JavaPairRDD<String, Integer> scoreRDD = scoreDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<>(row.getString(0), row.getInt(1));
            }
        });

        JavaPairRDD<String, Tuple2<Integer, Integer>> infoJoinScore = infoRDD.join(scoreRDD);

        // 将JavaPairRDD转换为JavaRDD<Row>
        JavaRDD<Row> infoJoinScoreRDD = infoJoinScore.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> v1) throws Exception {
                return RowFactory.create(v1._1, v1._2._1, v1._2._2);
            }
        });

        // 过滤出分数大于80分的数据
        JavaRDD<Row> goodStudent = infoJoinScoreRDD.filter(new Function<Row, Boolean>() {
            @Override
            public Boolean call(Row v1) throws Exception {
                if (v1.getInt(2) > 80) {
                    return true;
                }
                return false;
            }
        });

        // 转换为DataFrame
        List<StructField> fieldList = new ArrayList<>();
        fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        fieldList.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));

        StructType structType = DataTypes.createStructType(fieldList);

        DataFrame df = sqlContext.createDataFrame(goodStudent, structType);

        Row[] collect = df.collect();
        for(Row row : collect) {
            System.out.println(row);
        }

        // 将DataFrame中的数据保存到mysql表中
        // 这种方式是在企业里很经常使用的,有多是插入mysql、有多是插入hbase,还有多是插入redis缓存
        goodStudent.foreach(new VoidFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                String sql = "insert into good_student_infos values("
                        + "'" + String.valueOf(row.getString(0)) + "',"
                        + Integer.valueOf(String.valueOf(row.get(1))) + ","
                        + Integer.valueOf(String.valueOf(row.get(2))) + ")";

                Class.forName("com.mysql.jdbc.Driver");

                Connection conn = null;
                Statement stmt = null;
                try {
                    conn = DriverManager.getConnection(
                            "jdbc:mysql://hadoop-100:3306/mytest", "root", "zhaojun2436");
                    stmt = conn.createStatement();
                    stmt.executeUpdate(sql);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if(stmt != null) {
                        stmt.close();
                    }
                    if(conn != null) {
                        conn.close();
                    }
                }
            }
        });
    }
}

 

 扩展阅读:udf自定义函数,https://zhangslob.github.io/2018/10/29/Spark%E5%AE%9E%E6%88%98%EF%BC%88%E4%BA%8C%EF%BC%89%E5%AD%A6%E4%B9%A0UDF/

相关文章
相关标签/搜索