最近一直在用directstream方式消费kafka中的数据,特此总结,整个代码工程分为三个部分
一. 完整工程代码以下(某些地方特地作了说明, 这个代码的部分函数直接用的是spark-streaming-kafka-0.8_2.11)java
package directStream import kafka.message.MessageAndMetadata; import kafka.serializer.StringDecoder import kafka.common.TopicAndPartition import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.TopicPartition //import java.util._ import org.apache.spark.{SparkContext,SparkConf,TaskContext, SparkException} import org.apache.spark.SparkContext._ import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.kafka.{KafkaUtils,HasOffsetRanges, OffsetRange,KafkaCluster} import com.typesafe.config.ConfigFactory import scalikejdbc._ import scala.collection.JavaConverters._ object SetupJdbc { def apply(driver: String, host: String, user: String, password: String): Unit = { Class.forName(driver) ConnectionPool.singleton(host, user, password) } } object SimpleApp{ def main(args: Array[String]): Unit = { val conf = ConfigFactory.load // 加载工程resources目录下application.conf文件,该文件中配置了databases信息,以及topic及group消息 val kafkaParams = Map[String, String]( "metadata.broker.list" -> conf.getString("kafka.brokers"), "group.id" -> conf.getString("kafka.group"), "auto.offset.reset" -> "smallest" ) val jdbcDriver = conf.getString("jdbc.driver") val jdbcUrl = conf.getString("jdbc.url") val jdbcUser = conf.getString("jdbc.user") val jdbcPassword = conf.getString("jdbc.password") val topic = conf.getString("kafka.topics") val group = conf.getString("kafka.group") val ssc = setupSsc(kafkaParams, jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword,topic, group)() ssc.start() ssc.awaitTermination() } def createStream(taskOffsetInfo: Map[TopicAndPartition, Long], kafkaParams: Map[String, String], conf:SparkConf, ssc: StreamingContext, topics:String):InputDStream[_] = { // 若taskOffsetInfo 不为空, 说明这不是第一次启动该任务, database已经保存了该topic下该group的已消费的offset, 则对比kafka中该topic有效的offset的最小值和数据库保存的offset,去比较大做为新的offset. if(taskOffsetInfo.size != 0){ val kc = new KafkaCluster(kafkaParams) val earliestLeaderOffsets = kc.getEarliestLeaderOffsets(taskOffsetInfo.keySet) if(earliestLeaderOffsets.isLeft) throw new SparkException("get kafka partition failed:") val earliestOffSets = earliestLeaderOffsets.right.get val offsets = earliestOffSets.map(r => new TopicAndPartition(r._1.topic, r._1.partition) -> r._2.offset.toLong) val newOffsets = taskOffsetInfo.map(r => { val t = offsets(r._1) if (t > r._2) { r._1 -> t } else { r._1 -> r._2 } } ) val messageHandler = (mmd: MessageAndMetadata[String, String]) => 1L KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Long](ssc, kafkaParams, newOffsets, messageHandler) //val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( } else { val topicSet = topics.split(",").toSet KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams,topicSet) } } def setupSsc( kafkaParams: Map[String, String], jdbcDriver: String, jdbcUrl: String, jdbcUser: String, jdbcPassword: String, topics:String, group:String )(): StreamingContext = { val conf = new SparkConf() .setMaster("mesos://10.142.113.239:5050") .setAppName("offset") .set("spark.worker.timeout", "500") .set("spark.cores.max", "10") .set("spark.streaming.kafka.maxRatePerPartition", "500") .set("spark.rpc.askTimeout", "600s") .set("spark.network.timeout", "600s") .set("spark.streaming.backpressure.enabled", "true") .set("spark.task.maxFailures", "1") .set("spark.speculationfalse", "false") val ssc = new StreamingContext(conf, Seconds(5)) SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword) // connect to mysql // begin from the the offsets committed to the database val fromOffsets = DB.readOnly { implicit session => sql"select topic, part, offset from streaming_task where group_id=$group". map { resultSet => new TopicAndPartition(resultSet.string(1), resultSet.int(2)) -> resultSet.long(3) }.list.apply().toMap } val stream = createStream(fromOffsets, kafkaParams, conf, ssc, topics) stream.foreachRDD { rdd => if(rdd.count != 0){ // you task val t = rdd.map(record => (record, 1)) val results = t.reduceByKey {_+_}.collect // persist the offset into the database val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges DB.localTx { implicit session => offsetRanges.foreach { osr => sql"""replace into streaming_task values(${osr.topic}, ${group}, ${osr.partition}, ${osr.untilOffset})""".update.apply() if(osr.partition == 0){ println(osr.partition, osr.untilOffset) } } } } } ssc } }
二. 工程的resources文件下的有个application.conf配置文件,其配置以下mysql
jdbc { driver = "com.mysql.jdbc.Driver" url = "jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/xxxx" user = "xxxx" password = "xxxx" } kafka { topics = "xxxx" brokers = "xxxx.xxx.xxx.:xxx,xxx.xxx.xxx.xxx:9092,xxx.xxxx.xxx:xxxx" group = "xxxxxx" } jheckpointDir = "hdfs://xxx.xxx.xxx.xxx:9000/shouzhucheckpoint" batchDurationMs = xxxx
三. 配置文件中能够看到, 我把offset 保存在 mysql里,这里我定义了一个table 名称为streaming_task, 表的结构信息以下:sql
+----------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +----------+--------------+------+-----+---------+-------+ | topic | varchar(100) | NO | PRI | NULL | | | group_id | varchar(50) | NO | PRI | | | | part | int(4) | NO | PRI | 0 | | | offset | mediumtext | YES | | NULL | | +----------+--------------+------+-----+---------+-------+
一. 选用direct 的缘由
官方为spark提供了两种方式来消费kafka中的数据, 高阶api由kafka本身来来维护offset, 有篇blog总结的比较好数据库
第一种是利用 Kafka 消费者高级 API 在 Spark 的工做节点上建立消费者线程,订阅 Kafka 中的消息,数据会传输到 Spark 工做节点的执行器中,可是默认配置下这种方法在 Spark Job 出错时会致使数据丢失,若是要保证数据可靠性,须要在 Spark Streaming 中开启Write Ahead Logs(WAL),也就是上文提到的 Kafka 用来保证数据可靠性和一致性的数据保存方式。能够选择让 Spark 程序把 WAL 保存在分布式文件系统(好比 HDFS)中,apache
第二种方式不须要创建消费者线程,使用 createDirectStream 接口直接去读取 Kafka 的 WAL,将 Kafka 分区与 RDD 分区作一对一映射,相较于第一种方法,不需再维护一份 WAL 数据,提升了性能。读取数据的偏移量由 Spark Streaming 程序经过检查点机制自身处理,避免在程序出错的状况下重现第一种方法重复读取数据的状况,消除了 Spark Streaming 与 ZooKeeper/Kafka 数据不一致的风险。保证每条消息只会被 Spark Streaming 处理一次。如下代码片经过第二种方式读取 Kafka 中的数据:api
在我在使用第一种方式的时候,若是数据量太大, 每每会出现报错,了解这这两种方式的不一样后, 果断选用了第二种,session
二. 引入KafkaCluster类的缘由app
引入KafkaCluster是为了在整个任务启动以前, 首先获取topic的有效的最旧offset. 这跟kafka的在实际的使用场景,大公司都是按时间删除kafka上数据有关,若是任务挂的时间过久,在还未能启动任务以前,database中保存的offset已经在kafak中失效,这时候为了最大程度的减小损失,只能从该topic的最旧数据开始消费..分布式
三. 存入database的缘由函数
看上面的代码,估计好多人也扒过KafkaCluster的源码, 这个类里面其实有一个setConsumerOffsets这样的方法�, 其实在处理过一个batch的数据后, 更新一下该topic下group的offset便可,可是仍是在开始启动这个 job 的时候还得验证该offset否有效. 貌似这样还不用外部数据库,岂不方便? 其实这样作确实挺方便,
有些场景下这样作无可厚非, 但我以为: 若是处理完数据,要写到外部数据库, 此时,若是能把写数据和写offset放在一个事务中(前提是这个数据库是支持事务), 那么就能够便可保证严格消费一次
四. conf 中两个特殊设置设置
为了确保task不会重复执行请设置下面两个参数: