sparkStreaming的transformation和action详解

根据Spark官方文档中的描述,在Spark Streaming应用中,一个DStream对象能够调用多种操做,主要分为如下几类

  • Transformations
  • Window Operations
  • Join Operations
  • Output Operations

1、Transformations

 

一、map(func)
  map操做须要传入一个函数当作参数,具体调用形式为

[]  纯文本查看 复制代码
?
1
val b = a.map(func)

  主要做用是,对DStream对象a,将func函数做用到a中的每个元素上并生成新的元素,获得的DStream对象b中包含这些新的元素。

  下面示例代码的做用是,在接收到的一行消息后面拼接一个”_NEW”字符串

[Scala]  纯文本查看 复制代码
?
1
val linesNew = lines.map(lines = > lines + "_NEW" )

  程序运行结果以下:

<ignore_js_op>

  注意与接下来的flatMap操做进行比较。

二、flatMap(func)
  相似于上面的map操做,具体调用形式为

[Scala]  纯文本查看 复制代码
?
1
val b = a.flatMap(func)

  主要做用是,对DStream对象a,将func函数做用到a中的每个元素上并生成0个或多个新的元素,获得的DStream对象b中包含这些新的元素。

  下面示例代码的做用是,在接收到的一行消息lines后,将lines根据空格进行分割,分割成若干个单词

[Scala]  纯文本查看 复制代码
?
1
val words = lines.flatMap( _ .split( " " ))

  结果以下:

<ignore_js_op>

三、 filter(func)
  filter传入一个func函数,具体调用形式为

[Scala]  纯文本查看 复制代码
?
1
val b = a.filter(func)

  对DStream a中的每个元素,应用func方法进行计算,若是func函数返回结果为true,则保留该元素,不然丢弃该元素,返回一个新的DStream b。

  下面示例代码中,对words进行判断,去除hello这个单词。

结果以下:

<ignore_js_op>

四、union(otherStream)
  这个操做将两个DStream进行合并,生成一个包含着两个DStream中全部元素的新DStream对象。

  下面代码,首先将输入的每个单词后面分别拼接“_one”和“_two”,最后将这两个DStream合并成一个新的DStream

[Scala]  纯文本查看 复制代码
?
1
2
3
4
5
6
7
val wordsOne = words.map( _ + "_one" )
val wordsTwo = words.map( _ + "_two" )
val unionWords = wordsOne.union(wordsTwo)
 
wordsOne.print()
wordsTwo.print()
unionWords.print()

  运行结果以下:

<ignore_js_op>

五、count()
  统计DStream中每一个RDD包含的元素的个数,获得一个新的DStream,这个DStream中只包含一个元素,这个元素是对应语句单词统计数值。

  如下代码,统计每一行中的单词数

[Scala]  纯文本查看 复制代码
?
1
val wordsCount = words.count()

运行结果以下,一行输入4个单词,打印的结果也为4。

<ignore_js_op>

六、reduce(func)
  返回一个包含一个元素的DStream,传入的func方法会做用在调用者的每个元素上,将其中的元素顺次的两两进行计算。

  下面的代码,将每个单词用 "-"符号进行拼接

[Scala]  纯文本查看 复制代码
?
1
val reduceWords = words.reduce( _ + "-" + _ )

运行结果以下:

<ignore_js_op>

七、countByValue()
  某个DStream中的元素类型为K,调用这个方法后,返回的DStream的元素为(K, Long)对,后面这个Long值是原DStream中每一个RDD元素key出现的频率。

  如下代码统计words中不一样单词的个数

[Scala]  纯文本查看 复制代码
?
1
val countByValueWords = words.countByValue()

  结果以下:

<ignore_js_op>

八、reduceByKey(func, [numTasks])
  调用这个操做的DStream是以(K, V)的形式出现,返回一个新的元素格式为(K, V)的DStream。返回结果中,K为原来的K,V是由K通过传入func计算获得的。还能够传入一个并行计算的参数,在local模式下,默认为2。在其余模式下,默认值由参数 spark.default.parallelism肯定。

  下面代码将words转化成(word, 1)的形式,再以单词为key,个数为value,进行word count。

[Scala]  纯文本查看 复制代码
?
1
2
val pairs = words.map(word = > (word , 1 ))
val wordCounts = pairs.reduceByKey( _ + _ )

  结果以下,

<ignore_js_op>

九、join(otherStream, [numTasks])
  由一个DStream对象调用该方法,元素内容为 (k, V),传入另外一个DStream对象,元素内容为(k, W),返回的DStream中包含的内容是 (k, (V, W))。这个方法也能够传入一个并行计算的参数,该参数与reduceByKey中是相同的。

  下面代码中,首先将words转化成 (word, (word + "_one"))(word, (word + "_two"))的形式,再以word为key,将后面的value合并到一块儿。

[Scala]  纯文本查看 复制代码
?
1
2
3
val wordsOne = words.map(word = > (word , word + "_one" ))
val wordsTwo = words.map(word = > (word , word + "_two" ))
val joinWords = wordsOne.join(wordsTwo)

  运行结果以下:

<ignore_js_op>

十、cogroup(otherStream, [numTasks])
  由一个DStream对象调用该方法,元素内容为(k, V),传入另外一个DStream对象,元素内容为(k, W),返回的DStream中包含的内容是 (k, (Seq[V], Seq[W]))。这个方法也能够传入一个并行计算的参数,该参数与reduceByKey中是相同的。

  下面代码首先将words转化成 (word, (word + "_one"))(word, (word + "_two"))的形式,再以word为key,将后面的value合并到一块儿。

  结果以下:

<ignore_js_op>

十一、transform(func)

 

十二、updateStateByKey(func)

 

2、Window Operations
  我以为用一个成语,管中窥豹,基本上就可以很形象的解释什么是窗口函数了。DStream数据流就是那只豹子,窗口就是那个管,以一个固定的速率平移,就可以每次看到豹的一部分。

  窗口函数,就是在DStream流上,以一个可配置的长度为窗口,以一个可配置的速率向前移动窗口,根据窗口函数的具体内容,分别对当前窗口中的这一波数据采起某个对应的操做算子。须要注意的是窗口长度,和窗口移动速率须要是batch time的整数倍。接下来演示Spark Streaming中提供的主要窗口函数。

一、window(windowLength, slideInterval)
  该操做由一个DStream对象调用,传入一个窗口长度参数,一个窗口移动速率参数,而后将当前时刻当前长度窗口中的元素取出造成一个新的DStream。

  下面的代码以长度为3,移动速率为1截取源DStream中的元素造成新的DStream。

[Scala]  纯文本查看 复制代码
?
1
val windowWords = words.window(Seconds( 3 ), Seconds( 1 ))

  运行结果以下:

<ignore_js_op>

  基本上每秒输入一个字母,而后取出当前时刻3秒这个长度中的全部元素,打印出来。从上面的截图中能够看到,下一秒时已经看不到a了,再下一秒,已经看不到b和c了。表示a, b, c已经不在当前的窗口中。

二、 countByWindow(windowLength,slideInterval)
  返回指定长度窗口中的元素个数。

  代码以下,统计当前3秒长度的时间窗口的DStream中元素的个数:

[Scala]  纯文本查看 复制代码
?
1
val windowWords = words.countByWindow(Seconds( 3 ), Seconds( 1 ))

  结果以下:

<ignore_js_op>

三、 reduceByWindow(func, windowLength,slideInterval)
  相似于上面的reduce操做,只不过这里再也不是对整个调用DStream进行reduce操做,而是在调用DStream上首先取窗口函数的元素造成新的DStream,而后在窗口元素造成的DStream上进行reduce。

  代码以下:

[Scala]  纯文本查看 复制代码
?
1
val windowWords = words.reduceByWindow( _ + "-" + _ , Seconds( 3 ) , Seconds( 1 ))

  结果以下:

<ignore_js_op>

四、 reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])
  调用该操做的DStream中的元素格式为(k, v),整个操做相似于前面的reduceByKey,只不过对应的数据源不一样,reduceByKeyAndWindow的数据源是基于该DStream的窗口长度中的全部数据。该操做也有一个可选的并发数参数。

  下面代码中,将当前长度为3的时间窗口中的全部数据元素根据key进行合并,统计当前3秒中内不一样单词出现的次数。

[Scala]  纯文本查看 复制代码
?
1
val windowWords = pairs.reduceByKeyAndWindow((a : Int , b : Int) = > (a + b) , Seconds( 3 ) , Seconds( 1 ))

  结果以下:

<ignore_js_op>

五、 reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])
  这个窗口操做和上一个的区别是多传入一个函数invFunc。前面的func做用和上一个reduceByKeyAndWindow相同,后面的invFunc是用于处理流出rdd的。

  在下面这个例子中,若是把3秒的时间窗口当成一个池塘,池塘每一秒都会有鱼游进或者游出,那么第一个函数表示每由进来一条鱼,就在该类鱼的数量上累加。而第二个函数是,每由出去一条鱼,就将该鱼的总数减去一。

[Scala]  纯文本查看 复制代码
?
1
val windowWords = pairs.reduceByKeyAndWindow((a : Int, b : Int ) = > (a + b) , (a : Int, b : Int) = > (a - b) , Seconds( 3 ), Seconds( 1 ))

  下面是演示结果,最终的结果是该3秒长度的窗口中历史上出现过的全部不一样单词个数都为0。

<ignore_js_op>

  一段时间不输入任何信息,看一下最终结果

<ignore_js_op>

六、 countByValueAndWindow(windowLength,slideInterval, [numTasks])
  相似于前面的countByValue操做,调用该操做的DStream数据格式为(K, v),返回的DStream格式为(K, Long)。统计当前时间窗口中元素值相同的元素的个数。

  代码以下

[Scala]  纯文本查看 复制代码
?
1
val windowWords = words.countByValueAndWindow(Seconds( 3 ), Seconds( 1 ))[/align]

  结果以下

<ignore_js_op>

3、Join Operations
  Join主要可分为两种,

一、DStream对象之间的Join
  这种join通常应用于窗口函数造成的DStream对象之间,具体能够参考第一部分中的join操做,除了简单的join以外,还有leftOuterJoin, rightOuterJoin和fullOuterJoin。

二、DStream和dataset之间的join
  这一种join,能够参考前面transform操做中的示例。

4、Output Operations
  在Spark Streaming中,DStream的输出操做才是DStream上全部transformations的真正触发计算点,这个相似于RDD中的action操做。通过输出操做DStream中的数据才能与外部进行交互,好比将数据写入文件系统、数据库,或其余应用中。   
  
一、print()
  print操做会将DStream每个batch中的前10个元素在driver节点打印出来。

  看下面这个示例,一行输入超过10个单词,而后将这行语句分割成单个单词的DStream。

[Scala]  纯文本查看 复制代码
?
1
2
val words = lines.flatMap( _ .split( " " ))
words.print()

  看看print后的效果。

<ignore_js_op>

二、saveAsTextFiles(prefix, [suffix])
  这个操做能够将DStream中的内容保存为text文件,每一个batch的数据单独保存为一个文夹,文件夹名前缀参数必须传入,文件夹名后缀参数可选,最终文件夹名称的完整形式为 prefix-TIME_IN_MS[.suffix]

  好比下面这一行代码

[Scala]  纯文本查看 复制代码
?
1
lines.saveAsTextFiles( "satf" , ".txt" )[/align][align = left]

  看一下执行结果,在当前项目路径下,每秒钟生成一个文件夹,打开的两个窗口中的内容分别是nc窗口中的输入。

<ignore_js_op>

  另外,若是前缀中包含文件完整路径,则该text文件夹会建在指定路径下,以下图所示

<ignore_js_op>

三、saveAsObjectFiles(prefix, [suffix])
  这个操做和前面一个相似,只不过这里将DStream中的内容保存为SequenceFile文件类型,这个文件中保存的数据都是通过序列化后的Java对象。 
  实验略过,可参考前面一个操做。 
  
四、saveAsHadoopFiles(prefix, [suffix])
  这个操做和前两个相似,将DStream每一batch中的内容保存到HDFS上,一样能够指定文件的前缀和后缀。    
五、foreachRDD(func)
相关文章
相关标签/搜索