Version compatibility, from the official documentation:
Kafka: Spark Streaming 2.2.0 is compatible with Kafka broker versions 0.8.2.1 or higher. See the Kafka Integration Guide for more details.
Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
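If the project is built with SBT instead of Maven, the equivalent line (a minimal sketch, assuming scalaVersion := "2.11.8" so that %% resolves to the _2.11 artifact) would be:

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"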
For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark Streaming core API, you will have to add the corresponding artifact spark-streaming-xyz_2.11 to the dependencies. For example, some of the common ones are as follows.
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
Feature | spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 |
---|---|---|
Broker Version | 0.8.2.1 or higher | 0.10.0 or higher |
Api Stability | Stable | Experimental |
Language Support | Scala, Java, Python | Scala, Java |
Receiver DStream | Yes | No |
Direct DStream | Yes | Yes |
SSL / TLS Support | No | Yes |
Offset Commit Api | No | Yes |
Dynamic Topic Subscription | No | Yes |
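This post uses the 0-8 integration, but for reference, the 0-10 integration only offers the direct (receiver-less) form. A minimal sketch of what it looks like, assuming the spark-streaming-kafka-0-10_2.11 artifact, a 0.10+ broker, and a hypothetical broker address and topic:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object Kafka010Sketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Kafka010Sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // hypothetical broker address, group and topic
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "xupan001:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Only the direct (receiver-less) stream exists in the 0-10 integration
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("test001"), kafkaParams)
    )

    // Each record is a ConsumerRecord; map it to a (key, value) tuple as with 0-8
    stream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}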
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.xp.cn</groupId>
  <artifactId>sparkXN</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>sparkXN</name>
  <url>http://maven.apache.org</url>

  <repositories>
    <repository>
      <id>aliyun</id>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <properties>
    <maven.compiler.source>1.5</maven.compiler.source>
    <maven.compiler.target>1.5</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <hadoop.version>2.5.0</hadoop.version>
    <spark.version>2.2.0</spark.version>
    <scala.version>2.11.8</scala.version>
    <mysql.version>5.1.20</mysql.version>
  </properties>

  <dependencies>
    <!-- Scala library -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Spark Hive support -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming / Kafka integration -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- HDFS client -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql.version}</version>
    </dependency>
    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- Kafka client -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.1</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <!--
            <configuration>
              <args>
                <arg>-make:transitive</arg>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
            -->
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have classpath issue like NoDefClassError,... -->
          <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
Spark implementation:
package com.xp.cn.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by xupan on 2017/12/16.
  */
object KafkaStreaming {

  def main(args: Array[String]) {

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    // Build the conf. Spark Streaming needs at least two threads:
    // one to receive the data and one to process it.
    val conf = new SparkConf().setAppName("FirstStreaming").setMaster("local[2]")

    // Create the SparkContext
    val sc = new SparkContext(conf)

    // Create the StreamingContext with a 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))

    val zkQuorum = "xupan001:2181,xupan002:2181,xupan003:2181"
    val groupId = "g1"
    val topic = Map[String, Int]("test001" -> 1)

    // Create the Kafka DStream.
    // createDirectStream: lower-level API, the one to use in production.
    // createStream: receiver-based, less efficient, data may be lost.
    val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)

    val lines = data.map(_._2)
    val countSD = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    // Trigger the action; each computation only covers the current batch
    countSD.print()

    // Start receiving data with streamingContext.start()
    ssc.start()
    // Wait for the processing to be stopped with streamingContext.awaitTermination()
    ssc.awaitTermination()
  }
}
-------------------------------------------
Time: 1513414020000 ms
-------------------------------------------
(test001,5)
-------------------------------------------
Time: 1513414030000 ms
-------------------------------------------
(wwwwwwww,1)
(helo,1)
(qwwwww,1)
-------------------------------------------
Time: 1513414040000 ms
-------------------------------------------
(wwwwwwww,1)
(helo,1)
(qwwwww,1)
-------------------------------------------
Time: 1513414050000 ms
-------------------------------------------
(wwwwwwww,10)
(helo,10)
(qwwwww,10)
-------------------------------------------
Time: 1513414060000 ms
-------------------------------------------
(wwwwwwww,5)
(helo,5)
(qwwwww,5)
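As the comment in the code notes, each batch above only counts its own records. If a running total across batches is wanted, updateStateByKey together with a checkpoint directory is the usual mechanism. A minimal sketch, reusing ssc and lines from the program above (the checkpoint path is only an assumption):

// Stateful counting requires a checkpoint directory (path here is an example)
ssc.checkpoint("hdfs://xupan001:8020/checkpoint/kafka-wordcount")

// Add each batch's counts to the running total kept per word
val totals = lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .updateStateByKey[Int]((batchCounts: Seq[Int], state: Option[Int]) =>
    Some(batchCounts.sum + state.getOrElse(0)))

totals.print()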
Explanation:
This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming processes the data.
However, under default configuration, this approach can lose data under failures (see receiver reliability). To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g. HDFS), so that all the data can be recovered on failure. See the Deploying section in the streaming programming guide for more details on Write Ahead Logs.
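In practice that means turning on the receiver write-ahead log and giving the application a checkpoint directory. A minimal sketch, with zkQuorum, group and topic as in the program above (the HDFS path is an assumption):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("ReliableKafkaReceiver")
  .setMaster("local[2]")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // write received blocks to the WAL

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs://xupan001:8020/checkpoint/kafka-receiver") // WAL and checkpoints live here (assumed path)

// With the WAL enabled, a non-replicated, serialized storage level avoids keeping the data twice
val data = KafkaUtils.createStream(
  ssc,
  "xupan001:2181,xupan002:2181,xupan003:2181",
  "g1",
  Map("test001" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER)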
The solution: No Receivers (the direct approach)
This new receiver-less “direct” approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka’s simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this feature was introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
This approach has the following advantages over the receiver-based approach (i.e. Approach 1).
Simplified Parallelism: No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
Exactly-once semantics: The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).
Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself (see below).
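The code below follows the simpler route of writing offsets back to ZooKeeper. As a contrast, the "atomic transaction that saves results and offsets" option mentioned above could look roughly like this: the batch results and the untilOffset of every partition are committed to MySQL in one transaction. This is only a sketch; the word_counts and kafka_offsets tables, the JDBC URL and the credentials are assumptions, kafkaStream is a direct stream like the one built below, and on restart the stored offsets would be read back from MySQL instead of ZooKeeper:

import java.sql.DriverManager
import org.apache.spark.streaming.kafka.HasOffsetRanges

kafkaStream.foreachRDD { rdd =>
  // Grab the offset ranges before any transformation, while the RDD is still a KafkaRDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Word counts of this batch, collected to the driver (fine only for small result sets)
  val counts = rdd.map(_._2).flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).collect()

  // Assumed connection details and table layout
  val conn = DriverManager.getConnection("jdbc:mysql://xupan001:3306/streamdb", "user", "password")
  try {
    conn.setAutoCommit(false)

    val upsert = conn.prepareStatement(
      "INSERT INTO word_counts (word, cnt) VALUES (?, ?) ON DUPLICATE KEY UPDATE cnt = cnt + VALUES(cnt)")
    counts.foreach { case (word, cnt) =>
      upsert.setString(1, word)
      upsert.setInt(2, cnt)
      upsert.addBatch()
    }
    upsert.executeBatch()

    val saveOffsets = conn.prepareStatement(
      "REPLACE INTO kafka_offsets (topic, partition_id, until_offset) VALUES (?, ?, ?)")
    offsetRanges.foreach { o =>
      saveOffsets.setString(1, o.topic)
      saveOffsets.setInt(2, o.partition)
      saveOffsets.setLong(3, o.untilOffset)
      saveOffsets.addBatch()
    }
    saveOffsets.executeBatch()

    // Results and offsets become visible together, or not at all
    conn.commit()
  } catch {
    case e: Exception =>
      conn.rollback()
      throw e
  } finally {
    conn.close()
  }
}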
No Receivers (direct) implementation:
package com.xp.cn.streaming

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

/**
  * Created by xupan on 2017/12/16.
  * Kafka version: 0.8
  */
object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.ERROR)

    // Consumer group name. Groups consume independently of each other:
    // group A tracks its own offsets and group B tracks its own.
    // Each topic + group combination has its own offsets.
    val group = "g001"

    // Create the SparkConf
    val conf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")

    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(conf, Duration(5000))

    // Topic to consume
    val topic = "wwcc"

    // Kafka broker list (the Spark Streaming tasks connect directly to the Kafka partitions
    // through the lower-level API, which is more efficient)
    val brokerList = "xupan001:9092,xupan001:9092,xupan001:9092"

    // ZooKeeper quorum, used later to update the consumed offsets
    // (Redis or MySQL could also be used to track offsets)
    val zkQuorum = "xupan001:2181,xupan001:2181,xupan001:2181"

    // Topic set used to create the stream; Spark Streaming can consume several topics at once
    val topics: Set[String] = Set(topic)

    // ZKGroupTopicDirs points at the ZooKeeper directory where the offsets are stored
    val topicDirs = new ZKGroupTopicDirs(group, topic)

    // ZooKeeper path, e.g. "/g001/offsets/wwcc/"
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"

    // Kafka parameters
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      // read from the beginning of the topic
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // ZooKeeper client, used to read and update the stored offsets
    val zkClient = new ZkClient(zkQuorum)

    // Count the child nodes under the offset path, i.e. the partitions
    // (the children only exist if offsets were saved before), for example:
    //   /g001/offsets/wwcc/0/10001
    //   /g001/offsets/wwcc/1/30001
    //   /g001/offsets/wwcc/2/10001
    val children = zkClient.countChildren(zkTopicPath)

    // Two cases:
    //   1: start from the beginning
    //   2: resume from the stored offsets
    var kafkaStream: InputDStream[(String, String)] = null

    // If ZooKeeper holds saved offsets, use them as the starting position of the stream
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    // Offsets were saved before
    if (children > 0) {
      for (i <- 0 until children) {
        // e.g. /g001/offsets/wwcc/0/10001
        val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")
        val tp = TopicAndPartition(topic, i)
        // add each partition's offset to fromOffsets, e.g. wwcc/0 -> 10001
        fromOffsets += (tp -> partitionOffset.toLong)
      }

      // Key: the Kafka key, value: e.g. "hello tom hello jerry"
      // Turns every Kafka message into a (key, message) tuple
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

      // Create the direct DStream through KafkaUtils
      // (fromOffsets makes consumption resume from the offsets computed above)
      // Type parameters: key type, value type, key decoder, value decoder, result type
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // Nothing saved yet: start from the latest (largest) or earliest (smallest) offset,
      // depending on the kafkaParams configuration
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }

    // Offset ranges of the current batch
    var offsetRanges = Array[OffsetRange]()

    // transform exposes the RDD of the current batch, so the offset ranges can be
    // captured before the RDD is handed back to the DStream
    val transform: DStream[(String, String)] = kafkaStream.transform { rdd =>
      // The batch RDD is a KafkaRDD, from which the offset ranges can be read
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    val messages: DStream[String] = transform.map(_._2)

    // Iterate over the RDDs of the DStream.
    // foreachRDD itself is a DStream operation that runs on the driver; rdd.foreach runs on the executors.
    messages.foreachRDD { rdd =>
      // Operate on the RDD and trigger the action
      println("===============partionsnum:" + rdd.getNumPartitions)

      // foreachPartition runs on the executors
      rdd.foreachPartition(partition =>
        partition.foreach(x => {
          //println(x)
        })
      )

      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        // e.g. /g001/offsets/wwcc/0
        val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
        // save the partition's offset to ZooKeeper, e.g. /g001/offsets/wwcc/0/20000
        ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
Tested, and it proves robust:
wwcc 2 4096 4100
wwcc 0 0 0
wwcc 1 0 0
===============partionsnum:3
wwcc 2 4100 4100
wwcc 0 0 0
wwcc 1 0 0
===============partionsnum:3
17/12/16 21:18:45 INFO VerifiableProperties: Verifying properties
17/12/16 21:18:45 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
17/12/16 21:18:45 INFO VerifiableProperties: Property group.id is overridden to g001
17/12/16 21:18:45 INFO VerifiableProperties: Property zookeeper.connect is overridden to
wwcc 2 4100 4111
wwcc 0 0 0
wwcc 1 0 0
===============partionsnum:3
17/12/16 21:18:50 INFO VerifiableProperties: Verifying properties
17/12/16 21:18:50 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
17/12/16 21:18:50 INFO VerifiableProperties: Property group.id is overridden to g001
17/12/16 21:18:50 INFO VerifiableProperties: Property zookeeper.connect is overridden to
wwcc 2 4111 4125
wwcc 0 0 0
wwcc 1 0 0
===============partionsnum:3
17/12/16 21:18:55 INFO VerifiableProperties: Verifying properties
17/12/16 21:18:55 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
17/12/16 21:18:55 INFO VerifiableProperties: Property group.id is overridden to g001
17/12/16 21:18:55 INFO VerifiableProperties: Property zookeeper.connect is overridden to
wwcc 2 4125 4135
Version 2 (calling foreachRDD on the KafkaDStream directly):
package com.xp.cn.streaming

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

/**
  * Created by zx on 2017/7/31.
  */
object KafkaDirectWordCountV2 {

  def main(args: Array[String]): Unit = {

    // Consumer group name
    val group = "g001"

    // Create the SparkConf
    val conf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")

    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(conf, Duration(5000))

    // Topic to consume
    val topic = "wwcc"

    // Kafka broker list (the Spark Streaming tasks connect directly to the Kafka partitions
    // through the lower-level API, which is more efficient)
    val brokerList = "xupan001:9092,xupan001:9092,xupan001:9092"

    // ZooKeeper quorum, used later to update the consumed offsets
    // (Redis or MySQL could also be used to track offsets)
    val zkQuorum = "xupan001:2181,xupan001:2181,xupan001:2181"

    // Topic set used to create the stream; Spark Streaming can consume several topics at once
    val topics: Set[String] = Set(topic)

    // ZKGroupTopicDirs points at the ZooKeeper directory where the offsets are stored
    val topicDirs = new ZKGroupTopicDirs(group, topic)

    // ZooKeeper path, e.g. "/g001/offsets/wwcc/"
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"

    // Kafka parameters
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      // read from the beginning of the topic
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // ZooKeeper client, used to read and update the stored offsets
    val zkClient = new ZkClient(zkQuorum)

    // Check whether the offset path already has child nodes
    // (they only exist if offsets were saved before), for example:
    //   /g001/offsets/wwcc/0/10001
    //   /g001/offsets/wwcc/1/30001
    //   /g001/offsets/wwcc/2/10001
    val children = zkClient.countChildren(zkTopicPath)

    var kafkaStream: InputDStream[(String, String)] = null

    // If ZooKeeper holds saved offsets, use them as the starting position of the stream
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    // Offsets were saved before
    if (children > 0) {
      for (i <- 0 until children) {
        // e.g. /g001/offsets/wwcc/0/10001
        val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")
        val tp = TopicAndPartition(topic, i)
        // add each partition's offset to fromOffsets, e.g. wwcc/0 -> 10001
        fromOffsets += (tp -> partitionOffset.toLong)
      }

      // Key: the Kafka key, value: e.g. "hello tom hello jerry"
      // Turns every Kafka message into a (key, message) tuple
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

      // Create the direct DStream through KafkaUtils
      // (fromOffsets makes consumption resume from the offsets computed above)
      // Type parameters: key type, value type, key decoder, value decoder, result type
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // Nothing saved yet: start from the latest (largest) or earliest (smallest) offset,
      // depending on the kafkaParams configuration
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }

    // Offset ranges of the current batch
    var offsetRanges = Array[OffsetRange]()

    // With the direct approach the offsets are only available on the KafkaRDD itself, so instead of
    // going through a DStream transformation we call foreachRDD on the KafkaDStream, read the offsets
    // from each KafkaRDD and then process the RDD.
    // kafkaStream.foreachRDD runs on the driver; rdd.foreach runs on the executors.
    kafkaStream.foreachRDD { kafkaRDD =>
      // Only a KafkaRDD can be cast to HasOffsetRanges to obtain the offset ranges
      offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      val lines: RDD[String] = kafkaRDD.map(_._2)

      // Operate on the RDD and trigger the action; foreachPartition runs on the executors
      lines.foreachPartition(partition =>
        partition.foreach(x => {
          println(x)
        })
      )

      for (o <- offsetRanges) {
        // e.g. /g001/offsets/wwcc/0
        val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
        // save the partition's offset to ZooKeeper, e.g. /g001/offsets/wwcc/0/20000
        ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}