1、Spark Streaming概述:
是基于Spark core的API,不须要单独安装,一盏式解决
可扩展、高吞吐量、容错性、可以运行在多节点、结合了批处理、机器学习、图计算等
将不一样的数据源的数据通过Spark Streaming处理后输出到外部文件系统
1. 应用场景:
实时交易防欺诈检测、传感器异常实时反应
整理Spark发展史问题(缺乏)
2. Spark Streaming工做原理:
粗粒度:
把实时数据流,以秒数拆分红批次的小数据块,经过Spark
当成RDD来处理
细粒度:
3. 核心概念:
编程入口:
StreamingContext
经常使用构造方法源码:
def this(sparkContext: SparkContext, batchDuration: Duration) = {
this(sparkContext, null, batchDuration)
}
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
batchDuration 是必须填的,根据应用程序的延迟需求和资源可用状况来设置
定义好streamingContext后,再定义DStream、transformation等,经过start()开始,stop()结束
注意:
一个context启动后,不能再运行新的streaming(一个JVM只能有一个streamingContext)
一旦中止后,就没办法再从新开始
Stop方法默认把sparkContext和streamingContext同时关掉,要不想关掉sc,必须定义stopSparkContext参数为false
一个SparkContext可以建立多个StreamingContext
最基础的抽象:Discretized Stream (DStream)
一系列的RDD表明一个DStream,是不可变的、分布式的dataset
每个RDD表明一个时间段(批次)的数据
对DStream进行操做算子(flatMap)时,在底层上看就是对每个RDD作相同的操做,交由Spark core运行
数据输入:Input DStreams and Receivers
每个Input DStream 关联着一个Receiver(但从文件系统接收不须要receiver),receiver 接收数据并存在内存中
receiver须要占用一个线程,因此不能定义local[1],线程的数量n必须大于receivers的数量
转换:Transformations on DStreams
与RDD相似:map、flatMap、filter、repartition、count...
数据输出:Output Operations and DStreams:
输出到数据库或者文件系统:
API:print、save、foreach
2、Spark Streaming实战部分:
- Spark Streaming处理socket数据:
接收到的数据进行WordCount操做:
在IDEA中:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
* Spark Streaming 处理Socket数据
* */
object NetWorkWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetWorkWordCount")
//建立streamingContext的两个参数sparkConf和seconds
val ssc = new StreamingContext(sparkConf, Seconds(5))
//生成Input DStream
val lines = ssc.socketTextStream("localhost", 6789)
val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
在控制台中:
nc -lk 6789,建立一个Socket
在这上面输入数据,就能够在IDEA中count出来了
注意:
在执行过程当中会报错,必须在Maven projects中找出报错提示中所缺乏的包,而且在dependency上加入。
- Spark Streaming处理HDFS中的数据:
ssc.textFileStream("file_path")
一样是像上面同样,只是改了stream的source
可是测试时,必需要是生成新的文件(官网称为moving进去的文件),才会被统计;而往旧的文件里再添加数据,也不会被统计了
- Spark Streaming进阶实战:
带状态的算子UpdateStateByKey、保存到MySQL、window函数
UpdateStateByKey实现实时更新:
容许把新旧状态结合,连续地更新
准备工做:
- 定义一个状态
- 定义状态更新的方法
注意:
- updateFunction须要隐式转换
- 报错:Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set.
意思就是要进行
checkpoint记录
实现代码:
把reduceByKey删除,而且把map以后的RDD定义为一个state,配合这个state写状态更新方法
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
* Spark Streaming有状态的统计
* */
object StatefulWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//使用状态算子必需要设置checkpoint
//通常要保存记录在HDFS中
ssc.checkpoint(".")
val lines = ssc.socketTextStream("localhost", 6789)
val result = lines.flatMap(_.split(" ")).map((_, 1)) //不能用reduceByKey
//连续更新状态
val state = result.updateStateByKey(updateFunction _) //须要隐式转换
state.print()
ssc.start()
ssc.awaitTermination()
}
/*
* 状态更新方法更新已有的数据,放在updateStateByKey中
* */
def updateFunction(currData: Seq[Int], prevData: Option[Int]): Option[Int] = {
val curr = currData.sum //算出当前的总次数
val prev = prevData.getOrElse(0) //读取已有的
//返回已有和当前的和
Some(curr + prev)
}
}
- 统计结果写到MySQL中:
前提准备:
须要在IDEA中增长mysql的connector依赖
在mysql数据库中先建立一张表
写jdbc建立链接到Mysql
使用foreachRDD,有不少种
错误的写法:(没有序列化,建立太多mysql链接等)
报错没有序列化:
dstream.foreachRDD {rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach {record =>connection.send(record) // executed at the worker
}
}
花太多开销在链接和断开数据库上
dstream.foreachRDD {rdd =>
rdd.foreach {record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
官方正确写法:
使用foreachPartition进行优化链接:
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRecords =>
val connection = createNewConnection() //建立mysql链接
partitionOfRecords.foreach(record =>
connection.send(record))
connection.close()
}
}
用链接池进行进一步优化:
Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects than can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record =>connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
在写入MySQL数据时,应该做一个是否存在的判断:
若存在则使用update语句,不存在则使用insert语句
- Window的使用:
两个参数:
window length:窗口长度
sliding interval:窗口间隔
也就是每隔sliding interval统计前window length的值
API:countByWindow、reduceByWindow…
- 实战:黑名单过滤
transform算子的使用+Spark Streaming整合RDD操做
元组默认从1开始数
假设输入数据为id, name 这种形式
实现过程:
- 创建黑名单元组 => (name, true)
- 把输入数据流编程元组 => (name, (id, name))
- transform,把每一个DStream变成一个个RDD操做
- 数据流的RDD与黑名单RDD进行leftjoin,得到新的元组
- filter判断过滤
- 整合输出
实现代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
* 黑名单过滤demo
* */
object TransformApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//构建黑名单列表, 实际应用中可在外面读取列表, 并转成RDD, 用true标记为是黑名单元组(name, true)
val blacks = List("zs", "ls")
val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))
//获取每行
val lines = ssc.socketTextStream("localhost", 6789)
//把id, name => 元组(name, (id, name))
//transform 的使用,对stream的每一个RDD操做
val filterResult = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
//与黑名单进行leftjoin => (name, ((id, name), true)), 并过滤出是true的项
rdd.leftOuterJoin(blacksRDD)
.filter(x => x._2._2.getOrElse(false) != true) //过滤出不等于true的
.map(x => x._2._1)
})
filterResult.print()
ssc.start()
ssc.awaitTermination()
}
}
- Spark Streaming整合Spark SQL
整合完成词频统计操做
官网代码:
就是foreachRDD把streaming转成RDD,而后toDF就能够进行DataFrame或者是sql的操做了