这篇文章记录我使用 Spark Streaming 进行 ETL 处理的总结,主要包含如何编程,以及遇到的问题。html
我在公司使用的环境以下:java
这两个版本算是比较新的。sql
从 Kafka 中读取数据,用 SQL 处理,写入 Kafka 中。 程序主要分为 3大块:apache
最开始使用spark-streaming-kafka-0-10_2.11
。虽然这个包是实验阶段,可是考虑到用起来比较方便,就使用了这个包。整个代码的框架和官方文档的同样。编程
stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // some time later, after outputs have completed stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }
编程很快,可是后面遇到了不少问题:app
异常错误:WARN TaskSetManager: Lost task 9.0 in stage 1683.0 (TID 9460, 10.62.34.25, executor 9): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-2017-10-20-1100-streaming-test 1231231 1 13733588428 after polling for 1000。框架
这个错误是 DirectKafkaStream 在 poll 数据的时候,发现没有数据返回, 代码以下:ide
```scala // 从 buffer 获取数据,若是buffer 中没有数据,就 poll 数据。 if (!buffer.hasNext()) { poll(timeout) } assert(buffer.hasNext(), s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") var record = buffer.next() ... ```
上面的代码的意思是从 kafka 中 poll 数据,若是 timeout 长时间后尚未获得数据,就报错。 而实际咱们的 Kafka 数据每秒钟有几千条。 而且 timeout 默认是 1秒,不可能拿不到数据。最后发现 spark-streaming-kafka-0-10_2.11
这个包对应的 kafka-clients
是 0.10.0.1。而这个版本的 kafka-clients
是有 BUG的,因而将 kafka-clients
的版本升级到 0.10.2.1。问题解决了。函数
测试的时候,发如今中止掉程序后,在重开程序,重复消费一部分数据。 那么这个问题就是,程序中止的时候没有正确的提交当前消费的 offset。
咱们的程序是经过 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
来提交每一个 RDD 的 offset 的。而这段代码的背后是将 offsetRanges 保存到了一个队列中。 等到下次从 kafka 中获取下一个 batch 的数据后(经过 compute 函数),顺便将队列中的 offset 提交到 KafkaCluster 中。
代码以下:测试
// 保存到 queue 中 def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = { commitCallback.set(callback) commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*)) } // 提交 offset , 将 queue 中的 offset 保存到 map 中,并提交 protected def commitAll(): Unit = { val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]() var osr = commitQueue.poll() while (null != osr) { val tp = osr.topicPartition val x = m.get(tp) val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) } m.put(tp, new OffsetAndMetadata(offset)) osr = commitQueue.poll() } if (!m.isEmpty) { consumer.commitAsync(m, commitCallback.get) } } // 每次从 kafka 中获取数据, 顺便提交 上一次的 offset override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { // 获取当前的 offset, 若是程序保存了offset就用程序的,若是没有,就从kafka中读取。 // 当程序重启后,就会从kafka中读取。 val untilOffsets = clamp(latestOffsets()) val offsetRanges = untilOffsets.map { case (tp, uo) => val fo = currentOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo) } ... // 获取到了数据,并保存在 rdd 中 val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, useConsumerCache) .... // 更新 offset currentOffsets = untilOffsets // 重点:提交 queue 中的offset commitAll() Some(rdd) }
看完这个逻辑,傻眼了。这样子程序结束,处理完最后一个 batch, 它的 offset 是没有办法提交到 cluster 的,结果就是重复消费。若是要本身写提交 offset 的代码,那和老版本的就没有区别了。
考虑了半天,最终仍是用老的包来实现了。
使用老的包,咱们的逻辑以下:
代码实现以下:
... 各类参数初始化 val kafkaCluster = new KafkaCluster(kafkaClusterParams) val topicAndPartitionSet = kafkaCluster.getPartitions(consumerTopics.toSet).right.get var consumerOffsetsLong = new mutable.HashMap[TopicAndPartition, Long]() if (kafkaCluster.getConsumerOffsets(kafkaClusterParams.get("group.id").toString, topicAndPartitionSet).isLeft) { val latestOffset = kafkaCluster.getLatestLeaderOffsets(topicAndPartitionSet) topicAndPartitionSet.foreach(tp => { consumerOffsetsLong.put(tp, latestOffset.right.get(tp).offset) }) } else { val consumerOffsetsTemp = kafkaCluster.getConsumerOffsets(kafkaClusterParams.get("group.id").toString, topicAndPartitionSet) topicAndPartitionSet.foreach(tp => { consumerOffsetsLong.put(tp, consumerOffsetsTemp.right.get(tp)) }) } val kafkaClusterParamsBroadcast = ssc.sparkContext.broadcast(kafkaClusterParams) val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String]( ssc, kafkaClusterParams, consumerOffsetsLong.toMap, (mmd: MessageAndMetadata[String, String]) => mmd.message() ) stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 处理业务逻辑 val m = new mutable.HashMap[TopicAndPartition, Long]() if (null != offsetRanges) { offsetRanges.foreach( osr => { val tp = osr.topicAndPartition m.put(tp, osr.untilOffset) } ) } kafkaCluster.setConsumerOffsets(kafkaClusterParamsBroadcast.value.get("group.id").toString, m.toMap) }
这样子来处理数据,一切正常。
SQL ETL 就是使用 Spark SQL 进行处理。若是要对多个同一个 batch 进行屡次处理,最好是 将 bacth cache 起来。
这个就是从网上找的了:
import java.util.concurrent.Future import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata } class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable { /* This is the key idea that allows us to work around running into NotSerializableExceptions. */ lazy val producer = createProducer() def send(topic: String, key: K, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, key, value)) def send(topic: String, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, value)) def close(): Unit = { producer.close() } } object KafkaSink { import scala.collection.JavaConversions._ def apply[K, V](config: Map[String, AnyRef]): KafkaSink[K, V] = { val createProducerFunc = () => { val producer = new KafkaProducer[K, V](config) producer } new KafkaSink(createProducerFunc) } def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap) }
使用方式:
// 广播KafkaSink val kafkaSinkBroadcast: Broadcast[KafkaSink[String, String]] = { ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaSinkParams)) } val kafkaProducerTopicBroadcast = ssc.sparkContext.broadcast(producerTopic) stream.foreachRDD { .... kafkaSinkBroadcast.value.send(kafkaProducerTopicBroadcast.value, str) }
总体上的代码就是这么多。
除了代码,Spark Streaming 仍是须要某些配置的,具体以下:
kill -15 driver_pid
就能够中止掉程序。"spark.streaming.backpressure.enabled":"true", "spark.streaming.backpressure.initialRate":"1000000", "spark.streaming.kafka.maxRatePerPartition":"20000",
这三个参数用来限制消费 kafka 的速度。避免一次消费太多的数据,将程序搞垮掉。