Spark Streaming笔记整理(三):DS的transformation与output操做

DStream的各类transformation

Transformation Meaning
map(func)         对DStream中的各个元素进行func函数操做,而后返回一个新的DStream. flatMap(func)       与map方法相似,只不过各个输入项能够被输出为零个或多个输出项 filter(func)        过滤出全部函数func返回值为true的DStream元素并返回一个新的DStream repartition(numPartitions) 增长或减小DStream中的分区数,从而改变DStream的并行度 union(otherStream)     将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream. count()          经过对DStreaim中的各个RDD中的元素进行计数,而后返回只有一个元素的RDD构成的DStream reduce(func)       对源DStream中的各个RDD中的元素利用func进行聚合操做,而后返回只有一个元素的RDD构成的新的DStream. countByValue()       对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数 reduceByKey(func, [numTasks])利用func函数对源DStream中的key进行聚合操做,而后返回新的(K,V)对构成的DStream join(otherStream, [numTasks])输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream cogroup(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream transform(func)     经过RDD-to-RDD函数做用于源码DStream中的各个RDD,能够是任意的RDD操做,从而返回一个新的RDD updateStateByKey(func)   根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的Dstream Window 函数: 

能够看到不少都是在RDD中已经有的transformation算子操做,因此这里只关注transform、updateStateByKey和window函数php

transformation之transform操做

DStream transform

一、transform操做,应用在DStream上时,能够用于执行任意的RDD到RDD的转换操做。它能够用于实现,DStream API中所没有提供的操做。好比说,DStream API中,并无提供将一个DStream中的每一个batch,与一个特定的RDD进行join的操做。可是咱们本身就可使用transform操做来实现该功能。java

二、DStream.join(),只能join其余DStream。在DStream每一个batch的RDD计算出来以后,会去跟其余DStream的RDD进行join。python

案例

测试代码以下:shell

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

/** * 使用Transformation之transform来完成在线黑名单过滤 * 需求: * 将日志数据中来自于ip["27.19.74.143", "110.52.250.126"]实时过滤掉 * 数据格式 * 27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127 */ object _06SparkStreamingTransformOps { def main(args: Array[String]): Unit = { if (args == null || args.length < 2) { System.err.println( """Parameter Errors! Usage: <hostname> <port> |hostname: 监听的网络socket的主机名或ip地址 |port: 监听的网络socket的端口 """.stripMargin) System.exit(-1) } Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) val hostname = args(0).trim val port = args(1).trim.toInt //黑名单数据 val blacklist = List(("27.19.74.143", true), ("110.52.250.126", true)) // val blacklist = List("27.19.74.143", "110.52.250.126") val blacklistRDD:RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacklist) val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port) // 若是用到一个DStream和rdd进行操做,没法使用dstream直接操做,只能使用transform来进行操做 val filteredDStream:DStream[String] = linesDStream.transform(rdd => { val ip2InfoRDD:RDD[(String, String)] = rdd.map{line => { (line.split("##")(0), line) }} /** A(M) B(N)两张表: * across join * 交叉链接,没有on条件的链接,会产生笛卡尔积(M*N条记录) 不能用 * inner join * 等值链接,取A表和B表的交集,也就是获取在A和B中都有的数据,没有的剔除掉 不能用 * left outer join * 外连接:最经常使用就是左外链接(将左表中全部的数据保留,右表中可以对应上的数据正常显示,在右表中对应不上,显示为null) * 能够经过非空判断是左外链接达到inner join的结果 */ val joinedInfoRDD:RDD[(String, (String, Option[Boolean]))] = ip2InfoRDD.leftOuterJoin(blacklistRDD) joinedInfoRDD.filter{case (ip, (line, joined)) => { joined == None }}//执行过滤操做 .map{case (ip, (line, joined)) => line} }) filteredDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身 } }

nc中产生数据:数据库

[uplooking@uplooking01 ~]$ nc -lk 4893
27.19.74.143##2016-05-30 17:38:20##GET /data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1##200##582
110.52.250.126##2016-05-30 17:38:20##GET /static/js/logging.js?y7a HTTP/1.1##200##603
8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

输出结果以下:apache

------------------------------------------- Time: 1526006084000 ms ------------------------------------------- 8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

transformation之updateStateByKey操做

概述

一、Spark Streaming的updateStateByKey能够DStream中的数据进行按key作reduce操做,而后对各个批次的数据进行累加。api

二、 updateStateByKey 解释缓存

以DStream中的数据进行按key作reduce操做,而后对各个批次的数据进行累加在有新的数据信息进入或更新时,可让用户保持想要的任何状。使用这个功能须要完成两步:ruby

1) 定义状态:能够是任意数据类型bash

2) 定义状态更新函数:用一个函数指定如何使用先前的状态,从输入流中的新值更新状态。对于有状态操做,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会变得愈来愈大

三、要思考的是若是数据量很大的时候,或者对性能的要求极为苛刻的状况下,能够考虑将数据放在Redis或者tachyon或者ignite上

四、注意,updateStateByKey操做,要求必须开启Checkpoint机制。

案例

Scala版

测试代码以下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 状态函数updateStateByKey * 更新key的状态(就是key对应的value) * * 一般的做用,计算某个key截止到当前位置的状态 * 统计截止到目前为止的word对应count * 要想完成截止到目前为止的操做,必须将历史的数据和当前最新的数据累计起来,因此须要一个地方来存放历史数据 * 这个地方就是checkpoint目录 * */ object _07SparkStreamingUpdateStateByKeyOps { def main(args: Array[String]): Unit = { if (args == null || args.length < 2) { System.err.println( """Parameter Errors! Usage: <hostname> <port> |hostname: 监听的网络socket的主机名或ip地址 |port: 监听的网络socket的端口 """.stripMargin) System.exit(-1) } val hostname = args(0).trim val port = args(1).trim.toInt Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_07SparkStreamingUpdateStateByKeyOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) ssc.checkpoint("hdfs://ns1/checkpoint/streaming/usb") // 接收到的当前批次的数据 val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port) // 这是记录下来的当前批次的数据 val rbkDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) val usbDStream:DStream[(String, Int)] = rbkDStream.updateStateByKey(updateFunc) usbDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身 } /** * @param seq 当前批次的key对应的数据 * @param history 历史key对应的数据,可能有可能没有 * @return */ def updateFunc(seq: Seq[Int], history: Option[Int]): Option[Int] = { var sum = seq.sum if(history.isDefined) { sum += history.get } Option[Int](sum) } }

nc产生数据:

[uplooking@uplooking01 ~]$ nc -lk 4893 hello hello hello you hello he hello me

输出结果以下:

-------------------------------------------
Time: 1526009358000 ms
-------------------------------------------
(hello,2)

18/05/11 11:29:18 INFO WriteAheadLogManager  for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009338000: ------------------------------------------- Time: 1526009360000 ms ------------------------------------------- (hello,5) (me,1) (you,1) (he,1) 18/05/11 11:29:20 INFO WriteAheadLogManager for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009340000: ------------------------------------------- Time: 1526009362000 ms ------------------------------------------- (hello,5) (me,1) (you,1) (he,1)

Java版

用法略有不一样,主要是 状态更新函数的写法上有区别,以下:

package cn.xpleaf.bigdata.spark.java.streaming.p1; import com.google.common.base.Optional; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.Arrays; import java.util.List; public class _02SparkStreamingUpdateStateByKeyOps { public static void main(String[] args) { if(args == null || args.length < 2) { System.err.println("Parameter Errors! Usage: <hostname> <port>"); System.exit(-1); } Logger.getLogger("org.apache.spark").setLevel(Level.OFF); SparkConf conf = new SparkConf() .setAppName(_02SparkStreamingUpdateStateByKeyOps.class.getSimpleName()) .setMaster("local[2]"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2)); jsc.checkpoint("hdfs://ns1/checkpoint/streaming/usb"); String hostname = args[0].trim(); int port = Integer.valueOf(args[1].trim()); JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);//默认的持久化级别:MEMORY_AND_DISK_SER_2 JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> { return new Tuple2<String, Integer>(word, 1); }); JavaPairDStream<String, Integer> rbkDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); // 作历史的累计操做 JavaPairDStream<String, Integer> usbDStream = rbkDStream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call(List<Integer> current, Optional<Integer> history) throws Exception { int sum = 0; for (int i : current) { sum += i; } if (history.isPresent()) { sum += history.get(); } return Optional.of(sum); } }); usbDStream.print(); jsc.start();//启动流式计算 jsc.awaitTermination();//等待执行结束 jsc.close(); } }

transformation之window操做

DStream window 滑动窗口

Spark Streaming提供了滑动窗口操做的支持,从而让咱们能够对一个滑动窗口内的数据执行计算操做。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操做,而后生成的RDD,会做为window DStream的一个RDD。好比下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,而后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。因此每一个滑动窗口操做,都必须指定两个参数,窗口长度以及滑动间隔,并且这两个参数值都必须是batch间隔的整数倍。

Spark Streaming笔记整理(三):DS的transformation与output操做

1.红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。

2.这里面每个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 并且每隔2个单位时间,窗口会slide一次。

因此基于窗口的操做,须要指定2个参数:

window length - The duration of the window (3 in the figure) slide interval - The interval at which the window-based operation is performed (2 in the figure). 1.窗口大小,我的感受是一段时间内数据的容器。 2.滑动间隔,就是咱们能够理解的cron表达式吧。 举个例子吧: 仍是以最著名的wordcount举例,每隔10秒,统计一下过去30秒过来的数据。 // Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

DSstream window滑动容器功能

window 对每一个滑动窗口的数据执行自定义的计算 countByWindow 对每一个滑动窗口的数据执行count操做 reduceByWindow 对每一个滑动窗口的数据执行reduce操做 reduceByKeyAndWindow 对每一个滑动窗口的数据执行reduceByKey操做 countByValueAndWindow 对每一个滑动窗口的数据执行countByValue操做

案例

测试代码以下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** *窗口函数window * 每隔多长时间(滑动频率slideDuration)统计过去多长时间(窗口长度windowDuration)中的数据 * 须要注意的就是窗口长度和滑动频率 * windowDuration = M*batchInterval, slideDuration = N*batchInterval */ object _08SparkStreamingWindowOps { def main(args: Array[String]): Unit = { if (args == null || args.length < 2) { System.err.println( """Parameter Errors! Usage: <hostname> <port> |hostname: 监听的网络socket的主机名或ip地址 |port: 监听的网络socket的端口 """.stripMargin) System.exit(-1) } val hostname = args(0).trim val port = args(1).trim.toInt Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_08SparkStreamingWindowOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) // 接收到的当前批次的数据 val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port) val pairsDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)) // 每隔4s,统计过去6s中产生的数据 val retDStream:DStream[(String, Int)] = pairsDStream.reduceByKeyAndWindow(_+_, windowDuration = Seconds(6), slideDuration = Seconds(4)) retDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身 } }

nc产生数据:

[uplooking@uplooking01 ~]$ nc -lk 4893 hello you hello he hello me hello you hello he

输出结果以下:

------------------------------------------- Time: 1526016316000 ms ------------------------------------------- (hello,4) (me,1) (you,2) (he,1) ------------------------------------------- Time: 1526016320000 ms ------------------------------------------- (hello,5) (me,1) (you,2) (he,2) ------------------------------------------- Time: 1526016324000 ms -------------------------------------------

DStream的output操做以及foreachRDD

DStream output操做

一、print 打印每一个batch中的前10个元素,主要用于测试,或者是不须要执行什么output操做时,用于简单触发一下job。

二、saveAsTextFile(prefix, [suffix]) 将每一个batch的数据保存到文件中。每一个batch的文件的命名格式为:prefix-TIME_IN_MS[.suffix]

三、saveAsObjectFile 同上,可是将每一个batch的数据以序列化对象的方式,保存到SequenceFile中。

四、saveAsHadoopFile 同上,将数据保存到Hadoop文件中

五、foreachRDD 最经常使用的output操做,遍历DStream中的每一个产生的RDD,进行处理。能够将每一个RDD中的数据写入外部存储,好比文件、数据库、缓存等。一般在其中,是针对RDD执行action操做的,好比foreach。

DStream foreachRDD详解

相关内容其实在Spark开发调优中已经有相关的说明。

一般在foreachRDD中,都会建立一个Connection,好比JDBC Connection,而后经过Connection将数据写入外部存储。

误区一:在RDD的foreach操做外部,建立Connection

这种方式是错误的,由于它会致使Connection对象被序列化后传输到每一个Task中。而这种Connection对象,实际上通常是不支持序列化的,也就没法被传输。

dstream.foreachRDD { rdd =>
  val connection = createNewConnection() 
  rdd.foreach { record => connection.send(record) } }

误区二:在RDD的foreach操做内部,建立Connection

这种方式是能够的,可是效率低下。由于它会致使对于RDD中的每一条数据,都建立一个Connection对象。而一般来讲,Connection的建立,是很消耗性能的。

dstream.foreachRDD { rdd =>
  rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } }

DStream foreachRDD合理使用

合理方式一:使用RDD的foreachPartition操做,而且在该操做内部,建立Connection对象,这样就至关因而,为RDD的每一个partition建立一个Connection对象,节省资源的多了。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }

合理方式二:本身手动封装一个静态链接池,使用RDD的foreachPartition操做,而且在该操做内部,从静态链接池中,经过静态方法,获取到一个链接,使用以后再还回去。这样的话,甚至在多个RDD的partition之间,也能够复用链接了。并且可让链接池采起懒建立的策略,而且空闲一段时间后,将其释放掉。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) } }

foreachRDD 与foreachPartition实现实战

须要注意的是:

(1)、你最好使用forEachPartition函数来遍历RDD,而且在每台Work上面建立数据库的connection。

(2)、若是你的数据库并发受限,能够经过控制数据的分区来减小并发。

(3)、在插入MySQL的时候最好使用批量插入。

(4),确保你写入的数据库过程可以处理失败,由于你插入数据库的过程可能会通过网络,这可能致使数据插入数据库失败。

(5)、不建议将你的RDD数据写入到MySQL等关系型数据库中。

这部份内容其实能够参考开发调优部分的案例,只是那里并无foreachRDD,由于其并无使用DStream,可是原理是同样的,由于最终都是针对RDD来进行操做的。

 

 

原文连接:http://blog.51cto.com/xpleaf/2115343

相关文章
相关标签/搜索