Scala是一种多范式的编程语言,其设计的初衷是要集成面向对象编程和函数式编程的各类特性。Scala运行于Java平台(Java虚拟机),并兼容现有的Java程序。它也能运行于CLDC配置的Java ME中。目前还有另外一.NET平台的实现,不过该版本更新有些滞后。Scala的编译模型(独立编译,动态类加载)与Java和C#同样,因此Scala代码能够调用Java类库(对于.NET实现则可调用.NET类库)。Scala包括编译器和类库,以及BSD许可证发布。html
学习Scala编程语言,为后续学习Spark奠基基础。java
l 安装JDKmysql
l 下载Scala:http://www.scala-lang.org/download/es6
l 安装Scala:web
n 设置环境变量:SCALA_HOME和PATH路径算法
l 验证Scala sql
l REPL(Read Evaluate Print Loop):命令行shell
l IDE:图形开发工具数据库
n The Scala IDE (Based on Eclipse):http://scala-ide.org/apache
n IntelliJ IDEA with Scala plugin:http://www.jetbrains.com/idea/download/
n Netbeans IDE with the Scala plugin
注意:在Scala中,任何数据都是对象。例如:
① 数值类型:Byte,Short,Int,Long,Float,Double
l Byte: 8位有符号数字,从-128 到 127
l Short: 16位有符号数据,从-32768 到 32767
l Int: 32位有符号数据
l Long: 64位有符号数据
例如:
val a:Byte = 10
a+10
获得:res9: Int = 20
这里的res9是新生成变量的名字
val b:Short = 20
a+b
注意:在Scala中,定义变量能够不指定类型,由于Scala会进行类型的自动推导。
② 字符类型和字符串类型:Char和String
对于字符串,在Scala中能够进行插值操做。
注意:前面有个s;至关于执行:"My Name is " + s1
③ Unit类型:至关于Java中的void类型
④ Nothing类型:通常表示在执行过程当中,产生了Exception
例如,咱们定义一个函数以下:
l 使用val和var申明变量
例如:scala> val answer = 8 * 3 + 2
能够在后续表达式中使用这些名称
l val:定义的值实际是一个常量
要申明其值可变的变量:var
注意:能够不用显式指定变量的类型,Scala会进行自动的类型推到
l 可使用Scala的预约义函数
例如:求两个值的最大值
l 也可使用def关键字自定义函数
语法:
示例:
Scala的if/else语法结构和Java或C++同样。
不过,在Scala中,if/else是表达式,有值,这个值就是跟在if或else以后的表达式的值。
Scala拥有与Java和C++相同的while和do循环
Scala中,可使用for和foreach进行迭代
注意:
(*) <- 表示Scala中的generator,即:提取符
(*)第三种写法是第二种写法的简写
在上面的案例中,咱们将list集合中的每一个元素转换成了大写,而且使用yield关键字生成了一个新的集合。
注意:在上面的例子中,foreach接收了另外一个函数(println)做为值
l Call By Value:对函数实参求值,且仅求一次
l Call By Name:函数实参每次在函数体内被用到时都会求值
咱们来分析一下,上面两个调用执行的过程:
一份复杂一点的例子:
l 默认参数
l 代名参数
l 可变参数
当val被申明为lazy时,它的初始化将被推迟,直到咱们首次对它取值。
一个更为复杂一点的例子:读取文件:
Scala异常的工做机制和Java或者C++同样。直接使用throw关键字抛出异常。
使用try...catch...finally来捕获和处理异常:
Scala数组的类型:
l 定长数组:使用关键字Array
l 变长数组:使用关键字ArrayBuffer
l 遍历数组
l Scala数组的经常使用操做
l Scala的多维数组
l 和Java同样,多维数组是经过数组的数组来实现的。
l 也能够建立不规则的数组,每一行的长度各不相同。
映射就是Map集合,由一个(key,value)组成。
-> 操做符用来建立
例如:
val scores = Map(“Alice” -> 10,”Bob” -> 3,”Cindy” -> 8)
映射的类型分为:不可变Map和可变Map
映射的操做
l 获取映射中的值
l 更新映射中的值(必须是可变Map)
l 迭代映射
元组是不一样类型的值的汇集。
例如:val t = (1, 3.14, "Fred") // 类型为Tuple3[Int, Double, java.lang.String]
这里:Tuple是类型,3是表示元组中有三个元素。
元组的访问和遍历:
注意:要遍历Tuple中的元素,须要首先生成对应的迭代器。不能直接使用for或者foreach。
把数据及对数据的操做方法放在一块儿,做为一个相互依存的总体——对象
面向对象的三大特征:
u 封装
u 继承
u 多态
简单类和无参方法:
案例:注意没有class前面没有public关键字修饰。
若是要开发main方法,须要将main方法定义在该类的伴生对象中,即:object对象中,(后续作详细的讨论)。
l 当定义属性是private时候,scala会自动为其生成对应的get和set方法
private var stuName:String = "Tom"
l 定义属性:private var money:Int = 1000 但愿money只有get方法,没有set方法??
l private[this]的用法:该属性只属于该对象私有,就不会生成对应的set和get方法。若是这样,就不能直接调用,例如:s1.stuName ---> 错误
咱们能够在一个类的内部在定义一个类,以下:咱们在Student类中,再定义了一个Course类用于保存学生选修的课程。
开发一个测试程序进行测试:
类的构造器分为:主构造器、辅助构造器
l 主构造器:和类的声明结合在一块儿,只能有一个主构造器
Student4(val stuName:String,val stuAge:Int)
(1) 定义类的主构造器:两个参数
(2) 声明了两个属性:stuName和stuAge 和对应的get和set方法
l 辅助构造器:能够有多个辅助构造器,经过关键字this来实现
Scala没有静态的修饰符,但Object对象下的成员都是静态的 ,如有同名的class,这其做为它的伴生类。在Object中通常能够为伴生类作一些初始化等操做。
下面是Java中的静态块的例子。在这个例子中,咱们对JDBC进行了初始化。
而Scala中的Object就至关于Java中静态块。
Object对象的应用
u 单例对象
u 使用应用程序对象:能够省略main方法;须要从父类App继承。
遇到以下形式的表达式时,apply方法就会被调用:
Object(参数1,参数2,......,参数N)
一般,这样一个apply方法返回的是伴生类的对象;其做用是为了省略new关键字
Object的apply方法举例:
Scala和Java同样,使用extends关键字扩展类。
l 案例一:Employee类继承Person类
l 案例二:在子类中重写父类的方法
l 案例三:使用匿名子类
l 案例四:使用抽象类。抽象类中包含抽象方法,抽象类只能用来继承。
l 案例五:使用抽象字段。抽象字段就是一个没有初始值的字段
trait就是抽象类。trait跟抽象类最大的区别:trait支持多重继承
十、包的使用
Scala的包和Java中的包或者C++中的命名空间的目的是相同的:管理大型程序中的名称。
Scala中包的定义和使用:
u 包的定义
u 包的引入:Scala中依然使用import做为引用包的关键字,例如
u 并且Scala中的import能够写在任意地方(Java中,import写在最前面)
包能够包含类、对象和特质,但不能包含函数或者变量的定义。很不幸,这是Java虚拟机的局限。
把工具函数或者常量添加到包而不是某个Utils对象,这是更加合理的作法。Scala中,包对象的出现正是为了解决这个局限。
Scala中的包对象:常量,变量,方法,类,对象,trait(特质),包
l 读取行
l 读取字符
其实这里的source就指向了这个文件中的每一个字符。
l 从URL或其余源读取:注意指定字符集UTF-8
l 读取二进制文件:Scala中并不支持直接读取二进制,但能够经过调用Java的InputStream来进行读入。
l 写入文本文件
在Scala中,函数是“头等公民”,就和数字同样。能够在变量中存放函数,即:将函数做为变量的值(值函数)。
示例1:
(*)首先,定义一个最普通的函数
(*)再定义一个高阶函数
(*)分析这个高阶函数调用的过程
示例2:
在这个例子中,首先定义了一个普通的函数mytest,而后定义了一个高阶函数myFunction;myFunction接收三个参数:第一个f是一个函数参数,第二个是x,第三个是y。而f是一个函数参数,自己接收两个Int的参数,返回一个Int的值。
就是函数的嵌套,即:在一个函数定义中,包含另一个函数的定义;而且在内函数中能够访问外函数中的变量。
测试上面的函数:
柯里化函数(Curried Function)是把具备多个参数的函数转换为一条函数链,每一个节点上是单一参数。
一个简单的例子:
示例1:
示例2:
示例3:
示例4:
示例5:
在这个例子中,能够被2整除的被分到一个分区;不能被2整除的被分到另外一个分区。
示例6:
示例7:
示例8:
在这个例子中,分为两步:
(1)将(1,2,3)和(4,5,6)这两个集合合并成一个集合
(2)再对每一个元素乘以2
l 可变集合
l 不可变集合:
n 集合从不改变,所以能够安全地共享其引用。
n 甚至是在一个多线程的应用程序当中也没问题。
集合的操做:
l 不可变列表(List)
不可变列表的相关操做:
l 可变列表(LinkedList):scala.collection.mutable
经常使用的序列有:Vector和Range
u Vector是ArrayBuffer的不可变版本,是一个带下标的序列
u Range表示一个整数序列
l 集Set是不重复元素的集合
l 和列表不一样,集并不保留元素插入的顺序。默认以Hash集实现
示例1:建立集
示例2:集的操做
Scala有一个强大的模式匹配机制,能够应用在不少场合:
u switch语句
u 类型检查
Scala还提供了样本类(case class),对模式匹配进行了优化
模式匹配示例:
l 更好的switch
l Scala的守卫
l 模式匹配中的变量
l 类型模式
l 匹配数组和列表
简单的来讲,Scala的case class就是在普通的类定义前加case这个关键字,而后你能够对这些类来模式匹配。
case class带来的最大的好处是它们支持模式识别。
首先,回顾一下前面的模式匹配:
其次,若是咱们想判断一个对象是不是某个类的对象,跟Java同样可使用isInstanceOf
下面这个好像有点问题
最后,在Scala中有一种更简单的方式来判断,就是case class
注意:须要在class前面使用case关键字。
和Java或者C++同样,类和特质能够带类型参数。在Scala中,使用方括号来定义类型参数
测试程序:
函数和方法也能够带类型参数。和泛型类同样,咱们须要把类型参数放在方法名以后。
注意:这里的ClassTag是必须的,表示运行时的一些信息,好比类型。
类型的上界和下界,是用来定义类型变量的范围。它们的含义以下:
l S <: T
这是类型上界的定义。也就是S必须是类型T的子类(或自己,本身也能够认为是本身的子类。
u U >: T
这是类型下界的定义。也就是U必须是类型T的父类(或自己,本身也能够认为是本身的父类)。
l 一个简单的例子:
l 一个复杂一点的例子(上界):
l 再来看一个例子:
它比 <: 适用的范围更广,除了全部的子类型,还容许隐式转换过去的类型。用 <% 表示。尽可能使用视图界定,来取代泛型的上界,由于适用的范围更加普遍。
示例:
l 上面写过的一个列子。这里因为T的上界是String,当咱们传递100和200的时候,就会出现类型不匹配。
l 可是100和200是能够转成字符串的,因此咱们可使用视图界定让addTwoString方法接收更普遍的数据类型,即:字符串及其子类、能够转换成字符串的类型。
注意:使用的是 <%
l 但实际运行的时候,会出现错误:
这是由于:Scala并无定义如何将Int转换成String的规则,因此要使用视图界定,咱们就必须建立转换的规则。
l 建立转换规则
l 运行成功
l 协变:
Scala的类或特征的范型定义中,若是在类型参数前面加入+符号,就可使类或特征变为协变了。
u 逆变:
在类或特征的定义中,在类型参数以前加上一个-符号,就可定义逆变范型类和特征了。
总结一下:Scala的协变:泛型变量的值能够是自己类型或者其子类的类型
Scala的逆变:泛型变量的值能够是自己类型或者其父类的类型
所谓隐式转换函数指的是以implicit关键字申明的带有单个参数的函数。
l 前面讲视图界定时候的一个例子:
l 再举一个例子:咱们把Fruit对象转换成了Monkey对象
使用implicit申明的函数参数叫作隐式参数。咱们也可使用隐式参数实现隐式的转换
所谓隐式类: 就是对类增长implicit 限定的类,其做用主要是对类的功能增强!
翻译:Spark是一个针对大规模数据处理的快速通用引擎。
Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提升了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,容许用户将Spark部署在大量廉价硬件之上,造成集群。Spark获得了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的Spark已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了不少生产系统的推荐算法;腾讯Spark集群达到8000台的规模,是当前已知的世界上最大的Spark集群。
(*)Hadoop的MapReduce计算模型存在的问题:
学习过Hadoop的MapReduce的学员都知道,MapReduce的核心是Shuffle(洗牌)。在整个Shuffle的过程当中,至少会产生6次的I/O。下图是咱们在讲MapReduce的时候,画的Shuffle的过程。
中间结果输出:基于MapReduce的计算引擎一般会将中间结果输出到磁盘上,进行存储和容错。另外,当一些查询(如:Hive)翻译到MapReduce任务时,每每会产生多个Stage(阶段),而这些串联的Stage又依赖于底层文件系统(如HDFS)来存储每个Stage的输出结果,而I/O的效率每每较低,从而影响了MapReduce的运行速度。
(*)Spark的最大特色:基于内存
Spark是MapReduce的替代方案,并且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。
(*)快
与Hadoop的MapReduce相比,Spark基于内存的运算速度要快100倍以上,即便,Spark基于硬盘的运算也要快10倍。Spark实现了高效的DAG执行引擎,从而能够经过内存来高效处理数据流。
(*)易用
Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户能够快速构建不一样的应用。并且Spark支持交互式的Python和Scala的shell,能够很是方便地在这些shell中使用Spark集群来验证解决问题的方法。
(*)通用
Spark提供了统一的解决方案。Spark能够用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不一样类型的处理均可以在同一个应用中无缝使用。Spark统一的解决方案很是具备吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减小开发和维护的人力成本和部署平台的物力成本。
另外Spark还能够很好的融入Hadoop的体系结构中能够直接操做HDFS,并提供Hive on Spark、Pig on Spark的框架集成Hadoop。
(*)兼容性
Spark能够很是方便地与其余的开源产品进行融合。好比,Spark可使用Hadoop的YARN和Apache Mesos做为它的资源管理和调度器,器,而且能够处理全部Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,由于不须要作任何数据迁移就可使用Spark的强大处理能力。Spark也能够不依赖于第三方的资源管理和调度器,它实现了Standalone做为其内置的资源管理和调度框架,这样进一步下降了Spark的使用门槛,使得全部人均可以很是容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。
官方的一张图:
本身的一张图:
Spark的安装部署方式有如下几种模式:
l Standalone
l YARN
l Mesos
l Amazon EC2
(*)Spark Standalone伪分布的部署
l 配置文件:conf/spark-env.sh
n export JAVA_HOME=/root/training/jdk1.7.0_75
n export SPARK_MASTER_HOST=spark81
n export SPARK_MASTER_PORT=7077
n 下面的能够不写,默认
n export SPARK_WORKER_CORES=1
n export SPARK_WORKER_MEMORY=1024m
l 配置文件:conf/slave
n spark81
(*)Spark Standalone全分布的部署
l 配置文件:conf/spark-env.sh
n export JAVA_HOME=/root/training/jdk1.7.0_75
n export SPARK_MASTER_HOST=spark82
n export SPARK_MASTER_PORT=7077
n 下面的能够不写,默认
n export SPARK_WORKER_CORES=1
n export SPARK_WORKER_MEMORY=1024m
l 配置文件:conf/slave
n spark83
n spark84
(*)启动Spark集群:start-all.sh
(*)基于文件系统的单点恢复
主要用于开发或测试环境。当spark提供目录保存spark Application和worker的注册信息,并将他们的恢复状态写入该目录中,这时,一旦Master发生故障,就能够经过从新启动Master进程(sbin/start-master.sh),恢复已运行的spark Application和worker的注册信息。
基于文件系统的单点恢复,主要是在spark-en.sh里对SPARK_DAEMON_JAVA_OPTS设置
配置参数 |
参考值 |
spark.deploy.recoveryMode |
设置为FILESYSTEM开启单点恢复功能,默认值:NONE |
spark.deploy.recoveryDirectory |
Spark 保存恢复状态的目录 |
参考:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"
测试:
一、在spark82上启动Spark集群
二、在spark83上启动spark shell
MASTER=spark://spark82:7077 spark-shell
三、在spark82上中止master
stop-master.sh
四、观察spark83上的输出:
五、在spark82上重启master
start-master.sh
(*)基于Zookeeper的Standby Masters
ZooKeeper提供了一个Leader Election机制,利用这个机制能够保证虽然集群存在多个Master,可是只有一个是Active的,其余的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。因为集群的信息,包括Worker, Driver和Application的信息都已经持久化到ZooKeeper,所以在切换的过程当中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群总体架构以下图所示。
配置参数 |
参考值 |
spark.deploy.recoveryMode |
设置为ZOOKEEPER开启单点恢复功能,默认值:NONE |
spark.deploy.zookeeper.url |
ZooKeeper集群的地址 |
spark.deploy.zookeeper.dir |
Spark信息在ZK中的保存目录,默认:/spark |
l 参考:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181 -Dspark.deploy.zookeeper.dir=/spark"
l 另外:每一个节点上,须要将如下两行注释掉。
l ZooKeeper中保存的信息
(*)示例程序:$SPARK_HOME/examples/jars/spark-examples_2.11-2.1.0.jar
(*)全部的示例程序:$EXAMPLE_HOME/examples/src/main有Java、Scala等等
(*)Demo:蒙特卡罗求PI
命令:
spark-submit --master spark://spark81:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 100
spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户能够在该命令行下用scala编写spark程序。
(*)启动Spark Shell:spark-shell
也可使用如下参数:
参数说明:
--master spark://spark81:7077 指定Master的地址
--executor-memory 2g 指定每一个worker可用内存为2G
--total-executor-cores 2 指定整个集群使用的cup核数为2个
例如:
spark-shell --master spark://spark81:7077 --executor-memory 2g --total-executor-cores 2
(*)注意:
若是启动spark shell时没有指定master地址,可是也能够正常启动spark shell和执行spark shell中的程序,实际上是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群创建联系。
请注意local模式和集群模式的日志区别:
(*)在Spark Shell中编写WordCount程序
程序以下:
sc.textFile("hdfs://192.168.88.111:9000/data/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")
说明:
l sc是SparkContext对象,该对象时提交spark程序的入口
l textFile("hdfs://192.168.88.111:9000/data/data.txt")是hdfs中读取数据
l flatMap(_.split(" "))先map在压平
l map((_,1))将单词和1构成元组
l reduceByKey(_+_)按照key进行reduce,并将value累加
l saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")将结果写入到hdfs中
(*)须要的jar包:$SPARK_HOME/jars/*.jar
(*)建立Scala Project,并建立Scala Object、或者Java Class
(*)书写源代码,并打成jar包,上传到Linux
==========================Scala版本==========================
(*)运行程序:
spark-submit --master spark://spark81:7077 --class mydemo.WordCount jars/wc.jar hdfs://192.168.88.111:9000/data/data.txt hdfs://192.168.88.111:9000/output/spark/wc1
====================Java版本(直接输出在屏幕)====================
(*)运行程序:
spark-submit --master spark://spark81:7077 --class mydemo.JavaWordCount jars/wc.jar hdfs://192.168.88.111:9000/data/data.txt
====================Python版本(直接输出在屏幕)====================
(*)运行程序:
等等
须要看源码一步步看。
RDD(Resilient Distributed Dataset)叫作弹性分布式数据集,是Spark中最基本的数据抽象,它表明一个不可变、可分区、里面的元素可并行计算的集合。RDD具备数据流模型的特色:自动容错、位置感知性调度和可伸缩性。RDD容许用户在执行多个查询时显式地将工做集缓存在内存中,后续的查询可以重用工做集,这极大地提高了查询速度。
² 一组分片(Partition),即数据集的基本组成单位。对于RDD来讲,每一个分片都会被一个计算任务处理,并决定并行计算的粒度。用户能够在建立RDD时指定RDD的分片个数,若是没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
² 一个计算每一个分区的函数。Spark中RDD的计算是以分片为单位的,每一个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不须要保存每次计算的结果。
² RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,因此RDD之间就会造成相似于流水线同样的先后依赖关系。在部分分区数据丢失时,Spark能够经过这个依赖关系从新计算丢失的分区数据,而不是对RDD的全部分区进行从新计算。
² 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD自己的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
² 一个列表,存储存取每一个Partition的优先位置(preferred location)。对于一个HDFS文件来讲,这个列表保存的就是每一个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽量地将计算任务分配到其所要处理数据块的存储位置。
u RDD的建立方式
val rdd1 = sc.textFile(“hdfs://192.168.88.111:9000/data/data.txt”)
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
u RDD的基本原理
RDD中的全部转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动做。只有当发生一个要求返回结果给Driver的动做时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。
转换 |
含义 |
map(func) |
返回一个新的RDD,该RDD由每个输入元素通过func函数转换后组成 |
filter(func) |
返回一个新的RDD,该RDD由通过func函数计算后返回值为true的输入元素组成 |
flatMap(func) |
相似于map,可是每个输入元素能够被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素) |
mapPartitions(func) |
相似于map,但独立地在RDD的每个分片上运行,所以在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) |
相似于mapPartitions,但func带有一个整数参数表示分片的索引值,所以在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) |
根据fraction指定的比例对数据进行采样,能够选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) |
对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) |
对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) |
对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) |
在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) |
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一块儿,与groupByKey相似,reduce任务的个数能够经过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]) |
|
sortByKey([ascending], [numTasks]) |
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) |
与sortByKey相似,可是更灵活 |
join(otherDataset, [numTasks]) |
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的全部元素对在一块儿的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) |
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD |
cartesian(otherDataset) |
笛卡尔积 |
pipe(command, [envVars]) |
|
coalesce(numPartitions) |
|
repartition(numPartitions) |
|
repartitionAndSortWithinPartitions(partitioner) |
|
动做 |
含义 |
reduce(func) |
经过func函数汇集RDD中的全部元素,这个功能必须是课交换且可并联的 |
collect() |
在驱动程序中,以数组的形式返回数据集的全部元素 |
count() |
返回RDD的元素个数 |
first() |
返回RDD的第一个元素(相似于take(1)) |
take(n) |
返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) |
返回一个数组,该数组由从数据集中随机采样的num个元素组成,能够选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) |
|
saveAsTextFile(path) |
将数据集的元素以textfile的形式保存到HDFS文件系统或者其余支持的文件系统,对于每一个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) |
将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可使HDFS或者其余Hadoop支持的文件系统。 |
saveAsObjectFile(path) |
|
countByKey() |
针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每个key对应的元素个数。 |
foreach(func) |
在数据集的每个元素上,运行函数func进行更新。 |
Spark算子示例:
一、RDD的建立方式
经过外部的数据文件建立,如HDFS
val rdd1 = sc.textFile(“hdfs://192.168.88.111:9000/data/data.txt”)
经过sc.parallelize进行建立
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
二、RDD的算子
(1)Transformation算子:
(*)map(func)算子: 将输入的每一个元素重写组合成一个元组
val rdd2 = rdd1.map((_,"*"))
乘以10
val rdd2 = rdd1.map((_ * 10))
val rdd2 = rdd1.map((x:Int) = x + 10)
(*)filter(func):返回一个新的RDD,该RDD是通过func运算后返回true的元素
val rdd3 = rdd1.filter(_ > 5)
(*)flatMap(func) 压平操做
val books = sc.parallelize(List("Hadoop","Hive","HDFS"))
books.flatMap(_.toList).collect
结果:res18: Array[Char] = Array(H, a, d, o, o, p, H, i, v, e, H, D, F, S)
val sen = sc.parallelize(List("I love Beijing","I love China","Beijing is the capital of China"))
(*)union(otherDataset):并集运算,注意类型要一致
val rdd4 = sc.parallelize(List(5,6,4,7))
val rdd5 = sc.parallelize(List(1,2,3,4))
val rdd6 = rdd4.union(rdd5)
(*)intersection(otherDataset):交集
val rdd7 = rdd5.intersection(rdd4)
(*)distinct([numTasks])):去掉重复数据
val rdd8 = sc.parallelize(List(5,6,4,7,5,5,5))
rdd8.distinct.collect
(*)groupByKey([numTasks]) :对于一个<k,v>的RDD,按照Key进行分组
val rdd = sc.parallelize(Array(("I",1),("love",2),("I",3)))
rdd.groupByKey.collect
结果:res38: Array[(String, Iterable[Int])] = Array((love,CompactBuffer(2)), (I,CompactBuffer(1, 3)))
复杂一点的例子:
val sen = sc.parallelize(List("I love Beijing","I love China","Beijing is the capital of China"))
sen.flatMap(_.split(" ")).map((_,1)).groupByKey.collect
(*)reduceByKey(func, [numTasks]):相似于groupByKey,区别是reduceByKey会有一个combiner的过程对每一个分区上的数据先作一次合并
画图说明,因此效率更高
(*)cartesian笛卡尔积
val rdd1 = sc.parallelize(List("tom", "jerry"))
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
val rdd3 = rdd1.cartesian(rdd2)
(2)Action算子:
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
(*)collect
rdd1.collect
(*)reduce
val rdd2 = rdd1.reduce(_+_)
(*)count
rdd1.count
(*)top
rdd1.top(2)
(*)take
rdd1.take(2)
(*)first(similer to take(1))
rdd1.first
(*)takeOrdered
rdd1.takeOrdered(3)
RDD经过persist方法或cache方法能够将前面的计算结果缓存,可是并非这两个方法被调用时当即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
经过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
缓存有可能丢失,或者存储存储于内存的数据因为内存不足而被删除,RDD的缓存容错机制保证了即便缓存丢失也能保证计算的正确执行。经过基于RDD的一系列转换,丢失的数据会被重算,因为RDD的各个Partition是相对独立的,所以只须要计算丢失的部分便可,并不须要重算所有Partition。
l Demo示例:
l 经过UI进行监控:
检查点(本质是经过将RDD写入Disk作检查点)是为了经过lineage(血统)作容错的辅助,lineage过长会形成容错成本太高,这样就不如在中间阶段作检查点容错,若是以后有节点出现问题而丢失分区,从作检查点的RDD开始重作Lineage,就会减小开销。
设置checkpoint的目录,能够是本地的文件夹、也能够是HDFS。通常是在具备容错能力,高可靠的文件系统上(好比HDFS, S3等)设置一个检查点路径,用于保存检查点数据。
分别举例说明:
l 本地目录
注意:这种模式,须要将spark-shell运行在本地模式上
l HDFS的目录
注意:这种模式,须要将spark-shell运行在集群模式上
l 源码中的一段话
l RDD的依赖关系
RDD和它依赖的父RDD(s)的关系有两种不一样的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
总结:窄依赖咱们形象的比喻为独生子女
总结:窄依赖咱们形象的比喻为超生
l Spark任务中的Stage
DAG(Directed Acyclic Graph)叫作有向无环图,原始的RDD经过一系列的转换就就造成了DAG,根据RDD之间的依赖关系的不一样将DAG划分红不一样的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,因为有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,所以宽依赖是划分Stage的依据。
//经过并行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//对rdd1里的每个元素乘2而后排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//过滤出大于等于十的元素
val rdd3 = rdd2.filter(_ >= 10)
//将元素以数组的方式在客户端显示
rdd3.collect
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//将rdd1里面的每个元素先切分在压平
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求并集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求jion
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求并集
val rdd4 = rdd1 union rdd2
//按key进行分组
rdd4.groupByKey
rdd4.collect
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup与groupByKey的区别
rdd3.collect
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
把每一个partition中的分区号和对应的值拿出来
u 接收一个函数参数:
l 第一个参数:分区号
l 第二个参数:分区中的元素
u 示例:将每一个分区中的元素和分区号打印出来。
l val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
l 建立一个函数返回RDD中的每一个分区号和元素:
def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={
iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator
}
l 调用:rdd1.mapPartitionsWithIndex(func1).collect
先对局部聚合,再对全局聚合
示例:val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
u 查看每一个分区中的元素:
u 将每一个分区中的最大值求和,注意:初始值是0;
若是初始值时候10,则结果为:30
u 若是是求和,注意:初始值是0:
若是初始值是10,则结果是:45
u 一个字符串的例子:
val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
修改一下刚才的查看分区元素的函数
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
两个分区中的元素:
[partID:0, val: a], [partID:0, val: b], [partID:0, val: c],
[partID:1, val: d], [partID:1, val: e], [partID:1, val: f]
运行结果:
u 更复杂一点的例子
val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
结果多是:”24”,也多是:”42”
val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
结果是:”10”,也多是”01”,
缘由:注意有个初始值””,其长度0,而后0.toString变成字符串
val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
结果是:”11”,缘由同上。
n 准备数据:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
n 两个分区中的元素:
n 示例:
l 将每一个分区中的动物最多的个数求和
scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res69: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
l 将每种动物个数求和
scala> pairRDD.aggregateByKey(0)(_+_, _ + _).collect
res71: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
这个例子也可使用:reduceByKey
scala> pairRDD.reduceByKey(_+_).collect
res73: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
四、coalesce与repartition
n 都是将RDD中的分区进行重分区。
n 区别是:coalesce默认不会进行shuffle(false);而repartition会进行shuffle(true),即:会将数据真正经过网络进行重分区。
n 示例:
def func4(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
下面两句话是等价的:
val rdd2 = rdd1.repartition(3)
val rdd3 = rdd1.coalesce(3,true) --->若是是false,查看RDD的length依然是2
参考:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
l Tomcat的访问日志
l 求出访问量最高的两个网页
n 要求显示:网页名称、访问量
l 根据jsp文件的名字,将各自的访问日志放入到不一样的分区文件中,以下:
n 生成的分区文件
n 例如:part-00000文件中的内容:只包含了web.jsp的访问日志
l 将RDD的数据保存到Oracle数据库中
调用:
l 使用JdbcRDD:执行SQL语句
JdbcRDD参数说明:
参数名称 |
类型 |
说明 |
sc |
org.apache.spark.SparkContext |
Spark Context对象 |
getConnection |
scala.Function0[java.sql.Connection] |
获得一个数据库Connection |
sql |
scala.Predef.String |
执行的SQL语句 |
lowerBound |
scala.Long |
下边界值,即:SQL的第一个参数 |
upperBound |
scala.Long |
上边界值,即:SQL的第二个参数 |
numPartitions |
scala.Int |
分区的个数,即:启动多少个Executor |
mapRow |
scala.Function1[java.sql.ResultSet, T] |
获得的结果集 |
JdbcRDD的缺点:从上面的参数说明能够看出,JdbcRDD有如下两个缺点:
1.执行的SQL必须有两个参数,并类型都是Long
2.获得的结果是ResultSet,即:只支持select操做
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫作DataFrame而且做为分布式SQL查询引擎的做用。
为何要学习Spark SQL?咱们已经学习了Hive,它是将Hive SQL转换成MapReduce而后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,因为MapReduce这种计算模型执行效率比较慢。因此Spark SQL的应运而生,它是将Spark SQL转换成RDD,而后提交到集群执行,执行效率很是快!同时Spark SQL也支持从Hive中读取数据。
Spark SQL的特色:
l 容易整合(集成)
l 统一的数据访问方式
l 兼容Hive
l 标准的数据链接
u DataFrame
DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,但在底层具备更丰富的优化。DataFrames能够从各类来源构建,
例如:
l 结构化数据文件
l hive中的表
l 外部数据库或现有RDDs
DataFrame API支持的语言有Scala,Java,Python和R。
从上图能够看出,DataFrame多了数据的结构信息,即schema。RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子之外,更重要的特色是提高执行效率、减小数据读取以及执行计划的优化
u Datasets
Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优势(强类型化,使用强大的lambda函数的能力)以及Spark SQL优化后的执行引擎的优势。一个Dataset 能够从JVM对象构造,而后使用函数转换(map, flatMap,filter等)去操做。 Dataset API 支持Scala和Java。 Python不支持Dataset API。
使用员工表的数据,并已经将其保存到了HDFS上。
(*)经过Case Class建立DataFrames
① 定义case class(至关于表的结构:Schema)
注意:因为mgr和comm列中包含null值,简单起见,将对应的case class类型定义为String
② 将HDFS上的数据读入RDD,并将RDD与case Class关联
③ 将RDD转换成DataFrames
④ 经过DataFrames查询数据
(*)使用SparkSession
① 什么是SparkSession
Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,而且容许用户经过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减小了用户须要了解的一些概念,使得咱们能够很容易地与Spark交互。
在2.0版本以前,与Spark交互以前必须先建立SparkConf和SparkContext。然而在Spark 2.0中,咱们能够经过SparkSession来实现一样的功能,而不须要显式地建立SparkConf, SparkContext 以及 SQLContext,由于这些对象已经封装在SparkSession中。
② 建立StructType,来定义Schema结构信息
注意,须要:import org.apache.spark.sql.types._
③ 读入数据而且切分数据
④ 将RDD中的数据映射成Row
注意,须要:import org.apache.spark.sql.Row
⑤ 建立DataFrames
val df = spark.createDataFrame(rowRDD,myschema)
再举一个例子,使用JSon文件来建立DataFame
① 源文件:$SPARK_HOME/examples/src/main/resources/people.json
② val df = spark.read.json("源文件")
③ 查看数据和Schema信息
DataFrame操做也称为无类型的Dataset操做
(*)查询全部的员工姓名
(*)查询全部的员工姓名和薪水,并给薪水加100块钱
(*)查询工资大于2000的员工
(*)求每一个部门的员工人数
完整的例子,请参考:
http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset
(*)在DataFrame中使用SQL语句
① 将DataFrame注册成表(视图):df.createOrReplaceTempView("emp")
② 执行查询:spark.sql("select * from emp").show
spark.sql("select * from emp where deptno=10").show
spark.sql("select deptno,sum(sal) from emp group by deptno").show
上面使用的是一个在Session生命周期中的临时views。在Spark SQL中,若是你想拥有一个临时的view,并想在不一样的Session中共享,并且在application的运行周期内可用,那么就须要建立一个全局的临时view。并记得使用的时候加上global_temp做为前缀来引用它,由于全局的临时view是绑定到系统保留的数据库global_temp上。
① 建立一个普通的view和一个全局的view
df.createOrReplaceTempView("emp1")
df.createGlobalTempView("emp2")
② 在当前会话中执行查询,都可查询出结果。
spark.sql("select * from emp1").show
spark.sql("select * from global_temp.emp2").show
③ 开启一个新的会话,执行一样的查询
spark.newSession.sql("select * from emp1").show (运行出错)
spark.newSession.sql("select * from global_temp.emp2").show
DataFrame的引入,可让Spark更好的处理结构数据的计算,但其中一个主要的问题是:缺少编译时类型安全。为了解决这个问题,Spark采用新的Dataset API (DataFrame API的类型扩展)。
Dataset是一个分布式的数据收集器。这是在Spark1.6以后新加的一个接口,兼顾了RDD的优势(强类型,可使用功能强大的lambda)以及Spark SQL的执行器高效性的优势。因此能够把DataFrames当作是一种特殊的Datasets,即:Dataset(Row)
(*)建立DataSet,方式一:使用序列
一、定义case class
case class MyData(a:Int,b:String)
二、生成序列,并建立DataSet
val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
三、查看结果
(*)建立DataSet,方式二:使用JSON数据
一、定义case class
case class Person(name: String, gender: String)
二、经过JSON数据生成DataFrame
val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""" :: Nil))
三、将DataFrame转成DataSet
df.as[Person].show
df.as[Person].collect
(*)建立DataSet,方式三:使用HDFS数据
一、读取HDFS数据,并建立DataSet
val linesDS = spark.read.text("hdfs://hadoop111:9000/data/data.txt").as[String]
二、对DataSet进行操做:分词后,查询长度大于3的单词
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
words.show
words.collect
三、执行WordCount程序
val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count
result.show
排序:result.orderBy($"value").show
(*)使用emp.json 生成DataFrame
val empDF = spark.read.json("/root/resources/emp.json")
查询工资大于3000的员工
empDF.where($"sal" >= 3000).show
(*)建立case class
case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)
(*)生成DataSets,并查询数据
val empDS = empDF.as[Emp]
查询工资大于3000的员工
empDS.filter(_.sal > 3000).show
查看10号部门的员工
empDS.filter(_.deptno == 10).show
(*)多表查询
一、建立部门表
val deptRDD=sc.textFile("/root/temp/dept.csv").map(_.split(","))
case class Dept(deptno:Int,dname:String,loc:String)
val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
二、建立员工表
case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
val empRDD = sc.textFile("/root/temp/emp.csv").map(_.split(","))
val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS
三、执行多表查询:等值连接
val result = deptDS.join(empDS,"deptno")
另外一种写法:注意有三个等号
val result = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))
joinWith和join的区别是链接后的新Dataset的schema会不同
(*)查看执行计划:result.explain
(*)什么是parquet文件?
Parquet是列式存储格式的一种文件类型,列式存储有如下的核心:
l 能够跳过不符合条件的数据,只读取须要的数据,下降IO数据量。
l 压缩编码能够下降磁盘存储空间。因为同一列的数据类型是同样的,可使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。
l 只读取须要的列,支持向量运算,可以获取更好的扫描性能。
l Parquet格式是Spark SQL的默认数据源,可经过spark.sql.sources.default配置
(*)通用的Load/Save函数
l 读取Parquet文件
val usersDF = spark.read.load("/root/resources/users.parquet")
l 查询Schema和数据
l 查询用户的name和喜好颜色,并保存
usersDF.select($"name",$"favorite_color").write.save("/root/result/parquet")
l 验证结果
(*)显式指定文件格式:加载json格式
l 直接加载:val usersDF = spark.read.load("/root/resources/people.json") 会出错
l val usersDF = spark.read.format("json").load("/root/resources/people.json")
(*)存储模式(Save Modes)
能够采用SaveMode执行存储操做,SaveMode定义了对数据的处理模式。须要注意的是,这些保存模式不使用任何锁定,不是原子操做。此外,当使用Overwrite方式执行时,在输出新数据以前原数据就已经被删除。SaveMode详细介绍以下表:
Demo:
l usersDF.select($"name").write.save("/root/result/parquet1")
--> 出错:由于/root/result/parquet1已经存在
l usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")
(*)将结果保存为表
l usersDF.select($"name").write.saveAsTable("table1")
也能够进行分区、分桶等操做:partitionBy、bucketBy
Parquet是一个列格式并且用于多个数据处理系统中。Spark SQL提供支持对于Parquet文件的读写,也就是自动保存原始数据的schema。当写Parquet文件时,全部的列被自动转化为nullable,由于兼容性的缘故。
(*)案例:
读入json格式的数据,将其转换成parquet格式,并建立相应的表来使用SQL进行查询。
(*)Schema的合并:
Parquet支持Schema evolution(Schema演变,即:合并)。用户能够先定义一个简单的Schema,而后逐渐的向Schema中增长列描述。经过这种方式,用户能够获取多个有不一样Schema但相互兼容的Parquet文件。
Demo:
Schema的合并
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("/root/myresult/test_table/key=1")
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("/root/myresult/test_table/key=2")
val df3 = spark.read.option("mergeSchema", "true").parquet("/root/myresult/test_table/")
df3.printSchema()
Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。
须要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自知足有效的JSON对象。若是用多行描述一个JSON对象,会致使读取出错。读取JSON数据集示例以下:
(*)Demo1:使用Spark自带的示例文件 --> people.json 文件
定义路径:
val path ="/root/resources/people.json"
读取Json文件,生成DataFrame:
val peopleDF = spark.read.json(path)
打印Schema结构信息:
peopleDF.printSchema()
建立临时视图:
peopleDF.createOrReplaceTempView("people")
执行查询
spark.sql("SELECT name FROM people WHERE age=19").show
Spark SQL一样支持经过JDBC读取其余数据库的数据做为数据源。
Demo演示:使用Spark SQL读取Oracle数据库中的表。
l 启动Spark Shell的时候,指定Oracle数据库的驱动
spark-shell --master spark://spark81:7077 \\
--jars /root/temp/ojdbc6.jar \\
--driver-class-path /root/temp/ojdbc6.jar
l 读取Oracle数据库中的数据
(*)方式一:
val oracleDF = spark.read.format("jdbc").
option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com").
option("dbtable","scott.emp").
option("user","scott").
option("password","tiger").
load
(*)方式二:
导入须要的类:
import java.util.Properties
定义属性:
val oracleprops = new Properties()
oracleprops.setProperty("user","scott")
oracleprops.setProperty("password","tiger")
读取数据:
val oracleEmpDF =spark.read.jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com",
"scott.emp",oracleprops)
注意:下面是读取Oracle 10g(Windows上)的步骤
l 首先,搭建好Hive的环境(须要Hadoop)
l 配置Spark SQL支持Hive
n 只须要将如下文件拷贝到$SPARK_HOME/conf的目录下,便可
u $HIVE_HOME/conf/hive-site.xml
u $HADOOP_CONF_DIR/core-site.xml
u $HADOOP_CONF_DIR/hdfs-site.xml
l 使用Spark Shell操做Hive
n 启动Spark Shell的时候,须要使用--jars指定mysql的驱动程序
n 建立表
spark.sql("create table src (key INT, value STRING) row format delimited fields terminated by ','")
n 导入数据
spark.sql("load data local inpath '/root/temp/data.txt' into table src")
n 查询数据
spark.sql("select * from src").show
l 使用spark-sql操做Hive
n 启动spark-sql的时候,须要使用--jars指定mysql的驱动程序
n 操做Hive
u spark.sql("show tables").show
u spark.sql("select * from emp1").show
性能调优主要是将数据放入内存中操做。经过spark.cacheTable("tableName")或者dataFrame.cache()。使用spark.uncacheTable("tableName")来从内存中去除table。
u Demo案例:
(*)从Oracle数据库中读取数据,生成DataFrame
val oracleDF = spark.read.format("jdbc")
.option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com")
.option("dbtable","scott.emp")
.option("user","scott")
.option("password","tiger").load
(*)将DataFrame注册成表: oracleDF.registerTempTable("emp")
(*)执行查询,并经过Web Console监控执行的时间
spark.sql("select * from emp").show
(*)将表进行缓存,并查询两次,并经过Web Console监控执行的时间
spark.sqlContext.cacheTable("emp")
(*)清空缓存:
spark.sqlContext.cacheTable("emp")
spark.sqlContext.clearCache
l 将数据缓存到内存中的相关优化参数
n spark.sql.inMemoryColumnarStorage.compressed
u 默认为 true
u Spark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式。
n spark.sql.inMemoryColumnarStorage.batchSize
u 默认值:10000
u 缓存批处理大小。缓存数据时, 较大的批处理大小能够提升内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险。
l 其余性能相关的配置选项(不过不推荐手动修改,可能在后续版本自动的自适应修改)
n spark.sql.files.maxPartitionBytes
u 默认值:128 MB
u 读取文件时单个分区可容纳的最大字节数
n spark.sql.files.openCostInBytes
u 默认值:4M
u 打开文件的估算成本, 按照同一时间可以扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)。
l spark.sql.autoBroadcastJoinThreshold
n 默认值:10M
n 用于配置一个表在执行 join 操做时可以广播给全部 worker 节点的最大字节大小。经过将这个值设置为 -1 能够禁用广播。注意,当前数据统计仅支持已经运行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。
l spark.sql.shuffle.partitions
n 默认值:200
n 用于配置 join 或聚合操做混洗(shuffle)数据时使用的分区数。
Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据能够从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,而且可使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据能够被推送到文件系统,数据库和实时仪表板。并且,您还能够在数据流上应用Spark提供的机器学习和图处理算法。
在内部,它的工做原理以下。Spark Streaming接收实时输入数据流,并将数据切分红批,而后由Spark引擎对其进行处理,最后生成“批”形式的结果流。
Spark Streaming将连续的数据流抽象为discretizedstream或DStream。在内部,DStream 由一个RDD序列表示。
(1)因为在本案例中须要使用netcat网络工具,因此须要先安装。
rpm -iUv ~/netcat-0.6.1-1.i386.rpm
(2)启动netcat数据流服务器,并监听端口:1234
命令:nc -l -p 9999
服务器端:
(3)启动客户端
bin/run-example streaming.NetworkWordCount localhost 1234
客户端:
(必定注意):若是要执行本例,必须确保机器cpu核数大于2
(必定注意):
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
官方的解释:
l 初始化StreamingContext
n 方式一:从SparkConf对象中建立
n 从一个现有的SparkContext实例中建立
l 程序中的几点说明:
n appName参数是应用程序在集群UI上显示的名称。
n master是Spark,Mesos或YARN集群的URL,或者一个特殊的“local [*]”字符串来让程序以本地模式运行。
n 当在集群上运行程序时,不须要在程序中硬编码master参数,而是使用spark-submit提交应用程序并将master的URL以脚本参数的形式传入。可是,对于本地测试和单元测试,您能够经过“local[*]”来运行Spark Streaming程序(请确保本地系统中的cpu核心数够用)。
n StreamingContext会内在的建立一个SparkContext的实例(全部Spark功能的起始点),你能够经过ssc.sparkContext访问到这个实例。
n 批处理的时间窗口长度必须根据应用程序的延迟要求和可用的集群资源进行设置。
l 请务必记住如下几点:
n 一旦一个StreamingContextt开始运做,就不能设置或添加新的流计算。
n 一旦一个上下文被中止,它将没法从新启动。
n 同一时刻,一个JVM中只能有一个StreamingContext处于活动状态。
n StreamingContext上的stop()方法也会中止SparkContext。 要仅中止StreamingContext(保持SparkContext活跃),请将stop() 方法的可选参数stopSparkContext设置为false。
n 只要前一个StreamingContext在下一个StreamingContext被建立以前中止(不中止SparkContext),SparkContext就能够被重用来建立多个StreamingContext。
l DiscretizedStream或DStream 是Spark Streaming对流式数据的基本抽象。它表示连续的数据流,这些连续的数据流能够是从数据源接收的输入数据流,也能够是经过对输入数据流执行转换操做而生成的经处理的数据流。在内部,DStream由一系列连续的RDD表示,以下图:
l 举例分析:在以前的NetworkWordCount的例子中,咱们将一行行文本组成的流转换为单词流,具体作法为:将flatMap操做应用于名为lines的 DStream中的每一个RDD上,以生成words DStream的RDD。以下图所示:
可是DStream和RDD也有区别,下面画图说明:
最后两个transformation算子须要重点介绍一下:
n transform(func)
u 经过RDD-to-RDD函数做用于源DStream中的各个RDD,能够是任意的RDD操做,从而返回一个新的RDD
u 举例:在NetworkWordCount中,也可使用transform来生成元组对
n updateStateByKey(func)
u 操做容许不断用新信息更新它的同时保持任意状态。
l 定义状态-状态能够是任何的数据类型
l 定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态
l 注意:须要设置检查点
u 重写NetworkWordCount程序,累计每一个单词出现的频率(注意:累计)
u 输出结果:
n 注意:若是在IDEA中,不想输出log4j的日志信息,能够将log4j.properties文件(放在src的目录下)的第一行改成:
log4j.rootCategory=ERROR, console
Spark Streaming还提供了窗口计算功能,容许您在数据的滑动窗口上应用转换操做。下图说明了滑动窗口的工做方式:
如图所示,每当窗口滑过originalDStream时,落在窗口内的源RDD被组合并被执行操做以产生windowed DStream的RDD。在上面的例子中,操做应用于最近3个时间单位的数据,并以2个时间单位滑动。这代表任何窗口操做都须要指定两个参数。
l 窗口长度(windowlength) - 窗口的时间长度(上图的示例中为:3)。
l 滑动间隔(slidinginterval) - 两次相邻的窗口操做的间隔(即每次滑动的时间长度)(上图示例中为:2)。
这两个参数必须是源DStream的批间隔的倍数(上图示例中为:1)。
咱们以一个例子来讲明窗口操做。 假设您但愿对以前的单词计数的示例进行扩展,每10秒钟对过去30秒的数据进行wordcount。为此,咱们必须在最近30秒的pairs DStream数据中对(word, 1)键值对应用reduceByKey操做。这是经过使用reduceByKeyAndWindow操做完成的。
须要注意:滑动的距离必须是采样时间的整数倍。
一些常见的窗口操做以下表所示。全部这些操做都用到了上述两个参数 - windowLength和slideInterval。
u window(windowLength, slideInterval)
l 基于源DStream产生的窗口化的批数据计算一个新的DStream
u countByWindow(windowLength, slideInterval)
l 返回流中元素的一个滑动窗口数
u reduceByWindow(func, windowLength, slideInterval)
l 返回一个单元素流。利用函数func汇集滑动时间间隔的流的元素建立这个单元素流。函数必须是相关联的以使计算可以正确的并行计算。
u reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
l 应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每个key的值均由给定的reduce函数汇集起来。注意:在默认状况下,这个算子利用了Spark默认的并发任务数去分组。你能够用numTasks参数设置不一样的任务数
u reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
l 上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一窗口的reduce计算结果递增地计算每一个窗口的reduce值。这是经过对进入滑动窗口的新数据进行reduce操做,以及“逆减(inverse reducing)”离开窗口的旧数据来完成的。一个例子是当窗口滑动时对键对应的值进行“一加一减”操做。可是,它仅适用于“可逆减函数(invertible reduce functions)”,即具备相应“反减”功能的减函数(做为参数invFunc)。 像reduceByKeyAndWindow同样,经过可选参数能够配置reduce任务的数量。 请注意,使用此操做必须启用检查点。
u countByValueAndWindow(windowLength, slideInterval, [numTasks])
l 应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每一个key的值都是它们在滑动窗口中出现的频率。
输入DStreams表示从数据源获取输入数据流的DStreams。在NetworkWordCount例子中,lines表示输入DStream,它表明从netcat服务器获取的数据流。每个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。
输入DStreams表示从数据源获取的原始数据流。Spark Streaming拥有两类数据源:
l 基本源(Basic sources):这些源在StreamingContext API中直接可用。例如文件系统、套接字链接、Akka的actor等
l 高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等。
下面经过具体的案例,详细说明:
须要注意的是:
① 这些文件具备相同的格式
② 这些文件经过原子移动或重命名文件的方式在dataDirectory建立
③ 若是在文件中追加内容,这些追加的新数据也不会被读取。
注意:要演示成功,须要在原文件中编辑,而后拷贝一份。
使用streamingContext.queueStream(queueOfRDD)建立基于RDD队列的DStream,用于调试Spark Streaming应用程序。
输出操做容许DStream的操做推到如数据库、文件系统等外部系统中。由于输出操做其实是容许外部系统消费转换后的数据,它们触发的实际操做是DStream转换。目前,定义了下面几种输出操做:
l foreachRDD的设计模式
DStream.foreachRDD是一个强大的原语,发送数据到外部系统中。
出现如下Exception:
缘由是:Connection对象不是一个可被序列化的对象,不能RDD的每一个Worker上运行;即:Connection不能在RDD分布式环境中的每一个分区上运行,由于不一样的分区可能运行在不一样的Worker上。因此须要在每一个RDD分区上单首创建Connection对象。
咱们能够很方便地使用DataFrames和SQL操做来处理流数据。您必须使用当前的StreamingContext对应的SparkContext建立一个SparkSession。此外,必须这样作的另外一个缘由是使得应用能够在driver程序故障时得以从新启动,这是经过建立一个能够延迟实例化的单例SparkSession来实现的。
在下面的示例中,咱们使用DataFrames和SQL来修改以前的wordcount示例并对单词进行计数。咱们将每一个RDD转换为DataFrame,并注册为临时表,而后在这张表上执行SQL查询。
与RDD相似,DStreams还容许开发人员将流数据保留在内存中。也就是说,在DStream上调用persist() 方法会自动将该DStream的每一个RDD保留在内存中。若是DStream中的数据将被屡次计算(例如,相同数据上执行多个操做),这个操做就会颇有用。对于基于窗口的操做,如reduceByWindow和reduceByKeyAndWindow以及基于状态的操做,如updateStateByKey,数据会默认进行持久化。 所以,基于窗口的操做生成的DStream会自动保存在内存中,而不须要开发人员调用persist()。
对于经过网络接收数据(例如Kafka,Flume,sockets等)的输入流,默认持久化级别被设置为将数据复制到两个节点进行容错。
请注意,与RDD不一样,DStreams的默认持久化级别将数据序列化保存在内存中。
流数据处理程序一般都是全天候运行,所以必须对应用中逻辑无关的故障(例如,系统故障,JVM崩溃等)具备弹性。为了实现这一特性,Spark Streaming须要checkpoint足够的信息到容错存储系统,以即可以从故障中恢复。
① 通常会对两种类型的数据使用检查点:
1) 元数据检查点(Metadatacheckpointing) - 将定义流计算的信息保存到容错存储中(如HDFS)。这用于从运行streaming程序的driver程序的节点的故障中恢复。元数据包括如下几种:
l 配置(Configuration) - 用于建立streaming应用程序的配置信息。
l DStream操做(DStream operations) - 定义streaming应用程序的DStream操做集合。
l 不完整的batch(Incomplete batches) - jobs还在队列中但还没有完成的batch。
2) 数据检查点(Datacheckpointing) - 将生成的RDD保存到可靠的存储层。对于一些须要将多个批次之间的数据进行组合的stateful变换操做,设置数据检查点是必需的。在这些转换操做中,当前生成的RDD依赖于先前批次的RDD,这致使依赖链的长度随时间而不断增长,由此也会致使基于血统机制的恢复时间无限增长。为了不这种状况,stateful转换的中间RDD将按期设置检查点并保存到到可靠的存储层(例如HDFS)以切断依赖关系链。
总而言之,元数据检查点主要用于从driver程序故障中恢复,而数据或RDD检查点在任何使用stateful转换时是必需要有的。
② 什么时候启用检查点:
对于具备如下任一要求的应用程序,必须启用检查点:
1) 使用状态转:若是在应用程序中使用updateStateByKey或reduceByKeyAndWindow(具备逆函数),则必须提供检查点目录以容许按期保存RDD检查点。
2) 从运行应用程序的driver程序的故障中恢复:元数据检查点用于使用进度信息进行恢复。
③ 如何配置检查点:
能够经过在一些可容错、高可靠的文件系统(例如,HDFS,S3等)中设置保存检查点信息的目录来启用检查点。这是经过使用streamingContext.checkpoint(checkpointDirectory)完成的。设置检查点后,您就可使用上述的有状态转换操做。此外,若是要使应用程序从驱动程序故障中恢复,您应该重写streaming应用程序以使程序具备如下行为:
1) 当程序第一次启动时,它将建立一个新的StreamingContext,设置好全部流数据源,而后调用start()方法。
2) 当程序在失败后从新启动时,它将从checkpoint目录中的检查点数据从新建立一个StreamingContext。
使用StreamingContext.getOrCreate能够简化此行为
④ 改写以前的WordCount程序,使得每次计算的结果和状态都保存到检查点目录下
经过查看HDFS中的信息,能够看到相关的检查点信息,以下:
须要注意:若是使用Scala IDE(eclipse)开发程序集成Flume的话,直接加入Flume的jar包,会存在如下两个问题
(1)问题1:
存在两个scala-library(scala-library-2.11.8.jar和scala-library-2.10.5.jar),删除第二个。
(2)问题2:
spark-streaming-flume_2.10-2.1.0.jar of SparkProject build path is cross-compiled with an incompatible version of Scala (2.10.0). In case this report is mistaken, this check can be disabled in the compiler preference page.SparkProject Unknown Scala Version Problem
l 基于Flume的Push模式
Flume被用于在Flume agents之间推送数据.在这种方式下,Spark Streaming能够很方便的创建一个receiver,起到一个Avro agent的做用.Flume能够将数据推送到改receiver.
n 启动Spark Streaming程序
n 启动Flume
n 拷贝日志文件到/root/training/logs目录
n 观察输出,采集到数据
l 基于Custom Sink的Pull模式
不一样于Flume直接将数据推送到Spark Streaming中,第二种模式经过如下条件运行一个正常的Flume sink。Flume将数据推送到sink中,而且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从sink拉取数据。只要当数据被接收而且被Spark Streaming备份后,转换器才运行成功。
这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式须要为Flume配置一个正常的sink。
如下为配置步骤:
n 将Spark的jar包拷贝到Flume的lib目录下
n 下面的这个jar包也须要拷贝到Flume的lib目录下,同时加入IDEA工程的classpath
n 启动Flume
n 在IDEA中启动FlumeLogPull
n 将测试数据拷贝到/root/training/logs
n 观察IDEA中的输出
Apache Kafka是一种高吞吐量的分布式发布订阅消息系统。
搭建ZooKeeper(Standalone):
(*)配置/root/training/zookeeper-3.4.10/conf/zoo.cfg文件
dataDir=/root/training/zookeeper-3.4.10/tmp
server.1=spark81:2888:3888
(*)在/root/training/zookeeper-3.4.10/tmp目录下建立一个myid的空文件
echo 1 > /root/training/zookeeper-3.4.6/tmp/myid
搭建Kafka环境(单机单broker):
(*)修改server.properties文件
(*)启动Kafka
bin/kafka-server-start.sh config/server.properties &
出现如下错误:
须要修改bin/kafka-run-class.sh文件,将这个选项注释掉。
(*)测试Kafka
l 建立Topic
bin/kafka-topics.sh --create --zookeeper spark81:2181 -replication-factor 1 --partitions 3 --topic mydemo1
l 发送消息
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
l 接收消息
bin/kafka-console-consumer.sh --zookeeper spark81:2181 --topic mydemo1
l 搭建Spark Streaming和Kafka的集成开发环境
因为Spark Streaming和Kafka集成的时候,依赖的jar包比较多,并且还会产生冲突。强烈建议使用Maven的方式来搭建项目工程。
下面是依赖的pom.xml文件:
l 基于Receiver的方式
这个方法使用了Receivers来接收数据。Receivers的实现使用到Kafka高层次的消费者API。对于全部的Receivers,接收到的数据将会保存在Spark executors中,而后由Spark Streaming启动的Job来处理这些数据。
n 启动Kafka消息的生产者
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
n 在IDEA中启动任务,接收Kafka消息
l 直接读取方式
和基于Receiver接收数据不同,这种方式按期地从Kafka的topic+partition中查询最新的偏移量,再根据定义的偏移量范围在每一个batch里面处理数据。看成业须要处理的数据来临时,spark经过调用Kafka的简单消费者API读取必定范围的数据。
n 启动Kafka消息的生产者
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
n 在IDEA中启动任务,接收Kafka消息
在Spark中有几个优化能够减小批处理的时间:
① 数据接收的并行水平
经过网络(如kafka,flume,socket等)接收数据须要这些数据反序列化并被保存到Spark中。若是数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每一个输入DStream建立一个receiver(运行在worker机器上)接收单个数据流。建立多个输入DStream并配置它们能够从源中接收不一样分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream能够被切分为两个kafka输入流,每一个接收一个topic。这将在两个worker上运行两个receiver,所以容许数据并行接收,提升总体的吞吐量。多个DStream能够被合并生成单个DStream,这样运用在单个输入DStream的transformation操做能够运用在合并的DStream上。
② 数据处理的并行水平
若是运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。默认的并发任务数经过配置属性来肯定spark.default.parallelism。
③ 数据序列化
能够经过改变序列化格式来减小数据序列化的开销。在流式传输的状况下,有两种类型的数据会被序列化:
l 输入数据
l 由流操做生成的持久RDD
在上述两种状况下,使用Kryo序列化格式能够减小CPU和内存开销。
为了Spark Streaming应用程序可以在集群中稳定运行,系统应该可以以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这能够经过流的网络UI观察获得。批处理时间应该小于批间隔时间。
根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率能够经过应用程序维持。能够考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能能够每2秒打印一次单词计数(批间隔时间为2秒),但没法每500毫秒打印一次单词计数。因此,为了在生产环境中维持指望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。
找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。
在这一节,咱们重点介绍几个强烈推荐的自定义选项,它们能够减小Spark Streaming应用程序垃圾回收的相关暂停,得到更稳定的批处理时间。
l Default persistence level of DStreams:和RDDs不一样的是,默认的持久化级别是序列化数据到内存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。即便保存数据为序列化形态会增长序列化/反序列化的开销,可是能够明显的减小垃圾回收的暂停。
l Clearing persistent RDDs:默认状况下,经过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。若是spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化RDD将会被定时的清理掉。正如前面提到的那样,这个值须要根据Spark Streaming应用程序的操做当心设置。然而,能够设置配置选项spark.streaming.unpersist为true来更智能的去持久化(unpersist)RDD。这个配置使系统找出那些不须要常常保有的RDD,而后去持久化它们。这能够减小Spark RDD的内存使用,也可能改善垃圾回收的行为。
l Concurrent garbage collector:使用并发的标记-清除垃圾回收能够进一步减小垃圾回收的暂停时间。尽管并发的垃圾回收会减小系统的总体吞吐量,可是仍然推荐使用它以得到更稳定的批处理时间。