Spark入门实战系列--6.SparkSQL(上)--SparkSQL简介

【注】该系列文章以及使用到安装包/测试数据 能够在《倾情大奉送--Spark入门实战系列》获取

1SparkSQL的发展历程

1.1 Hive and Shark

SparkSQL的前身是Shark,给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应运而生,它是当时惟一运行在Hadoop上的SQL-on-Hadoop工具。可是MapReduce计算过程当中大量的中间磁盘落地过程消耗了大量的I/O,下降的运行效率,为了提升SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现较为突出的是:html

l MapRDrilljava

l ClouderaImpalaweb

l Sharksql

其中Shark是伯克利实验室Spark生态环境的组件之一,它修改了下图所示的右下角的内存管理、物理计划、执行三个模块,并使之能运行在Spark引擎上,从而使得SQL查询的速度获得10-100倍的提高。数据库

clip_image002

1.2 SharkSparkSQL 

可是,随着Spark的发展,对于野心勃勃的Spark团队来讲,Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等等),制约了SparkOne Stack Rule Them All的既定方针,制约了Spark各个组件的相互集成,因此提出了SparkSQL项目。SparkSQL抛弃原有Shark的代码,汲取了Shark的一些优势,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,从新开发了SparkSQL代码;因为摆脱了对Hive的依赖性,SparkSQL不管在数据兼容、性能优化、组件扩展方面都获得了极大的方便,真可谓“退一步,海阔天空”。express

l数据兼容方面  不但兼容Hive,还能够从RDDparquet文件、JSON文件中获取数据,将来版本甚至支持获取RDBMS数据以及cassandraNOSQL数据;apache

l性能优化方面  除了采起In-Memory Columnar Storagebyte-code generation等优化技术外、将会引进Cost Model对查询进行动态评估、获取最佳物理计划等等;编程

l组件扩展方面  不管是SQL的语法解析器、分析器仍是优化器均可以从新定义,进行扩展。json

201461Shark项目和SparkSQL项目的主持人Reynold Xin宣布:中止对Shark的开发,团队将全部资源放SparkSQL项目上,至此,Shark的发展画上了句话,但也所以发展出两个直线:SparkSQLHive on Spark数组

clip_image004

其中SparkSQL做为Spark生态的一员继续发展,而再也不受限于Hive,只是兼容Hive;而Hive on Spark是一个Hive的发展计划,该计划将Spark做为Hive的底层引擎之一,也就是说,Hive将再也不受限于一个引擎,能够采用Map-ReduceTezSpark等引擎。

1.3 SparkSQL的性能

Shark的出现,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提升:

clip_image006

那么,摆脱了Hive的限制,SparkSQL的性能又有怎么样的表现呢?虽然没有Shark相对于Hive那样瞩目地性能提高,但也表现得很是优异:

clip_image008

为何SparkSQL的性能会获得怎么大的提高呢?主要SparkSQL在下面几点作了优化:

A:内存列存储(In-Memory Columnar Storage

SparkSQL的表数据在内存中存储不是采用原生态的JVM对象存储方式,而是采用内存列存储,以下图所示。

clip_image010

该存储方式不管在空间占用量和读取吞吐率上都占有很大优点。

对于原生态的JVM对象存储方式,每一个对象一般要增长12-16字节的额外开销,对于一个270MBTPC-H lineitem table数据,使用这种方式读入内存,要使用970MB左右的内存空间(一般是25倍于原生数据空间);另外,使用这种方式,每一个数据记录产生一个JVM对象,若是是大小为200B的数据记录,32G的堆栈将产生1.6亿个对象,这么多的对象,对于GC来讲,可能要消耗几分钟的时间来处理(JVM的垃圾收集时间与堆栈中的对象数量呈线性相关)。显然这种内存存储方式对于基于内存计算的Spark来讲,很昂贵也负担不起。

对于内存列存储来讲,将全部原生数据类型的列采用原生数组来存储,将Hive支持的复杂数据类型(如arraymap等)先序化后并接成一个字节数组来存储。这样,每一个列建立一个JVM对象,从而致使能够快速的GC和紧凑的数据存储;额外的,还可使用低廉CPU开销的高效压缩方法(如字典编码、行长度编码等压缩方法)下降内存开销;更有趣的是,对于分析查询中频繁使用的聚合特定列,性能会获得很大的提升,缘由就是这些列的数据放在一块儿,更容易读入内存进行计算。

B:字节码生成技术(bytecode generation,即CG

在数据库查询中有一个昂贵的操做是查询语句中的表达式,主要是因为JVM的内存模型引发的。好比以下一个查询:

SELECT a + b FROM table

在这个查询里,若是采用通用的SQL语法途径去处理,会先生成一个表达式树(有两个节点的Add树,参考后面章节),在物理处理这个表达式树的时候,将会如图所示的7个步骤:

1.  调用虚函数Add.eval(),须要确认Add两边的数据类型

2.  调用虚函数a.eval(),须要确认a的数据类型

3.  肯定a的数据类型是Int,装箱

4.  调用虚函数b.eval(),须要确认b的数据类型

5.  肯定b的数据类型是Int,装箱

6.  调用Int类型的Add

7.  返回装箱后的计算结果

其中屡次涉及到虚函数的调用,虚函数的调用会打断CPU的正常流水线处理,减缓执行。

Spark1.1.0catalyst模块的expressions增长了codegen模块,若是使用动态字节码生成技术(配置spark.sql.codegen参数),SparkSQL在执行物理计划的时候,对匹配的表达式采用特定的代码,动态编译,而后运行。如上例子,匹配到Add方法:

clip_image012

而后,经过调用,最终调用:

clip_image014

最终实现效果相似以下伪代码:

val a: Int = inputRow.getInt(0)

val b: Int = inputRow.getInt(1)

val result: Int = a + b

resultRow.setInt(0, result)

对于Spark1.1.0,对SQL表达式都做了CG优化,具体能够参看codegen模块。CG优化的实现主要仍是依靠scala2.10的运行时放射机制(runtime reflection)。对于SQL查询的CG优化,能够简单地用下图来表示:

clip_image016

CScala代码优化

另外,SparkSQL在使用Scala编写代码的时候,尽可能避免低效的、容易GC的代码;尽管增长了编写代码的难度,但对于用户来讲,仍是使用统一的接口,没受到使用上的困难。下图是一个Scala代码优化的示意图:

clip_image018

2SparkSQL运行架构

相似于关系型数据库,SparkSQL也是语句也是由Projectiona1a2a3)、Data SourcetableA)、Filtercondition)组成,分别对应sql查询过程当中的ResultData SourceOperation,也就是说SQL语句按Result-->Data Source-->Operation的次序来描述的。

clip_image020

 

当执行SparkSQL语句的顺序为:

1.对读入的SQL语句进行解析(Parse),分辨出SQL语句中哪些词是关键词(如SELECTFROMWHERE),哪些是表达式、哪些是Projection、哪些是Data Source等,从而判断SQL语句是否规范;

2.SQL语句和数据库的数据字典(列、表、视图等等)进行绑定(Bind),若是相关的ProjectionData Source等都是存在的话,就表示这个SQL语句是能够执行的;

3.通常的数据库会提供几个执行计划,这些计划通常都有运行统计数据,数据库会在这些计划中选择一个最优计划(Optimize);

4.计划执行(Execute),按Operation-->Data Source-->Result的次序来进行的,在执行过程有时候甚至不须要读取物理表就能够返回结果,好比从新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。

2.1 TreeRule

SparkSQLSQL语句的处理和关系型数据库对SQL语句的处理采用了相似的方法,首先会将SQL语句进行解析(Parse),而后造成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操做,而操做的方法是采用Rule,经过模式匹配,对不一样类型的节点采用不一样的操做。在整个sql语句的处理过程当中,TreeRule相互配合,完成了解析、绑定(在SparkSQL中称为Analysis)、优化、物理计划等过程,最终生成能够执行的物理计划。

2.1.1 Tree

l  Tree的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees

l  Logical PlansExpressionsPhysical Operators均可以使用Tree表示

l  Tree的具体操做是经过TreeNode来实现的

Ø  SparkSQL定义了catalyst.trees的日志,经过这个日志能够形象的表示出树的结构

Ø  TreeNode可使用scala的集合操做方法(如foreach, map, flatMap, collect等)进行操做

Ø  有了TreeNode,经过Tree中各个TreeNode之间的关系,能够对Tree进行遍历操做,如使用transformDowntransformUpRule应用到给定的树段,而后用结果替代旧的树段;也可使用transformChildrenDowntransformChildrenUp对一个给定的节点进行操做,经过迭代将Rule应用到该节点以及子节点。

l  TreeNode能够细分红三种类型的Node

Ø  UnaryNode 一元节点,即只有一个子节点。如LimitFilter操做

Ø  BinaryNode 二元节点,即有左右子节点的二叉节点。如JionUnion操做

Ø  LeafNode 叶子节点,没有子节点的节点。主要用户命令类操做,如SetCommand

 

2.1.2 Rule

l  Rule的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules

l  RuleSparkSQLAnalyzerOptimizerSparkPlan等各个组件中都有应用到

l  Rule是一个抽象类,具体的Rule实现是经过RuleExecutor完成

l  Rule经过定义batchbatchs,能够简便的、模块化地对Tree进行transform操做

l  Rule经过定义OnceFixedPoint,能够对Tree进行一次操做或屡次操做(如对某些Tree进行屡次迭代操做的时候,达到FixedPoint次数迭代或达到先后两次的树结构没变化才中止操做,具体参看RuleExecutor.apply

2.2 sqlContexthiveContext的运行过程

SparkSQL有两个分支,sqlContexthiveContextsqlContext如今只支持SQL语法解析器(SQL-92语法);hiveContext如今支持SQL语法解析器和hivesql语法解析器,默认为hiveSQL语法解析器,用户能够经过配置切换成SQL语法解析器,来运行hiveSQL不支持的语法,

2.2.1 sqlContext的运行过程

sqlContext总的一个过程以下图所示:

1.SQL语句通过SqlParse解析成UnresolvedLogicalPlan

2.使用analyzer结合数据数据字典(catalog)进行绑定,生成resolvedLogicalPlan

3.使用optimizerresolvedLogicalPlan进行优化,生成optimizedLogicalPlan

4.使用SparkPlanLogicalPlan转换成PhysicalPlan

5.使用prepareForExecution()PhysicalPlan转换成可执行物理计划;

6.使用execute()执行可执行物理计划;

7.生成SchemaRDD

在整个运行过程当中涉及到多个SparkSQL的组件,如SqlParseanalyzeroptimizerSparkPlan等等

clip_image022

2.2.2hiveContext的运行过程

hiveContext总的一个过程以下图所示:

1.SQL语句通过HiveQl.parseSql解析成Unresolved LogicalPlan,在这个解析过程当中对hiveql语句使用getAst()获取AST树,而后再进行解析;

2.使用analyzer结合数据hive源数据Metastore(新的catalog)进行绑定,生成resolved LogicalPlan

3.使用optimizerresolved LogicalPlan进行优化,生成optimized LogicalPlan,优化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))进行预处理;

4.使用hivePlannerLogicalPlan转换成PhysicalPlan

5.使用prepareForExecution()PhysicalPlan转换成可执行物理计划;

6.使用execute()执行可执行物理计划;

7.执行后,使用map(_.copy)将结果导入SchemaRDD

clip_image024

2.3 catalyst优化器

SparkSQL1.1整体上由四个模块组成:corecatalysthivehive-Thriftserver

l  core处理数据的输入输出,从不一样的数据源获取数据(RDDParquetjson等),将查询结果输出成schemaRDD

l  catalyst处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;

l  hivehive数据的处理

l  hive-ThriftServer提供CLIJDBC/ODBC接口

在这四个模块中,catalyst处于最核心的部分,其性能优劣将影响总体的性能。因为发展时间尚短,还有不少不足的地方,但其插件式的设计,为将来的发展留下了很大的空间。下面是catalyst的一个设计图:

clip_image026

 

其中虚线部分是之后版本要实现的功能,实线部分是已经实现的功能。从上图看,catalyst主要的实现组件有:

lsqlParse,完成sql语句的语法解析功能,目前只提供了一个简单的sql解析器;

lAnalyzer,主要完成绑定工做,将不一样来源的Unresolved LogicalPlan和数据元数据(如hive metastoreSchema catalog)进行绑定,生成resolved LogicalPlan

loptimizerresolved LogicalPlan进行优化,生成optimized LogicalPlan

l PlannerLogicalPlan转换成PhysicalPlan

l CostModel,主要根据过去的性能统计数据,选择最佳的物理执行计划

这些组件的基本实现方法:

l 先将sql语句经过解析生成Tree,而后在不一样阶段使用不一样的Rule应用到Tree上,经过转换完成各个组件的功能。

l Analyzer使用Analysis Rules,配合数据元数据(如hive metastoreSchema catalog),完善Unresolved LogicalPlan的属性而转换成resolved LogicalPlan

l optimizer使用Optimization Rules,对resolved LogicalPlan进行合并、列裁剪、过滤器下推等优化做业而转换成optimized LogicalPlan

l Planner使用Planning Strategies,对optimized LogicalPlan

3SparkSQL CLI

CLICommand-Line Interface,命令行界面)是指可在用户提示符下键入可执行指令的界面,它一般不支持鼠标,用户经过键盘输入指令,计算机接收到指令后予以执行。Spark CLI指的是使用命令界面直接输入SQL命令,而后发送到Spark集群进行执行,在界面中显示运行过程和最终的结果。

Spark1.1相较于Spark1.0最大的差异就在于Spark1.1增长了Spark SQL CLIThriftServer,使得Hive用户还有用惯了命令行的RDBMS数据库管理员较容易地上手,真正意义上进入了SQL时代。

【注】Spark CLISpark Thrift Server实验环境为第二课《Spark编译与部署(下)--Spark编译安装》所搭建

3.1  运行环境说明

3.1.1 硬软件环境

l  主机操做系统:Windows 64位,双核4线程,主频2.2G10G内存

l  虚拟软件:VMware® Workstation 9.0.0 build-812388

l  虚拟机操做系统:CentOS 64位,单核

l  虚拟机运行环境:

Ø  JDK1.7.0_55 64

Ø  Hadoop2.2.0(须要编译为64位)

Ø  Scala2.11.4

Ø  Spark1.1.0(须要编译)

Ø  Hive0.13.1

3.1.2 机器网络环境

集群包含三个节点,节点之间能够免密码SSH访问,节点IP地址和主机名分布以下:

序号

IP地址

机器名

类型

核数/内存

用户名

目录

1

192.168.0.61

hadoop1

NN/DN/RM

Master/Worker

1/3G

hadoop

/app 程序所在路径

/app/scala-...

/app/hadoop

/app/complied

2

192.168.0.62

hadoop2

DN/NM/Worker

1/2G

hadoop

3

192.168.0.63

hadoop3

DN/NM/Worker

1/2G

hadoop

3.2 配置并启动

3.2.1 建立并配置hive-site.xml

在运行Spark SQL CLI中须要使用到Hive Metastore,故须要在Spark中添加其uris。具体方法是在SPARK_HOME/conf目录下建立hive-site.xml文件,而后在该配置文件中,添加hive.metastore.uris属性,具体以下:

<configuration> 

  <property>

    <name>hive.metastore.uris</name>

    <value>thrift://hadoop1:9083</value>

    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>

  </property>

</configuration>

clip_image028

3.2.2 启动Hive

在使用Spark SQL CLI以前须要启动Hive Metastore(若是数据存放在HDFS文件系统,还须要启动HadoopHDFS),使用以下命令可使Hive Metastore启动后运行在后台,能够经过jobs查询:

$nohup hive --service metastore > metastore.log 2>&1 &

clip_image030

3.2.3 启动Spark集群和Spark SQL CLI

经过以下命令启动Spark集群和Spark SQL CLI

$cd /app/hadoop/spark-1.1.0

$sbin/start-all.sh

$bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g

在集群监控页面能够看到启动了SparkSQL应用程序:

clip_image032

这时就可使用HQL语句对Hive数据进行查询,另外可使用COMMAND,如使用set进行设置参数:默认状况下,SparkSQL Shuffle的时候是200partition,可使用以下命令修改该参数:

SET spark.sql.shuffle.partitions=20;

运行同一个查询语句,参数改变后,Taskpartition)的数量就由200变成了20

clip_image034

3.2.4 命令参数

经过bin/spark-sql --help能够查看CLI命令参数:

clip_image036

clip_image038

其中[options] CLI启动一个SparkSQL应用程序的参数,若是不设置--master的话,将在启动spark-sql的机器以local方式运行,只能经过http://机器名:4040进行监控;这部分参数,能够参照Spark1.0.0 应用程序部署工具spark-submit 的参数。

[cli option]CLI的参数,经过这些参数CLI能够直接运行SQL文件、进入命令行运行SQL命令等等,相似之前的Shark的用法。须要注意的是CLI不是使用JDBC链接,因此不能链接到ThriftServer;但能够配置conf/hive-site.xml链接到HiveMetastore,而后对Hive数据进行查询。

3.3 实战Spark SQL CLI

3.3.1 获取订单每一年的销售单数、销售总额

第一步   设置任务个数,在这里修改成20

spark-sql>SET spark.sql.shuffle.partitions=20;

clip_image040

第二步   运行SQL语句

spark-sql>use hive;

clip_image042

spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

clip_image044

第三步   查看运行结果

clip_image046

clip_image048

3.3.2 计算全部订单每一年的总金额

第一步   执行SQL语句

spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

clip_image050

第二步   执行结果

使用CLI执行结果以下:

clip_image052

clip_image054

3.3.3 计算全部订单每一年最大金额订单的销售额

第一步   执行SQL语句

spark-sql>select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear;

clip_image056

第二步   执行结果

使用CLI执行结果以下:

clip_image058

 

clip_image060

4Spark Thrift Server

ThriftServer是一个JDBC/ODBC接口,用户能够经过JDBC/ODBC链接ThriftServer来访问SparkSQL的数据。ThriftServer在启动的时候,会启动了一个SparkSQL的应用程序,而经过JDBC/ODBC链接进来的客户端共同分享这个SparkSQL应用程序的资源,也就是说不一样的用户之间能够共享数据;ThriftServer启动时还开启一个侦听器,等待JDBC客户端的链接和提交查询。因此,在配置ThriftServer的时候,至少要配置ThriftServer的主机名和端口,若是要使用Hive数据的话,还要提供Hive Metastoreuris

【注】Spark CLISpark Thrift Server实验环境为第二课《Spark编译与部署(下)--Spark编译安装》所搭建

4.1 配置并启动

4.1.1 建立并配置hive-site.xml

第一步   建立hive-site.xml配置文件

$SPARK_HOME/conf目录下修改hive-site.xml配置文件(若是在Spark SQL CLI中已经添加,能够省略):

$cd /app/hadoop/spark-1.1.0/conf

$sudo vi hive-site.xml

clip_image062

第二步   修改配置文件

设置hadoop1Metastore服务器,hadoop2Thrift Server服务器,配置内容以下:

<configuration>

  <property>

    <name>hive.metastore.uris</name>

    <value>thrift://hadoop1:9083</value>

    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.min.worker.threads</name>

    <value>5</value>

    <description>Minimum number of Thrift worker threads</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.max.worker.threads</name>

    <value>500</value>

    <description>Maximum number of Thrift worker threads</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.port</name>

    <value>10000</value>

    <description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.bind.host</name>

    <value>hadoop2</value>

    <description>Bind host on which to run the HiveServer2 Thrift interface.Can be overridden by setting$HIVE_SERVER2_THRIFT_BIND_HOST</description>

  </property>

</configuration>

clip_image064

4.1.2 启动Hive

hadoop1节点中,在后台启动Hive Metastore(若是数据存放在HDFS文件系统,还须要启动HadoopHDFS):

$nohup hive --service metastore > metastore.log 2>&1 &

clip_image066

4.1.3 启动Spark集群和Thrift Server

hadoop1节点启动Spark集群

$cd /app/hadoop/spark-1.1.0/sbin

$./start-all.sh

hadoop2节点上进入SPARK_HOME/sbin目录,使用以下命令启动Thrift Server

$cd /app/hadoop/spark-1.1.0/sbin

$./start-thriftserver.sh --master spark://hadoop1:7077 --executor-memory 1g

clip_image068

注意Thrift Server须要按照配置在hadoop2启动!

在集群监控页面能够看到启动了SparkSQL应用程序:

clip_image070

4.1.4 命令参数

使用sbin/start-thriftserver.sh --help能够查看ThriftServer的命令参数:

$sbin/start-thriftserver.sh --help Usage: ./sbin/start-thriftserver [options] [thrift server options]

        Thrift server options: Use value for given property

clip_image072

clip_image074

其中[options] Thrift Server启动一个SparkSQL应用程序的参数,若是不设置--master的话,将在启动Thrift Server的机器以local方式运行,只能经过http://机器名:4040进行监控;这部分参数,能够参照Spark1.0.0 应用程序部署工具spark-submit 的参数。在集群中提供Thrift Server的话,必定要配置masterexecutor-memory等参数。

[thrift server options]Thrift Server的参数,可使用-dproperty=value的格式来定义;在实际应用上,由于参数比较多,一般使用conf/hive-site.xml配置。

4.2 实战Thrift Server

4.2.1 远程客户端链接

能够在任意节点启动bin/beeline,用!connect jdbc:hive2://hadoop2:10000链接ThriftServer,由于没有采用权限管理,因此用户名用运行bin/beeline的用户hadoop,密码为空:

$cd /app/hadoop/spark-1.1.0/bin

$./beeline

beeline>!connect jdbc:hive2://hadoop2:10000

clip_image076

4.2.2 基本操做

第一步   显示hive数据库全部表

beeline>show database;

beeline>use hive;

beeline>show tables;

clip_image078

第二步   建立表testThrift

beeline>create table testThrift(field1 String , field2 Int);

beeline>show tables;

clip_image080

第三步   tbStockDetail表中金额大于3000插入到testThrift表中

beeline>insert into table testThrift select ordernumber,amount from tbStockDetail  where amount>3000;

beeline>select * from testThrift;

clip_image082

第四步   从新建立testThrift表中,把年度最大订单插入该表中

beeline>drop table testThrift;

beeline>create table testThrift (field1 String , field2 Int);

beeline>insert into table testThrift select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear;

beeline>select * from testThrift;

clip_image084

4.2.3 计算全部订单每一年的订单数

第一步   执行SQL语句

spark-sql>select c.theyear, count(distinct a.ordernumber) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

第二步   执行结果

clip_image086

Stage监控页面:

clip_image088

查看Details for Stage 28

clip_image090

4.2.4 计算全部订单月销售额前十名

第一步   执行SQL语句

spark-sql>select c.theyear,c.themonth,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,c.themonth order by sumofamount desc limit 10;

第二步   执行结果

clip_image092

Stage监控页面:

clip_image094

在其第一个Task中,从本地读入数据

clip_image096

在后面的Task是从内存中获取数据

clip_image098

4.2.5 缓存表数据

第一步   缓存数据

beeline>cache table tbStock;

beeline>select count(*) from tbStock;

clip_image100

第二步   运行4.2.4中的“计算全部订单月销售额前十名”

beeline>select count(*) from tbStock;

clip_image102

本次计算划给11.233秒,查看webUI,数据已经缓存,缓存率为100%

clip_image104

第三步   在另外节点再次运行

hadoop3节点启动bin/beeline,用!connect jdbc:hive2://hadoop2:10000链接ThriftServer,而后直接运行对tbStock计数(注意没有进行数据库的切换):

clip_image106

用时0.343秒,再查看webUI中的stage

clip_image108

Locality LevelPROCESS,显然是使用了缓存表。

从上能够看出,ThriftServer能够链接多个JDBC/ODBC客户端,并相互之间能够共享数据。顺便提一句,ThriftServer启动后处于监听状态,用户可使用ctrl+c退出ThriftServer;而beeline的退出使用!q命令。

4.2.6 IDEAJDBC访问

有了ThriftServer,开发人员能够很是方便的使用JDBC/ODBC来访问SparkSQL。下面是一个scala代码,查询表tbStockDetail,返回amount>3000的单据号和交易金额:

第一步   IDEA建立class6包和类JDBCofSparkSQL

参见《Spark编程模型(下)--IDEA搭建及实战》在IDEA中建立class6包并新建类JDBCofSparkSQL。该类中查询tbStockDetail金额大于3000的订单:

package class6

import java.sql.DriverManager

 

object JDBCofSparkSQL {

  def main(args: Array[String]) {

    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000/hive", "hadoop", "")

    try {

      val statement = conn.createStatement

val rs = statement.executeQuery("select ordernumber,amount from tbStockDetail  where amount>3000")

      while (rs.next) {

        val ordernumber = rs.getString("ordernumber")

        val amount = rs.getString("amount")

        println("ordernumber = %s, amount = %s".format(ordernumber, amount))

      }

    } catch {

      case e: Exception => e.printStackTrace

    }

    conn.close

  }

}

第二步   查看运行结果

IDEA中能够观察到,在运行日志窗口中没有运行过程的日志,只显示查询结果

clip_image110

第三步   查看监控结果

Spark监控界面中观察到,该Job有一个编号为6Stage,该Stage2Task,分别运行在hadoop1hadoop2节点,获取数据为NODE_LOCAL方式。

clip_image112

clip_image114

clip_image116

hadoop2中观察Thrift Server运行日志以下:

clip_image118

相关文章
相关标签/搜索