kafka做为一个实时的分布式消息队列,实时的生产和消费消息,这里咱们能够利用SparkStreaming实时计算框架实时地读取kafka中的数据而后进行计算。在spark1.3版本后,kafkaUtils里面提供了两个建立dstream的方法,一种为KafkaUtils.createDstream,另外一种为KafkaUtils.createDirectStream。html
1.KafkaUtils.createDstream方式
构造函数为KafkaUtils.createDstream(ssc,[zk], [consumer group id], [per-topic,partitions] ) 使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于全部的receivers接收到的数据将会保存在Spark executors中,而后经过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,它同步将接受到数据保存到分布式文件系统上好比HDFS。 因此数据在出错的状况下能够恢复出来 。java
A、建立一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故若是增长特定主消费的线程数仅仅是增长一个receiver中消费topic的线程数,并不增长spark的并行处理数据数量。
B、对于不一样的group和topic可使用多个receivers建立不一样的DStream
C、若是启用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)node
同时须要设置存储级别(默认StorageLevel.MEMORY_AND_DISK_SER_2),正则表达式
即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)redis
1.1KafkaUtils.createDstream实战
(1)添加kafka的pom依赖shell
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_0-10_2.11</artifactId>
<version>2.0.2</version>
</dependency>apache
(2)启动zookeeper集群编程
zkServer.sh startbootstrap
(3)启动kafka集群api
kafka-server-start.sh /export/servers/kafka/config/server.properties
(4)建立topic
kafka-topics.sh --create --zookeeper node-1:2181 --replication-factor 1 --partitions 3 --topic kafka_spark
(5)向topic中生产数据
经过shell命令向topic发送消息
kafka-console-producer.sh --broker-list node-1:9092--topic kafka_spark
(6)编写SparkStreaming应用程序
KafkaUtils.createDstream方式(基于kafka高级Api-----偏移量由zk保存)
package cn.testdemo.dstream.kafka
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.immutable
//todo:利用sparkStreaming对接kafka实现单词计数----采用receiver(高级API)
object SparkStreamingKafka_Receiver {
def main(args: Array[String]): Unit = {
//一、建立sparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkStreamingKafka_Receiver")
.setMaster("local[2]")
.set("spark.streaming.receiver.writeAheadLog.enable","true") //开启wal预写日志,保存数据源的可靠性
//二、建立sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
//三、建立StreamingContext
val ssc = new StreamingContext(sc,Seconds(5))
//设置checkpoint
ssc.checkpoint("./Kafka_Receiver")
//四、定义zk地址
val zkQuorum="node-1:2181,node-2:2181,node-3:2181"
//五、定义消费者组
val groupId="spark_receiver"
//六、定义topic相关信息 Map[String, Int]
// 这里的value并非topic分区数,它表示的topic中每个分区被N个线程消费
val topics=Map("kafka_spark" -> 2)
//七、经过KafkaUtils.createStream对接kafka
//这个时候至关于同时开启3个receiver接受数据
val receiverDstream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
stream
}
)
//使用ssc.union方法合并全部的receiver中的数据
val unionDStream: DStream[(String, String)] = ssc.union(receiverDstream)
//八、获取topic中的数据
val topicData: DStream[String] = unionDStream.map(_._2)
//九、切分每一行,每一个单词计为1
val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
//十、相同单词出现的次数累加
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//十一、打印输出
result.print()
//开启计算
ssc.start()
ssc.awaitTermination()
}
}
(7)运行代码,查看控制台结果数据
总结:
经过这种方式实现,刚开始的时候系统正常运行,没有发现问题,可是若是系统异常从新启动sparkstreaming程序后,发现程序会重复处理已经处理过的数据,这种基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制能够保证数据零丢失的高可靠性,可是却没法保证数据被处理一次且仅一次,可能会处理两次。由于Spark和ZooKeeper之间多是不一样步的。官方如今也已经不推荐这种整合方式,官网相关地址下面咱们使用官网推荐的第二种方式kafkaUtils的createDirectStream()方式。
2.KafkaUtils.createDirectStream方式
不一样于Receiver接收数据,这种方式按期地从kafka的topic下对应的partition中查询最新的偏移量,再根据偏移量范围在每一个batch里面处理数据,Spark经过调用kafka简单的消费者Api读取必定范围的数据。
相比基于Receiver方式有几个优势:
A、简化并行
不须要建立多个kafka输入流,而后union它们,sparkStreaming将会建立和kafka分区一种的rdd的分区数,并且会从kafka中并行读取数据,spark中RDD的分区数和kafka中的分区数据是一一对应的关系。
B、高效
第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会致使数据被拷贝两次,第一次是被kafka复制,另外一次是写到WAL中。而没有receiver的这种方式消除了这个问题。
C、刚好一次语义(Exactly-once-semantics)
Receiver读取kafka数据是经过kafka高层次api把偏移量写入zookeeper中,虽然这种方法能够经过数据保存在WAL中保证数据不丢失,可是可能会由于sparkStreaming和ZK中保存的偏移量不一致而致使数据被消费了屡次。EOS经过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是没法使用基于zookeeper的kafka监控工具
2.1KafkaUtils.createDirectStream实战
(1)前面的步骤跟KafkaUtils.createDstream方式同样,接下来开始执行代码。
KafkaUtils.createDirectStream方式(基于kafka低级Api-----偏移量由客户端程序保存)
package cn.itcast.dstream.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
//todo:利用sparkStreaming对接kafka实现单词计数----采用Direct(低级API)
object SparkStreamingKafka_Direct {
def main(args: Array[String]): Unit = {
//一、建立sparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkStreamingKafka_Direct")
.setMaster("local[2]")
//二、建立sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
//三、建立StreamingContext
val ssc = new StreamingContext(sc,Seconds(5))
//四、配置kafka相关参数
val kafkaParams=Map("metadata.broker.list"->"node-1:9092,node-2:9092,node-3:9092","group.id"->"Kafka_Direct")
//五、定义topic
val topics=Set("kafka_spark")
//六、经过 KafkaUtils.createDirectStream接受kafka数据,这里采用是kafka低级api偏移量不受zk管理
val dstream: InputDStream[(String, String)] =
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//七、获取kafka中topic中的数据
val topicData: DStream[String] = dstream.map(_._2)
//八、切分每一行,每一个单词计为1
val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
//九、相同单词出现的次数累加
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//十、打印输出
result.print()
//开启计算
ssc.start()
ssc.awaitTermination()
}
}
查看控制台的输出:
在spark1.3版本后,kafkautil里面提供了两个建立dstream的方法,
一、KafkaUtils.createDstream
构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于全部的receivers接收到的数据将会保存在Spark executors中,而后经过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上
A、建立一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故若是增长特定主体分区数仅仅是增长一个receiver中消费topic的线程数,并不增长spark的并行处理数据数量
B、对于不一样的group和topic可使用多个receivers建立不一样的DStream
C、若是启用了WAL,须要设置存储级别,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)
2.KafkaUtils.createDirectStream
区别Receiver接收数据,这种方式按期地从kafka的topic+partition中查询最新的偏移量,再根据偏移量范围在每一个batch里面处理数据,使用的是kafka的简单消费者api
优势:
A、 简化并行,不须要多个kafka输入流,该方法将会建立和kafka分区同样的rdd个数,并且会从kafka并行读取。
B、高效,这种方式并不须要WAL,WAL模式须要对数据复制两次,第一次是被kafka复制,另外一次是写到wal中
C、刚好一次语义(Exactly-once-semantics),传统的读取kafka数据是经过kafka高层次api把偏移量写入zookeeper中,存在数据丢失的可能性是zookeeper中和ssc的偏移量不一致。EOS经过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是没法使用基于zookeeper的kafka监控工具
Kafka 0.10的Spark Streaming集成在设计上相似于0.8 Direct Stream方法。它提供简单的并行性,Kafka分区和Spark分区之间的1:1对应,以及访问偏移和元数据。然而,由于较新的集成使用新的Kafka消费者API而不是简单的API,因此在使用上有显着的差别。此版本的集成被标记为实验性的,所以API可能会更改。
连接
对于使用SBT / Maven项目定义的Scala / Java应用程序,请将流应用程序与如下工件连接(有关详细信息,请参阅主编程指南中的连接部分)。
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.1.0
建立直接流
请注意,导入的命名空间包括版本org.apache.spark.streaming.kafka010
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
return new Tuple2<>(record.key(), record.value());
}
})
有关可能的kafkaParams,请参阅Kafka consumer配置文件。若是您的Spark批处理持续时间大于默认的Kafka心跳会话超时(30秒),请适当增长heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批次,这将须要更改代理上的group.max.session.timeout.ms。请注意,示例将enable.auto.commit设置为false,有关讨论,请参阅下面的存储偏移。
LocationStrategies
新的Kafka consumer API会将消息预取到缓冲区中。所以,出于性能缘由,Spark集成保持缓存消费者对执行者(而不是为每一个批次从新建立它们)是重要的,而且更喜欢在具备适当消费者的主机位置上调度分区。
在大多数状况下,您应该使用LocationStrategies.PreferConsistent如上所示。这将在可用的执行器之间均匀分配分区。若是您的执行程序与Kafka代理所在的主机相同,请使用PreferBrokers,这将更喜欢在该分区的Kafka leader上安排分区。最后,若是您在分区之间的负载有显着误差,请使用PreferFixed。这容许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。
消费者的缓存的默认最大大小为64.若是您但愿处理超过(64 *个执行程序数)Kafka分区,则能够经过如下方式更改此设置: spark.streaming.kafka.consumer.cache.maxCapacity
缓存由topicpartition和group.id键入,所以对每一个调用使用一个单独 group.id的createDirectStream。
ConsumerStrateges
新的Kafka consumer API有许多不一样的方式来指定主题,其中一些须要至关多的后对象实例化设置。 ConsumerStrategies提供了一种抽象,容许Spark即便在从检查点从新启动后也能得到正确配置的消费者。
ConsumerStrategies.Subscribe,如上所示,容许您订阅固定的主题集合。SubscribePattern容许您使用正则表达式来指定感兴趣的主题。注意,与0.8集成不一样,在运行流期间使用Subscribe或SubscribePattern应该响应添加分区。最后,Assign容许您指定固定的分区集合。全部三个策略都有重载的构造函数,容许您指定特定分区的起始偏移量。
若是您具备上述选项不知足的特定用户设置需求,则ConsumerStrategy是能够扩展的公共类。
建立RDD
若是您有一个更适合批处理的用例,则能够为定义的偏移量范围建立RDD。
// Import dependencies and create kafka params as in Create Direct Stream above
OffsetRange[] offsetRanges = {
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange.create("test", 0, 0, 100),
OffsetRange.create("test", 1, 0, 100)
};
JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent()
);
注意,你不能使用PreferBrokers,由于没有流没有驱动程序端消费者为你自动查找代理元数据。若是须要,请PreferFixed使用您本身的元数据查找。
获取偏移
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
@Override
public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
}
});
}
});
注意类型转换HasOffsetRanges只会成功,若是是在第一个方法中调用的结果createDirectStream,不是后来一系列的方法。请注意,RDD分区和Kafka分区之间的一对一映射在任何随机或从新分区的方法(例如reduceByKey()或window())后不会保留。
存储偏移
在失败的状况下的Kafka交付语义取决于如何和什么时候存储偏移。火花输出操做至少一次。所以,若是你想要一个彻底一次的语义的等价物,你必须在一个等幂输出以后存储偏移,或者在一个原子事务中存储偏移和输出。使用这种集成,您有3个选项,按照可靠性(和代码复杂性)的增长,如何存储偏移。
检查点
若是启用Spark 检查点,偏移将存储在检查点中。这很容易实现,但有缺点。你的输出操做必须是幂等的,由于你会获得重复的输出; 事务不是一个选项。此外,若是应用程序代码已更改,您将没法从检查点恢复。对于计划升级,您能够经过与旧代码同时运行新代码来缓解这种状况(由于输出必须是幂等的,它们不该该冲突)。但对于须要更改代码的意外故障,您将丢失数据,除非您有其余方法来识别已知的良好起始偏移。
kafka
Kafka有一个偏移提交API,将偏移存储在特殊的Kafka主题中。默认状况下,新消费者将按期自动提交偏移量。这几乎确定不是你想要的,由于消费者成功轮询的消息可能尚未致使Spark输出操做,致使未定义的语义。这就是为何上面的流示例将“enable.auto.commit”设置为false的缘由。可是,您能够在使用commitAsyncAPI 存储了输出后,向Kafka提交偏移量。与检查点相比,Kafka是一个耐用的存储,而无论您的应用程序代码的更改。然而,Kafka不是事务性的,因此你的输出必须仍然是幂等的。
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
}
});
您本身的数据存储
对于支持事务的数据存储,即便在故障状况下,也能够在同一事务中保存偏移量做为结果,以保持二者同步。若是您仔细检查重复或跳过的偏移范围,则回滚事务可防止重复或丢失的邮件影响结果。这给出了刚好一次语义的等价物。也可使用这种策略甚至对于聚合产生的输出,聚合一般很难使幂等。
// The details depend on your data store, but the general idea looks like this
// begin from the the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
Object results = yourCalculation(rdd);
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
}
});
SSL / TLS
新的Kafka消费者支持SSL。要启用它,请在传递到createDirectStream/ 以前适当地设置kafkaParams createRDD。注意,这只适用于Spark和Kafka代理之间的通讯; 您仍然有责任单独保证 Spark节点间通讯。
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
部署
与任何Spark应用程序同样,spark-submit用于启动应用程序。
对于Scala和Java应用程序,若是您使用SBT或Maven进行项目管理,则将程序包spark-streaming-kafka-0-10_2.11及其依赖项包含到应用程序JAR中。确保spark-core_2.11并spark-streaming_2.11标记为provided依赖关系,由于它们已经存在于Spark安装中。而后使用spark-submit启动应用程序(请参阅主程序指南中的部署部分)。
spark-streaming-kafka-0-10中已经实现offset自动提交zk中
最新的实现中createDirectStream也能够提交offset了spark-streaming-kafka-0-10http://spark.apache.org/docs/latest/streaming-kafka-integration.html但要求 kafka是0.10.0及之后。
createDirectStream不会自动提交offset到zk中,不能方便的监控数据消费状况
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic)) .transform(rdd => { val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges for (offset <- offsets) { val topicAndPartition = TopicAndPartition(offset.topic, offset.partition) //保存offset至zk可redis中方便监控 //commitOffset(kafkaParams,groupId, Map(topicAndPartition -> offset.untilOffset)) } rdd })
若是能够只是用来监控消费状况在transform中转换成HasOffsetRanges取出offset保存到zk中便可,
"rdd.asInstanceOf[HasOffsetRanges].offsetRanges" 若是已经通过其它Transformations或output操做以后此rdd已经不是KafkaRDD,再转换会报错!!
另外还有一个控制能更强的createDirectStream方法,能够指定fromOffsets和messageHandler
def createDirectStream(
ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
)
能够将offset保存在zk或redis等外部存储中方便监控,而后下次启动时再从中读取
Kafka中的partition和Spark中的partition是不一样的概念,但createDirectStream方式时topic的总partition数量和Spark和partition数量相等。
```
//KafkaRDD.getPartitions
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray
}
```
partition中数据分布不均会致使有些任务快有些任务慢,影响总体性能,能够根据实际状况作repartition,单个topic比较容易实现partition中数据分布均匀,但若是同一个程序中须要同时处理多个topic的话,能够考虑可否合并成一个topic,增长partition数量,不过topic不少时间会和其它系统共用,因此可能不容易合并,这状况只能作repartition。虽然repartition会消耗一些时间,但总的来讲,若是数据分布不是很均匀的话repartition仍是值得,repartition以后各任务处理数据量基本同样,并且Locality_level会变成“PROCESS_LOCAL”
!!使用flume加载到kafka的使用默认配置十有八九分布不匀
代码:
Object SparkApp(){ def gnStreamContext(chkdir:String,batchDuration: Duration,partitions:Int)={ val conf = new SparkConf().setAppName("GnDataToHive") //.setMaster("local[2]") val ssc = new StreamingContext(conf, batchDuration) KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic)) ........... ........... ........... val terminfos = ssc.sparkContext.broadcast(ttis) ssc.checkpoint(chkdir) ssc } def main(args: Array[String]): Unit = { val chkdir="hdfs://xxxxx/chkpoint/chkpoint-1" val chkssc = StreamingContext.getOrCreate(chkdir,()=>gnStreamContext(chkdir,Seconds(args(0).toInt),args(1).toInt)) chkssc.start() chkssc.awaitTermination() } }
offset会在保存至检查点中,下次启动会继续接着读取可是如下问题须要注意:
kafka中数一般保存周期都不会太长,都有清理周期,若是记录的offset对应数据已经被清理,从检查点恢复时程序会一直报错。
若是程序逻辑发生变化,须要先删除检查点,不然无论数据仍是逻辑都会从旧检查点恢复。
能够用spark.streaming.kafka.maxRatePerPartition指定每一个批次从每一个partition中每秒中最大拉取的数据量,好比将值设为1000的话,每秒钟最多从每一个partition中拉取1000条数据,若是batchDuration设为1分钟的话,则每一个批次最多从每一个partition中拉取60000条数据。
此值要设置合理,过小有可能致使资源浪费,但kafka中的数据消费不完,太多又达不到限流的目的
具体代码见:
DirectKafkaInputDStream.maxMessagesPerPartition
DirectKafkaInputDStream.clamp
``` // limits the maximum number of messages per partition protected def clamp( leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = { maxMessagesPerPartition.map { mmp => leaderOffsets.map { case (tp, lo) => tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset)) } }.getOrElse(leaderOffsets) } ```
spark-submit提交时带上便可:--conf spark.streaming.kafka.maxRatePerPartition=10000
貌似只能在createDirectStream中起做用,在createStream方式中没看到有相似设置
写入hdfs时默认目录名格式为:"prefix-TIME_IN_MS.suffix",每一个目录下的文件名为"part-xxxx"。
若是只想自定义目录名能够经过foreachRDD,调用RDD的saveAsXXX dstream.foreachRDD(rdd=>rdd.saveAsxxxx(""))
若是须要自定义输出的文件名,须要自定义一个FileOutputFormat的子类,修改getRecordWriter方法中的name便可,而后调用saveAsHadoopFile[MyTextOutputFormat[NullWritable, Text]]
。
某些状况下载关联外部数据进行关联或计算。
mapPartitions
或foreachRDD.foreachPartitions
中关联By default, it will start consuming from the latest offset of each Kafka partition. If you set configuration auto.offset.reset in Kafka parameters to smallest, then it will start consuming from the smallest offset.
topic主题,分区ID,起始offset,结束offset
由于spark源码中KafkaCluster
类被限制在[spark]包下,因此咱们若是想要在项目中调用这个类,那么只能在项目中也新建包org.apache.spark.streaming.kafka
.而后再该包下面写调用的逻辑.这里面就能够引用KafkaCluster
类了.这个类里面封装了不少实用的方法,好比:获取主题和分区,获取offset等等...
这些api,spark里面都有现成的,咱们如今就是须要组织起来!
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
简单说一下
在zookeeper上读取offset前先根据实际状况更新fromOffsets
1.1 若是是一个新的groupid,那么会从最新的开始读
1.2 若是是存在的groupid,根据配置auto.offset.reset
1.2.1 smallest
: 那么会从开始读,获取最开始的offset.
1.2.2 largest
: 那么会从最新的位置开始读取,获取最新的offset.
根据topic获取topic和该topics下全部的partitions
val partitionsE = kc.getPartitions(topics)
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V, String)]( ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => ( mmd.key, mmd.message, mmd.topic))
kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
sparkstreaming-kafka的源码中是本身把offset维护在kafka集群中了?
./kafka-consumer-groups.sh --bootstrap-server 10.10.25.13:9092 --describe --group heheda
由于用命令行工具能够查到,这个工具能够查到基于javaapi方式的offset,查不到在zookeeper中的
网上的本身维护offset,是把offset维护在zookeeper中了?
用这个方式产生的groupid,在命令行工具中查不到,可是也是调用的源码中的方法呢?
难道spark提供了这个方法,可是本身却没有用是吗?
区别只在于,本身维护offset,会先去zk中获取offset,逻辑处理完成后再更新zk中的offset.
然而,在代码层面,区别在于调用了不一样的KafkaUtils.createDirectStream
本身维护的offset,这个方法会传入offset.由于在此以前咱们已经从zk中获取到了起始offset
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V, String)]( ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => ( mmd.key, mmd.message, mmd.topic))
接受的是一个topic,底层会根据topic去获取存储在kafka中的起始offset
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, myTopic)
接下来这个方法里面会调用getFromOffsets
来获取起始的offset
val kc = new KafkaCluster(kafkaParams) val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
这个代码,网上不少,GitHub上也有现成的了.这里我就不贴出来了!
这里主要仍是学习代码的实现思路!