初识Spark2.0之Spark SQL

内存计算平台Spark在今年6月份的时候正式发布了spark2.0,相比上一版本的spark1.6版本,在内存优化,数据组织,流计算等方面都作出了较大的改变,同时更加注重基于DataFrame数据组织的MLlib,更加注重机器学习整个过程的管道化。java

固然,做为使用者,特别是须要运用到线上的系统,大部分厂家仍是会继续选择已经稳定的spark1.6版本,而且在spark2.0逐渐成熟以后才会开始考虑系统组件的升级。做为开发者,仍是有必要先行一步,去了解spark2.0的一些特性和使用,及思考/借鉴一些spark2.0作出某些改进的思路。mysql

首先,为了调用spark API 来完成咱们的计算,须要先建立一个sparkContext:sql

 String warehouseLocation = System.getProperty("user.dir") + "spark-warehouse";//用户的当前工做目录
SparkConf conf = new SparkConf().setAppName("spark sql test")  
                .set("spark.sql.warehouse.dir", warehouseLocation)  
                .setMaster("local[3]");
  SparkSession spark = SparkSession  
                .builder()  
                .config(conf)  
                .getOrCreate();

上述代码主要有三点:数据库

    • 使用spark sql时须要指定数据库的文件地址,这里使用了一个本地的目录
    • spark配置,指定spark app的名称和数据库地址,master url为local 3核
    • 使用SparkSession,取代了本来的SQLContext与HiveContext。对于DataFrame API的用户来讲,Spark常见的混乱源头来自于使用哪一个“context”。如今你可使用SparkSession了,它做为单个入口能够兼容二者。注意本来的SQLContext与HiveContext仍然保留,以支持向下兼容。这是spark2.0的一个较大的改变,对用户更加友好。

下面开始体验spark sql:json

 //===========================================1 spark SQL===================  
        //数据导入方式  
        Dataset<Row> df = spark.read().json("..\\sparkTestData\\people.json");  
        //查看表  
        df.show();  
        //查看表结构  
        df.printSchema();  
        //查看某一列 相似于MySQL: select name from people  
        df.select("name").show();  
        //查看多列并做计算 相似于MySQL: select name ,age+1 from people  
        df.select(col("name"), col("age").plus(1)).show();  
        //设置过滤条件 相似于MySQL:select * from people where age>21  
        df.filter(col("age").gt(21)).show();  
        //作聚合操做 相似于MySQL:select age,count(*) from people group by age  
        df.groupBy("age").count().show();  
        //上述多个条件进行组合 select ta.age,count(*) from (select name,age+1 as "age" from people) as ta where ta.age>21 group by ta.age  
        df.select(col("name"), col("age").plus(1).alias("age")).filter(col("age").gt(21)).groupBy("age").count().show();  
  
        //直接使用spark SQL进行查询  
        //先注册为临时表  
        df.createOrReplaceTempView("people");  
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");  
        sqlDF.show();

主要关注如下几点:windows

  • 数据来源:spark能够直接导入json格式的文件数据,people.json是我从spark安装包下拷贝的测试数据。
  • spark sql:sparkSql语法和用法和mysql有必定的类似性,能够查看表、表结构、查询、聚合等操做。用户可使用sparkSql的API接口作聚合查询等操做或者用类SQL语句实现(可是必须将DataSet注册为临时表)
  • DataSet:DataSet是spark2.0i引入的一个新的特性(在spark1.6中属于alpha版本)。DataSet结合了RDD和DataFrame的优势, 并带来的一个新的概念Encoder当序列化数据时,,Encoder产生字节码与off-heap进行交互,,可以达到按需访问数据的效果,而不用反序列化整个对象。
咱们能够为自定义的对象建立DataSet,首先建立一个JavaBeans:
/** 
     * 一个描述人属性的JavaBeans 
     * A JavaBean is a Java object that satisfies certain programming conventions: 
 
        The JavaBean class must implement either Serializable or Externalizable 
        The JavaBean class must have a no-arg constructor 
        All JavaBean properties must have public setter and getter methods 
        All JavaBean instance variables should be private 
     */  
    public static class Person implements Serializable {  
        private String name;  
        private int age;  
  
        public String getName() {  
            return name;  
        }  
  
        public void setName(String name) {  
            this.name = name;  
        }  
  
        public int getAge() {  
            return age;  
        }  
  
        public void setAge(int age) {  
            this.age = age;  
        }  
    }

接下来,就能够为该类的对象建立DataSet了,并像操做表同样操做自定义对象的DataSet了:数据结构

   //为自定义的对象建立Dataset  
        List<Person> personpList = new ArrayList<Person>();  
        Person person1 = new Person();  
        person1.setName("Andy");  
        person1.setAge(32);  
        Person person2 = new Person();  
        person2.setName("Justin");  
        person2.setAge(19);  
        personpList.add(person1);  
        personpList.add(person2);  
        Encoder<Person> personEncoder = Encoders.bean(Person.class);  
        Dataset<Person> javaBeanDS = spark.createDataset(  
                personpList,  
                personEncoder  
        );  
        javaBeanDS.show();

同时,能够利用Java反射的特性,来从其余数据集中建立DataSet对象:app

 //spark支持使用java 反射机制推断表结构  
        //1 首先建立一个存储person对象的RDD  
        JavaRDD<Person> peopleRDD = spark.read()  
                .textFile("..\\sparkTestData\\people.txt")  
                .javaRDD()  
                .map(new Function<String, Person>() {  
                    public Person call(String line) throws Exception {  
                        String[] parts = line.split(",");  
                        Person person = new Person();  
                        person.setName(parts[0]);  
                        person.setAge(Integer.parseInt(parts[1].trim()));  
                        return person;  
                    }  
                });  
        //2 表结构推断  
        Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);  
        peopleDF.createOrReplaceTempView("people");  
  
        //3 定义map 这里对每一个元素作序列化操做  
        Encoder<String> stringEncoder = Encoders.STRING();  
        Dataset<String> peopleSerDF = peopleDF.map(new MapFunction<Row, String>() {  
            public String call(Row row) throws Exception {  
                return "Name: " + row.getString(1) + " and age is " + String.valueOf(row.getInt(0));  
            }  
        }, stringEncoder);  
        peopleSerDF.show();  
        //==============================================3 从RDD建立Dataset StructType对象的使用  
        JavaRDD<String> peopleRDD2 = spark.sparkContext()  
                .textFile("..\\sparkTestData\\people.txt", 1)  
                .toJavaRDD();  
  
        // 建立一个描述表结构的schema  
        String schemaString = "name age";  
        List<StructField> fields = new ArrayList<StructField>();  
        for (String fieldName : schemaString.split(" ")) {  
            StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);  
            fields.add(field);  
        }  
        StructType schema = DataTypes.createStructType(fields);  
  
        // Convert records of the RDD (people) to Rows  
        JavaRDD<Row> rowRDD = peopleRDD2.map(new Function<String, Row>() {  
            //@Override  
            public Row call(String record) throws Exception {  
                String[] attributes = record.split(",");  
                return RowFactory.create(attributes[0], attributes[1].trim());  
            }  
        });  
  
        // Apply the schema to the RDD  
        Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);  
  
        // Creates a temporary view using the DataFrame  
        peopleDataFrame.createOrReplaceTempView("people");  
        peopleDataFrame.show();

主要关注如下几点:机器学习

  • RDD:从普通文本文件中解析数据,并建立结构化数据结构的RDD。
  • 表结构推断的方式建立DataSet:利用Java类反射特性将RDD转换为DataSet。
  • 指定表结构的方式建立DataSet:咱们可使用StructType来明肯定义咱们的表结构,完成DataSet的建立
如何将本身的数据/文本导入spark并建立spark的数据对象,对新手来讲显得尤其关键,对本身的数据表达好了以后,才有机会去尝试spark的其余API ,完成咱们的目标。通常数据源在通过咱们其余程序的前处理以后,存储成行形式的文本/json格式或者自己存储的hive/mysql数据库中,spark对这些数据源的调用都是比较方便的。
 
介绍完了spark-sql的数据导入及数据表达后,咱们来完成一个比较简单的数据统计任务。通常在工做生活中对某些数据按必定的周期进行统计分析是一个比较常见的任务了。下面,咱们就以股票统计的例子为例。咱们使用spark的窗口统计功能,来对某一公司的股票在2016年6月份的各个星期的均值作统计。
 //在Spark 2.0中,window API内置也支持time windows!Spark SQL中的time windows和Spark Streaming中的time windows很是相似。  
        Dataset<Row> stocksDF = spark.read().option("header","true").  
                option("inferSchema","true").  
                csv("..\\sparkTestData\\stocks.csv");  
  
        //stocksDF.show();  
  
        Dataset<Row> stocks201606 = stocksDF.filter("year(Date)==2016").  
                filter("month(Date)==6");  
        stocks201606.show(100,false);

首先读入了csv格式的数据文件,同时将2016年6月份的数据过滤出来,并以不截断的方式输出前面100条记录,运行的结果为:ide

调用window接口作窗口统计:

  //window通常在group by语句中使用。window方法的第一个参数指定了时间所在的列;  
    //第二个参数指定了窗口的持续时间(duration),它的单位能够是seconds、minutes、hours、days或者weeks。  
        Dataset<Row> tumblingWindowDS = stocks201606.groupBy(window(stocks201606.col("Date"),"1 week")).  
                agg(avg("Close").as("weekly_average"));  
        tumblingWindowDS.show(100,false);  
        tumblingWindowDS.sort("window.start").  
                select("window.start","window.end","weekly_average").  
                show(false);

其运行结果为:

因为没有指定窗口的开始时间,所以统计的开始时间为2016-05-26,而且不是从0点开始的。一般状况下,这样统计就显得有点不对了,所以咱们须要指定其开始的日期和时间,可是遗憾的是spark并无接口/参数让咱们明确的指定统计窗口的开始时间。好在提供了另一种方式,指定偏移时间,上述时间(2016-05-26 08:00:00)作一个时间偏移,也能够获得咱们想要的开始时间(2016-06-01 00:00:00)。

 //在前面的示例中,咱们使用的是tumbling window。为了可以指定开始时间,咱们须要使用sliding window(滑动窗口)。  
    //到目前为止,没有相关API来建立带有开始时间的tumbling window,可是咱们能够经过将窗口时间(window duration)  
    //和滑动时间(slide duration)设置成同样来建立带有开始时间的tumbling window。代码以下:  
        Dataset<Row>  windowWithStartTime = stocks201606.  
                groupBy(window(stocks201606.col("Date"),"1 week","1 week", "136 hour")).  
                agg(avg("Close").as("weekly_average"));  
        //6 days参数就是开始时间的偏移量;前两个参数分别表明窗口时间和滑动时间,咱们打印出这个窗口的内容:  
        windowWithStartTime.sort("window.start").  
                select("window.start","window.end","weekly_average").  
                show(false);

运行结果为:

这就获得了咱们须要的统计结果了。

关于spark2.0的sparkSql部分,基本就介绍这么多了。

相关文章
相关标签/搜索