[Spark] 03 - Spark SQL

关于Spark SQL (Structured Query Language),首先会想到一个问题:Apache Hive vs Apache Spark SQL – 13 Amazing Differenceshtml

Hive has been known to be the component of Big data ecosystem where legacy mappers and reducers are needed to process data from HDFS whereas Spark SQL is known to be the component of Apache Spark API which has made processing on Big data ecosystem a lot easier and real-time. java

 

 

原理解析


Ref: https://www.bilibili.com/video/av27076260/?p=9  mysql

Ref: [MySQL] 01- Basic sql sql

 

小比较:数据库

Hive: SQL --> map/reduce,Hive on Spark 就是把map/reduce直接换为Spark。apache

Spark SQL: SQL integrated in Spark;可直接读取Hive的保存的文件,兼容Hive。编程

 

架构图:json

 

转化过程:网络

Catalyst's general tree transformation framework数据结构

(一)SQL Parser 转化为 Abstract Syntax Tree。

(二)逻辑最佳化,只保留须要的部分:

Parquet格式时,将字符串透过字典编码压缩成整数,缩小资料量;

RDBMS时,将筛选推到资料源端。

(三)可执行的物理计划,并产生 JVM bytecode。

智能选择 “Broadcast Join” or "Shuffle Join" 来减小网络流量

低阶优化,减小比较消耗的物件。

 

优化示例:

def add_demographics(events):
  u = sqlCtx.table("users")
  events.join(u, events.user_id) == u.user_id).withColumn("city", zipToCity(df.zip))

events = add_demographics(sqlCtx.load("/data/events", "parquet"))
training_data = events.where(events.city == "New York").select(events.timestamp).collect()

 

抽象语法树:

 

 

 

 

Spark SQL DataFrame


Dataframe做为新的数据结构,能够操做更为细粒度(能看到RDD内部的结构化信息)

Spark SQL编程使用的是:SparkSession 接口,相似以前的SparkContext 的地位。

关系型数据库” 与 "机器学习" 的结合。

 

基本概念

1、SparkSession

PySpark交互环境下,会自动生成 SparkSessionSparkContext

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

 

2、DataFrame

读取文件

/usr/local/spark/examples/src/main/resources/ 文件夹下有实例文件以供实验。

df = spark.read.text("people.txt")
df = spark.read.format("text").load("people.txt")  # 等价的方式
df = spark.read.json(
"people.json") df = spark.read.parquet("people.parquet")

df.show()

从HDFS中读取文件。

from pyspark.sql import SQLContext
sc
= SparkContext() sqlcontext = SQLContext(sc)
#format后面为告诉程序读取csv格式,load后面为hdfs地址,hdfs后面跟着hadoop的名字,而后文件目录(这块有点懵,若是报错,跟着报错查修) data = sqlcontext.read.format("com.databricks.spark.csv").\ options(header="true",inferschema="true").\ load("hdfs://cdsw1.richstone.com/test1/5min.csv") data.show(5) result = data.groupBy("type").count().orderBy("count") result.show()

  

保存文件

注意,最后 newpeople.json 是目录,有点意思。

peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")

peopleDF.select("name", "age").write.format("json").save("file:///usr/local/spark/mycode/sparksql/newpeople.json")

除了如上的json,还支持 text, parquet 格式 的文件。

 

3、经常使用操做

df.printSchema()                                    # 模式信息

df.select(df["name",df["age"]+1]).show()   # age列的value都加1
df.filter(df["age"]>20).show()
df.groupBy("age").count().show()
df.sort(df[
"age"]).desc()).show() df.sort(df["age"]).desc().df["name"].asc()).show()  # 轻松实现 "二次排序"

 

 

 

RDD转换为DataFrame

1、利用"反射"机制推断RDD模式

使用反射来推断包含特定对象类型的RDD的模式(schema)。适用于写spark程序的同时,已经知道了模式使用反射可使得代码简洁。

结合样本的名字,经过反射读取,做为列的名字。

这个RDD能够隐式转化为一个SchemaRDD,而后注册为一个表。表能够在后续的sql语句中使用。

 

第一步、转化为Row形式的RDD格式

from pyspark.sql import Row

people = spark.sparkContext.textFile("...").  \
... map(lambda line: line.split(",")).  \
... map(lambda p: Row(name=p[0], age=int(p[1])))

这里转换为了Row对象。

 

第二步、RDD格式 转换为 DataFrame

这里只是经过DataFrame的sql比较方便的查询了下数据而已。

注意理解:people是rdd,通过一次转变df后,又变回rdd,完成一次 “反射” 过程

schemaPeople = spark.createDataFrame(people) 

# 必须注册为临时表才能供下面的查询使用 schemaPeople.createOrReplaceTempView(
"people") personDF = spark.sql("select name, age from people where age > 20")
# 再转化回RDD形式,而df的一行也就是p,其中包含name, age两个元素 personsRDD
= personsDF.rdd.map(lambda p: "Name: " + p.name + "," + "Age: " + str(p.age)) personsRDD.foreach(print)

Output: 

Name: Michael,Age: 29
Name: Andy,Age: 30

 

 

2、用编程方式去定义RDD模式

当没法提早获知数据结构时。

Jeff: 数据文件中没有说明“数据类型”,只是全都是字符串。而“反射”方案中的rdd是含有类型的。

from pyspark.sql.types import *
from pyspark.sql import Row

# 生成"表头"
schemaString = "name age" fields = [ StructField(field_name, StringType(), True) for field_name in schemaString.split(" ") ]
schema = StructType(fields)

# 生成"记录"
lines  = spark.sparkContext.textFile("file:/// ... people.txt")
parts  = lines.map(lambda x: x.split(","))
people = parts.map(lambda p: Row(p[0], p[1].strip()))

#--------------------------------------------------------------
# 拼接“表头”和“记录” schemaPeople
= spark.createDataFrame(people, schema)

schemaPeople.createOrReplaceTempView("people")
results = spark.sql("SELECT name,age FROM people")
results.show()

 

 

 

链接数据库

1、启动MySQL并建立数据

启动数据库。

service mysql start
mysql -u root -p

建立数据。

create database spark;
use spark;

create table student (id int(4), name char(20), gender char(4), age int(4));

insert into student values(1, 'Xueqian', 'F', 23)
insert into student values(2, 'Weiliang', 'M', 24)

select * from student

 

Spark调用MySQL,需安装JDBC驱动程序:mysql-connector-java-5.1.40.tar.gz

 

2、链接数据库

作个查询,测试一下链接。

>>> use spark;
>>> select * from student;

 

3、插入记录

(1) 设置“表头”

 

(2) 设置“记录”

 

 转变为Row的形式

rowRDD = studentRDD.map(lambda p: Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))

 

(3) 拼接“表头”和“记录”

studentDF = spark.createDataFrame(rowRDD, schema)

 

(4) 写入数据库

DataFrame形式的数据 写入 数据库。

prop = {}
prop['user']     = 'root' prop['password'] = '123456' prop['driver']   = "com.mysql.jdbc.Driver"

# 构建好参数后 studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)

 

(5) 查看效果

 

End.

相关文章
相关标签/搜索