Spark学习之Spark Streaming

Spark Streaming实时数据流处理

1、Spark Streaming基础

1Spark Streaming简介
http://spark.apache.org/docs/latest/streaming-programming-guide.html

Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据能够从诸如KafkaFlumeKinesisTCP套接字等众多来源获取,而且可使用由高级函数(如mapreducejoinwindow)开发的复杂算法进行流数据处理。最后,处理后的数据能够被推送到文件系统,数据库和实时仪表板。并且,您还能够在数据流上应用Spark提供的机器学习和图处理算法。
html

2Spark Streaming的特色

便于使用算法

 

经过高级操做员构建应用程序。sql

 

Spark Streaming将Apache Spark的 语言集成API 引入流处理,使您能够像编写批处理做业同样编写流式做业。它支持Java,Scala和Python。数据库

容错apache

开箱即用的有状态的一次性语义。设计模式

Spark Streaming能够开箱即用,恢复丢失的工做和操做员状态(例如滑动窗口),而无需任何额外的代码。

缓存

Spark集成性能优化

将流式传输与批量和交互式查询相结合。服务器

经过在Spark上运行,Spark Streaming容许您重复使用相同的代码进行批处理,将流加入历史数据,或者在流状态下运行即席查询。构建强大的交互式应用程序,而不仅是分析。网络


3Spark Streaming的内部结构

在内部,它的工做原理以下。Spark Streaming接收实时输入数据流,并将数据切分红批,而后由Spark引擎对其进行处理,最后生成“批”形式的结果流。

Spark Streaming将连续的数据流抽象为discretizedstreamDStream。在内部,DStream 由一个RDD序列表示。

4、第一个小案例:NetworkWordCount

1)因为在本案例中须要使用netcat网络工具,因此须要先安装。 

  rpm -iUv ~/netcat-0.6.1-1.i386.rpm

 

2)启动netcat数据流服务器,并监听端口:1234

  命令:nc -l -p 9999

  服务器端:

3)启动客户端

bin/run-example streaming.NetworkWordCount localhost 1234

客户端:
 

(必定注意):若是要执行本例,必须确保机器cpu核数大于 2

5、开发本身的NetworkWordCount
 

(必定注意):

val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")

 

官方的解释:

2、Spark Streaming进阶

1StreamingContext对象详解

  初始化StreamingContext

  方式一:从SparkConf对象中建立
 

  从一个现有的SparkContext实例中建立
 

 程序中的几点说明:

  appName参数是应用程序在集群UI上显示的名称。

  masterSparkMesosYARN集群的URL,或者一个特殊的“local [*]”字符串来让程序以本地模式运行。

  当在集群上运行程序时,不须要在程序中硬编码master参数,而是使用spark-submit提交应用程序并将masterURL以脚本参数的形式传入。可是,对于本地测试和单元测试,您能够经过“local[*]”来运行Spark Streaming程序(请确保本地系统中的cpu核心数够用)。

  StreamingContext会内在的建立一个SparkContext的实例(全部Spark功能的起始点),你能够经过ssc.sparkContext访问到这个实例。

  批处理的时间窗口长度必须根据应用程序的延迟要求和可用的集群资源进行设置。

 

 请务必记住如下几点:

  一旦一个StreamingContextt开始运做,就不能设置或添加新的流计算。

  一旦一个上下文被中止,它将没法从新启动。

  同一时刻,一个JVM中只能有一个StreamingContext处于活动状态。

  StreamingContext上的stop()方法也会中止SparkContext。 要仅中止StreamingContext(保持SparkContext活跃),请将stop() 方法的可选参数stopSparkContext设置为false

  只要前一个StreamingContext在下一个StreamingContext被建立以前中止(不中止SparkContext),SparkContext就能够被重用来建立多个StreamingContext

2、离散流(DStreams):Discretized Streams

l DiscretizedStreamDStream Spark Streaming对流式数据的基本抽象。它表示连续的数据流,这些连续的数据流能够是从数据源接收的输入数据流,也能够是经过对输入数据流执行转换操做而生成的经处理的数据流。在内部,DStream由一系列连续的RDD表示,以下图:

举例分析:在以前的NetworkWordCount的例子中,咱们将一行行文本组成的流转换为单词流,具体作法为:将flatMap操做应用于名为linesDStream中的每一个RDD上,以生成words DStreamRDD。以下图所示:

可是DStream和RDD也有区别,下面画图说明:

 

3DStream中的转换操做(transformation
 

最后两个transformation算子须要重点介绍一下: 

1.transform(func)

  经过RDD-to-RDD函数做用于源DStream中的各个RDD,能够是任意的RDD操做,从而返回一个新的RDD

  举例:在NetworkWordCount中,也可使用transform来生成元组对

2.updateStateByKey(func)

  操做容许不断用新信息更新它的同时保持任意状态。

    定义状态-状态能够是任何的数据类型

    定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态

  重写NetworkWordCount程序,累计每一个单词出现的频率(注意:累计)

package demo

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyTotalNetworkWordCount {
  def main(args: Array[String]): Unit = {
    //建立一个Context对象: StreamingContext  (SparkContext, SQLContext)
    //指定批处理的时间间隔
    val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(5))

    //设置检查点
    ssc.checkpoint("hdfs://192.168.157.11:9000/spark/checkpoint")

    //建立一个DStream,处理数据
    val lines  = ssc.socketTextStream("192.168.157.81",7788,StorageLevel.MEMORY_AND_DISK_SER)

    //执行wordcount
    val words = lines.flatMap(_.split(" "))

    //定义函数用于累计每一个单词的总频率
    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
      //经过Spark内部的reduceByKey按key规约,而后这里传入某key当前批次的Seq/List,再计算当前批次的总和
      val currentCount = currValues.sum
      // 已累加的值
      val previousCount = prevValueState.getOrElse(0)
      // 返回累加后的结果,是一个Option[Int]类型
      Some(currentCount + previousCount)
    }

    val pairs = words.map(word => (word, 1))

    val totalWordCounts = pairs.updateStateByKey[Int](addFunc)
    totalWordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
View Code 

  输出结果:

注意:若是在IDEA中,不想输出log4j的日志信息,能够将log4j.properties文件(放在src的目录下)的第一行改成:

log4j.rootCategory=ERROR, console

4、窗口操做

Spark Streaming还提供了窗口计算功能,容许您在数据的滑动窗口上应用转换操做。下图说明了滑动窗口的工做方式:

如图所示,每当窗口滑过originalDStream时,落在窗口内的源RDD被组合并被执行操做以产生windowed DStreamRDD。在上面的例子中,操做应用于最近3个时间单位的数据,并以2个时间单位滑动。这代表任何窗口操做都须要指定两个参数。

  窗口长度(windowlength- 窗口的时间长度(上图的示例中为:3)。

  滑动间隔(slidinginterval- 两次相邻的窗口操做的间隔(即每次滑动的时间长度)(上图示例中为:2)。

这两个参数必须是源DStream的批间隔的倍数(上图示例中为:1)。

 

咱们以一个例子来讲明窗口操做。 假设您但愿对以前的单词计数的示例进行扩展,每10秒钟对过去30秒的数据进行wordcount。为此,咱们必须在最近30秒的pairs DStream数据中对(word, 1)键值对应用reduceByKey操做。这是经过使用reduceByKeyAndWindow操做完成的。

 

 

 

 

一些常见的窗口操做以下表所示。全部这些操做都用到了上述两个参数 - windowLengthslideInterval

  window(windowLength, slideInterval)

    基于源DStream产生的窗口化的批数据计算一个新的DStream

 

  countByWindow(windowLength, slideInterval)

    返回流中元素的一个滑动窗口数

 

  reduceByWindow(func, windowLength, slideInterval)

    返回一个单元素流。利用函数func汇集滑动时间间隔的流的元素建立这个单元素流。函数必须是相关联的以使计算可以正确的并行计算。

 

 

  reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

    应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每个key的值均由给定的reduce函数汇集起来。注意:在默认状况下,这个算子利用了Spark默认的并发任务数去分组。你能够用numTasks参数设置不一样的任务数

 

  reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

    上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一窗口的reduce计算结果递增地计算每一个窗口的reduce值。这是经过对进入滑动窗口的新数据进行reduce操做,以及“逆减(inverse reducing)”离开窗口的旧数据来完成的。一个例子是当窗口滑动时对键对应的值进行“一加一减”操做。可是,它仅适用于“可逆减函数(invertible reduce functions)”,即具备相应“反减”功能的减函数(做为参数invFunc)。 像reduceByKeyAndWindow同样,经过可选参数能够配置reduce任务的数量。 请注意,使用此操做必须启用检查点。

 

  countByValueAndWindow(windowLength, slideInterval, [numTasks])

    应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每一个key的值都是它们在滑动窗口中出现的频率。

5、输入DStreams和接收器

输入DStreams表示从数据源获取输入数据流的DStreams。在NetworkWordCount例子中,lines表示输入DStream,它表明从netcat服务器获取的数据流。每个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。

输入DStreams表示从数据源获取的原始数据流。Spark Streaming拥有两类数据源:

  基本源(Basic sources):这些源在StreamingContext API中直接可用。例如文件系统、套接字链接、Akkaactor

  高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等。

 

下面经过具体的案例,详细说明:

 

  • 文件流:经过监控文件系统的变化,如有新文件添加,则将它读入并做为数据流

须要注意的是:

① 这些文件具备相同的格式

② 这些文件经过原子移动或重命名文件的方式在dataDirectory建立

③ 若是在文件中追加内容,这些追加的新数据也不会被读取。

注意:要演示成功,须要在原文件中编辑,而后拷贝一份。

  RDD队列流

使用streamingContext.queueStream(queueOfRDD)建立基于RDD队列的DStream,用于调试Spark Streaming应用程序。

package demo

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object RDDQueueStream {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("queueStream")
    //每1秒对数据进行处理
    val ssc = new StreamingContext(conf,Seconds(1))

    //建立一个可以push到QueueInputDStream的RDDs队列
    val rddQueue = new mutable.Queue[RDD[Int]]()

    //基于一个RDD队列建立一个输入源
    val inputStream = ssc.queueStream(rddQueue)

    //将接收到的数据乘以10
    val mappedStream = inputStream.map(x => (x,x*10))
    mappedStream.print()

    ssc.start()

    for(i <- 1 to 3){
      rddQueue += ssc.sparkContext.makeRDD(1 to 10)   //建立RDD,并分配两个核数
      Thread.sleep(1000)
    }
    ssc.stop()
  }
}
View Code

 

  套接字流:经过监听Socket端口来接收数据

package demo
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ScoketStreaming {

  def main(args: Array[String]) {
    //建立一个本地的StreamingContext,含2个工做线程
    val conf = new SparkConf().setMaster("local[4]").setAppName("ScoketStreaming")
    val sc = new StreamingContext(conf,Seconds(5))   //每隔10秒统计一次字符总数

    //建立珍一个DStream,链接127.0.0.1 :7788
    val lines = sc.socketTextStream("127.0.0.1",7788)
    //打印数据
    lines.print()

    sc.start()         //开始计算
    sc.awaitTermination()   //经过手动终止计算,不然一直运行下去
  }
}
View Code  

6DStreams的输出操做

输出操做容许DStream的操做推到如数据库、文件系统等外部系统中。由于输出操做其实是容许外部系统消费转换后的数据,它们触发的实际操做是DStream转换。目前,定义了下面几种输出操做:
  

foreachRDD的设计模式

DStream.foreachRDD是一个强大的原语,发送数据到外部系统中。

  第一步:建立链接,将数据写入外部数据库(使用以前的NetworkWordCount,改写以前输出结果的部分,以下)

  •  

    出现如下Exception

    缘由是:Connection对象不是一个可被序列化的对象,不能RDD的每一个Worker上运行;即:Connection不能在RDD分布式环境中的每一个分区上运行,由于不一样的分区可能运行在不一样的Worker上。因此须要在每一个RDD分区上单首创建Connection对象。

   第二步:在每一个RDD分区上单首创建Connection对象,以下:

View Code

 

7DataFrameSQL操做

咱们能够很方便地使用DataFramesSQL操做来处理流数据。您必须使用当前的StreamingContext对应的SparkContext建立一个SparkSession。此外,必须这样作的另外一个缘由是使得应用能够在driver程序故障时得以从新启动,这是经过建立一个能够延迟实例化的单例SparkSession来实现的。

在下面的示例中,咱们使用DataFramesSQL来修改以前的wordcount示例并对单词进行计数。咱们将每一个RDD转换为DataFrame,并注册为临时表,而后在这张表上执行SQL查询。

package demo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyNetworkWordCountDataFrame {
  def main(args: Array[String]): Unit = {
    //建立一个Context对象: StreamingContext  (SparkContext, SQLContext)
    //指定批处理的时间间隔
    val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    //建立一个DStream,处理数据
    val lines = ssc.socketTextStream("192.168.157.81", 7788, StorageLevel.MEMORY_AND_DISK_SER)

    //执行wordcount
    val words = lines.flatMap(_.split(" "))

    //使用Spark SQL来查询Spark Streaming处理的数据
    words.foreachRDD { rdd =>
      //使用单列模式,建立SparkSession对象
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

      import spark.implicits._
      // 将RDD[String]转换为DataFrame
      val wordsDataFrame = rdd.toDF("word")

      // 建立临时视图
      wordsDataFrame.createOrReplaceTempView("words")

      // 执行SQL
      val wordCountsDataFrame =   spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }

    //启动StreamingContext
    ssc.start()

    //等待计算完成
    ssc.awaitTermination()
  }
}
View Code 

8、缓存/持久化

RDD相似,DStreams还容许开发人员将流数据保留在内存中。也就是说,在DStream上调用persist() 方法会自动将该DStream的每一个RDD保留在内存中。若是DStream中的数据将被屡次计算(例如,相同数据上执行多个操做),这个操做就会颇有用。对于基于窗口的操做,如reduceByWindowreduceByKeyAndWindow以及基于状态的操做,如updateStateByKey,数据会默认进行持久化。 所以,基于窗口的操做生成的DStream会自动保存在内存中,而不须要开发人员调用persist()

 

对于经过网络接收数据(例如KafkaFlumesockets等)的输入流,默认持久化级别被设置为将数据复制到两个节点进行容错。

请注意,与RDD不一样,DStreams的默认持久化级别将数据序列化保存在内存中。

9、检查点支持

流数据处理程序一般都是全天候运行,所以必须对应用中逻辑无关的故障(例如,系统故障,JVM崩溃等)具备弹性。为了实现这一特性,Spark Streaming须要checkpoint足够的信息到容错存储系统,以即可以从故障中恢复。

 

① 通常会对两种类型的数据使用检查点:

1) 元数据检查点(Metadatacheckpointing - 将定义流计算的信息保存到容错存储中(如HDFS)。这用于从运行streaming程序的driver程序的节点的故障中恢复。元数据包括如下几种:

配置(Configuration- 用于建立streaming应用程序的配置信息。

l DStream操做(DStream operations- 定义streaming应用程序的DStream操做集合。

不完整的batchIncomplete batches- jobs还在队列中但还没有完成的batch

 

2) 数据检查点(Datacheckpointing- 将生成的RDD保存到可靠的存储层。对于一些须要将多个批次之间的数据进行组合的stateful变换操做,设置数据检查点是必需的。在这些转换操做中,当前生成的RDD依赖于先前批次的RDD,这致使依赖链的长度随时间而不断增长,由此也会致使基于血统机制的恢复时间无限增长。为了不这种状况,stateful转换的中间RDD将按期设置检查点并保存到到可靠的存储层(例如HDFS)以切断依赖关系链。

 

总而言之,元数据检查点主要用于从driver程序故障中恢复,而数据或RDD检查点在任何使用stateful转换时是必需要有的。

 

② 什么时候启用检查点:

对于具备如下任一要求的应用程序,必须启用检查点:

1) 使用状态转:若是在应用程序中使用updateStateByKeyreduceByKeyAndWindow(具备逆函数),则必须提供检查点目录以容许按期保存RDD检查点。

2) 从运行应用程序的driver程序的故障中恢复:元数据检查点用于使用进度信息进行恢复。

 

③ 如何配置检查点:

能够经过在一些可容错、高可靠的文件系统(例如,HDFSS3等)中设置保存检查点信息的目录来启用检查点。这是经过使用streamingContext.checkpoint(checkpointDirectory)完成的。设置检查点后,您就可使用上述的有状态转换操做。此外,若是要使应用程序从驱动程序故障中恢复,您应该重写streaming应用程序以使程序具备如下行为:

1) 当程序第一次启动时,它将建立一个新的StreamingContext,设置好全部流数据源,而后调用start()方法。

2) 当程序在失败后从新启动时,它将从checkpoint目录中的检查点数据从新建立一个StreamingContext

使用StreamingContext.getOrCreate能够简化此行为

④ 改写以前的WordCount程序,使得每次计算的结果和状态都保存到检查点目录下

package demo

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyCheckpointNetworkWordCount {
  def main(args: Array[String]): Unit = {
    //在主程序中,建立一个Streaming Context对象
    //一、读取一个检查点的目录
    //二、若是该目录下已经存有以前的检查点信息,从已有的信息上建立这个Streaming Context对象
    //三、若是该目录下没有信息,建立一个新的Streaming Context
    val context = StreamingContext.getOrCreate("hdfs://192.168.157.111:9000/spark_checkpoint",createStreamingContext)

    //启动任务
    context.start()
    context.awaitTermination()
  }

  //建立一个StreamingContext对象,而且设置检查点目录,执行WordCount程序(记录以前的状态信息)
  def createStreamingContext():StreamingContext = {
    val conf = new SparkConf().setAppName("MyCheckpointNetworkWordCount").setMaster("local[2]")
    //建立这个StreamingContext对象
    val ssc = new StreamingContext(conf,Seconds(3))

    //设置检查点目录
    ssc.checkpoint("hdfs://192.168.157.111:9000/spark_checkpoint")

    //建立一个DStream,执行WordCount
    val lines = ssc.socketTextStream("192.168.157.81",7788,StorageLevel.MEMORY_AND_DISK_SER)

    //分词操做
    val words = lines.flatMap(_.split(" "))
    //每一个单词记一次数
    val wordPair = words.map(x=> (x,1))

    //执行单词计数
    //定义一个新的函数:把当前的值跟以前的结果进行一个累加
    val addFunc = (currValues:Seq[Int],preValueState:Option[Int]) => {
      //当前当前批次的值
      val currentCount = currValues.sum

      //获得已经累加的值。若是是第一次求和,以前没有数值,从0开始计数
      val preValueCount = preValueState.getOrElse(0)

      //进行累加,而后累加后结果,是Option[Int]
      Some(currentCount + preValueCount)
    }

    //要把新的单词个数跟以前的结果进行叠加(累计)
    val totalCount = wordPair.updateStateByKey[Int](addFunc)

    //输出结果
    totalCount.print()

    //返回这个对象
    ssc
  }
}
View Code  

经过查看HDFS中的信息,能够看到相关的检查点信息,以下:

 

3、高级数据源

1Spark Streaming接收Flume数据

  基于FlumePush模式

  Flume被用于在Flume agents之间推送数据.在这种方式下,Spark Streaming能够很方便的创建一个receiver,起到一个Avro agent的做用.Flume能够将数据推送到改receiver.

  • 第一步:Flume的配置文件
    #bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console
    #定义agent名, source、channel、sink的名称
    a4.sources = r1
    a4.channels = c1
    a4.sinks = k1
    
    #具体定义source
    a4.sources.r1.type = spooldir
    a4.sources.r1.spoolDir = /root/training/logs
    
    #具体定义channel
    a4.channels.c1.type = memory
    a4.channels.c1.capacity = 10000
    a4.channels.c1.transactionCapacity = 100
    
    #具体定义sink
    a4.sinks = k1
    a4.sinks.k1.type = avro
    a4.sinks.k1.channel = c1
    a4.sinks.k1.hostname = 192.168.157.1
    a4.sinks.k1.port = 1234
    
    #组装source、channel、sink
    a4.sources.r1.channels = c1
    a4.sinks.k1.channel = c1
    View Code 
  • 第二步:Spark Streaming程序
    package demo
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object MyFlumeStream {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
    
        //建立FlumeEvent的DStream
        val flumeEvent = FlumeUtils.createStream(ssc,"192.168.157.1",1234)
    
        //将FlumeEvent中的事件转成字符串
        val lineDStream = flumeEvent.map( e => {
           new String(e.event.getBody.array)
        })
    
        //输出结果
        lineDStream.print()
    
        ssc.start()
        ssc.awaitTermination();
      }
    }
    View Code 
  • 第三步:注意除了须要使用Flumelibjar包之外,还须要如下jar包:
    spark-streaming-flume_2.10-2.1.0.jar
     
  • 第四步:测试

    启动Spark Streaming程序

    启动Flume

    拷贝日志文件到/root/training/logs目录

    观察输出,采集到数据
    

基于Custom SinkPull模式

  不一样于Flume直接将数据推送到Spark Streaming中,第二种模式经过如下条件运行一个正常的Flume sinkFlume将数据推送到sink中,而且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从sink拉取数据。只要当数据被接收而且被Spark Streaming备份后,转换器才运行成功。

  这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式须要为Flume配置一个正常的sink

  如下为配置步骤:

  • 第一步:Flume的配置文件
    #bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console
    a1.channels = c1
    a1.sinks = k1
    a1.sources = r1
    
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/training/logs
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 100000
    a1.channels.c1.transactionCapacity = 100000
    
    a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = 192.168.157.81
    a1.sinks.k1.port = 1234
    
    #组装source、channel、sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    View Code 
  • 第二步:Spark Streaming程序
    package demo
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object FlumeLogPull {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
    
        //建立FlumeEvent的DStream
        val flumeEvent = FlumeUtils.createPollingStream(ssc,"192.168.157.81",1234,StorageLevel.MEMORY_ONLY_SER_2)
    
        //将FlumeEvent中的事件转成字符串
        val lineDStream = flumeEvent.map( e => {
          new String(e.event.getBody.array)
        })
    
        //输出结果
        lineDStream.print()
    
        ssc.start()
        ssc.awaitTermination();
      }
    }
    View Code 
  • 第三步:须要的jar

    spark-streaming-flume-sink_2.10-2.1.0.jar   

    将Sparkjar包拷贝到Flumelib目录下

    下面的这个jar包也须要拷贝到Flumelib目录下,同时加入IDEA工程的classpath 

  • 第四步:测试

    启动Flume

    在IDEA中启动FlumeLogPull

    将测试数据拷贝到/root/training/logs

    观察IDEA中的输出

   

2Spark Streaming接收Kafka数据

Apache Kafka是一种高吞吐量的分布式发布订阅消息系统。

搭建ZooKeeperStandalone):

*)配置/root/training/zookeeper-3.4.10/conf/zoo.cfg文件

dataDir=/root/training/zookeeper-3.4.10/tmp

server.1=spark81:2888:3888

*)在/root/training/zookeeper-3.4.10/tmp目录下建立一个myid的空文件

echo 1 > /root/training/zookeeper-3.4.6/tmp/myid

搭建Kafka环境(单机单broker):

*)修改server.properties文件
 

*)启动Kafka

bin/kafka-server-start.sh config/server.properties &

 出现如下错误:

     须要修改bin/kafka-run-class.sh文件,将这个选项注释掉。

*)测试Kafka

  建立Topic

bin/kafka-topics.sh --create --zookeeper spark81:2181 -replication-factor 1 --partitions 3 --topic mydemo1

 

  发送消息

bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1

 

  接收消息

bin/kafka-console-consumer.sh --zookeeper spark81:2181 --topic mydemo1

 

搭建Spark StreamingKafka的集成开发环境

因为Spark StreamingKafka集成的时候,依赖的jar包比较多,并且还会产生冲突。强烈建议使用Maven的方式来搭建项目工程

下面是依赖的pom.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ZDemo5</groupId>
    <artifactId>ZDemo5</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

</project>
View Code

基于Receiver的方式

这个方法使用了Receivers来接收数据。Receivers的实现使用到Kafka高层次的消费者API。对于全部的Receivers,接收到的数据将会保存在Spark executors中,而后由Spark Streaming启动的Job来处理这些数据。
 

  • 开发Spark StreamingKafka Receivers
    package demo
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object KafkaWordCount {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
    
        //建立topic名称,1表示一次从这个topic中获取一条记录
        val topics = Map("mydemo1" ->1)
    
        //建立Kafka的输入流,指定ZooKeeper的地址
        val kafkaStream = KafkaUtils.createStream(ssc,"192.168.157.81:2181","mygroup",topics)
    
        //处理每次接收到的数据
        val lineDStream = kafkaStream.map(e => {
          new String(e.toString())
        })
        //输出结果
        lineDStream.print()
    
        ssc.start()
        ssc.awaitTermination();
      }
    }
    View Code
  • 测试

  启动Kafka消息的生产者

    bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1 

  在IDEA中启动任务,接收Kafka消息

直接读取方式

和基于Receiver接收数据不同,这种方式按期地从Kafkatopic+partition中查询最新的偏移量,再根据定义的偏移量范围在每一个batch里面处理数据。看成业须要处理的数据来临时,spark经过调用Kafka的简单消费者API读取必定范围的数据。

  • 开发Spark Streaming的程序
    package demo
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    
    object DirectKafkaWordCount {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
    
        //建立topic名称,1表示一次从这个topic中获取一条记录
        val topics = Set("mydemo1")
        //指定Kafka的broker地址
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.157.81:9092")
    
        //建立DStream,接收Kafka的数据
        val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    
        //处理每次接收到的数据
        val lineDStream = kafkaStream.map(e => {
          new String(e.toString())
        })
        //输出结果
        lineDStream.print()
    
        ssc.start()
        ssc.awaitTermination();
      }
    }
    View Code  
  • 测试

  启动Kafka消息的生产者

  bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1

 

  在IDEA中启动任务,接收Kafka消息

4、性能优化

1、减小批数据的执行时间

Spark中有几个优化能够减小批处理的时间:

① 数据接收的并行水平

经过网络(kafkaflumesocket)接收数据须要这些数据反序列化并被保存到Spark中。若是数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每一个输入DStream建立一个receiver(运行在worker机器上)接收单个数据流。建立多个输入DStream并配置它们能够从源中接收不一样分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream能够被切分为两个kafka输入流,每一个接收一个topic。这将在两个worker上运行两个receiver,所以容许数据并行接收,提升总体的吞吐量。多个DStream能够被合并生成单个DStream,这样运用在单个输入DStreamtransformation操做能够运用在合并的DStream上。

② 数据处理的并行水平

若是运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。默认的并发任务数经过配置属性来肯定spark.default.parallelism

③ 数据序列化

能够经过改变序列化格式来减小数据序列化的开销。在流式传输的状况下,有两种类型的数据会被序列化:

  输入数据

  由流操做生成的持久RDD

在上述两种状况下,使用Kryo序列化格式能够减小CPU和内存开销。

2、设置正确的批容量

为了Spark Streaming应用程序可以在集群中稳定运行,系统应该可以以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这能够经过流的网络UI观察获得。批处理时间应该小于批间隔时间。

 

根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率能够经过应用程序维持。能够考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能能够每2秒打印一次单词计数(批间隔时间为2秒),但没法每500毫秒打印一次单词计数。因此,为了在生产环境中维持指望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)

 

找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。

3、内存调优

在这一节,咱们重点介绍几个强烈推荐的自定义选项,它们能够减小Spark Streaming应用程序垃圾回收的相关暂停,得到更稳定的批处理时间。

  Default persistence level of DStreamsRDDs不一样的是,默认的持久化级别是序列化数据到内存中(DStreamStorageLevel.MEMORY_ONLY_SERRDDStorageLevel.MEMORY_ONLY)。即便保存数据为序列化形态会增长序列化/反序列化的开销,可是能够明显的减小垃圾回收的暂停。

 

  Clearing persistent RDDs默认状况下,经过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。若是spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化RDD将会被定时的清理掉。正如前面提到的那样,这个值须要根据Spark Streaming应用程序的操做当心设置。然而,能够设置配置选项spark.streaming.unpersisttrue来更智能的去持久化(unpersistRDD。这个配置使系统找出那些不须要常常保有的RDD,而后去持久化它们。这能够减小Spark RDD的内存使用,也可能改善垃圾回收的行为。

 

  Concurrent garbage collector使用并发的标记-清除垃圾回收能够进一步减小垃圾回收的暂停时间。尽管并发的垃圾回收会减小系统的总体吞吐量,可是仍然推荐使用它以得到更稳定的批处理时间。

相关文章
相关标签/搜索