Apache Spark 是加州大学伯克利分校的 AMPLabs 开发的开源分布式轻量级通用计算框架。ios
因为 Spark 基于内存设计,使得它拥有比 Hadoop 更高的性能(极端状况下能够达到 100x),而且对多语言(Scala、Java、Python)提供支持。apache
其一栈式设计特色使得咱们的学习和维护成本大大地减小,并且其提供了很好的容错解决方案bootstrap
业务场景框架
咱们天天都有来自全国各地的自然气购气数据,并根据用户的充气,退气,核销等实时计算分析的是用户订单数数据,因为数据量比较大,单台机器处理已经达到了瓶颈;综合业务场景分析,咱们选用 Spark Streaming + Kafka+Flume+Hbase+kudu 来处理这些日志;又由于业务系统不统一,先经过Spark Streaming对数据进行清洗后再回写kafka集群,由于会有其余业务也须要kafka的数据;经过经过不一样的程序对kafka数据进行消费,用户记录以多版本方式记录到hbase;须要常常统计的指标业务数据写入kududom
业务代码:分布式
建立DStreamide
val sparkConf = new SparkConf().setAppName("OrderSpark") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerAddress,"group.id" -> groupId) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](ssc, kafkaParams, Set(topic))
返回的messages 是一个 DStream,它是对 RDD 的封装,其上的不少操做都相似于 RDD;函数
createDirectStream 函数是 Spark 1.3.0 开始引入的,其内部实现是调用 Kafka 的低层次 API,Spark 自己维护 Kafka 偏移量等信息,因此能够保证数据零丢失oop
可是机器一旦宕机或者重启时,可能会存在重复消费;所以咱们能够经过本身对offset进行checkpoint性能
获取kafkaoffset
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) var offsetRanges = Array[OffsetRange]() kafkaStream.transform{ rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.foreachRDD(rdd=>{ for(o <- offsetRanges) { println(s"@@@@@@ topic ${o.topic} partition ${o.partition} fromoffset ${o.fromOffset} untiloffset ${o.untilOffset} #######") }
}
为了可以在 Spark Streaming 程序挂掉后又能从断点处恢复,咱们每一个批次进行向zookeeper进行 Checkpoint;
这里咱们没有采用spark自带的checkpoint,是由于一旦程序修改,以前序列化的checkpoint数据会冲突报错,
固然checkpoint到文件也会随之越大。(读者能够本身搜索spark 文件checkpoint的弊端)
启动实时程序
ssc.start()
ssc.awaitTermination()
因业务所需须要向kafka回写数据
rdd.foreachPartition(partition=>{ val props = new Properties() props.put("bootstrap.servers",Constans.brokers) props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String,String](props) partition.foreach(r=>{ val record = new ProducerRecord[String, String](Constans.topic_kc, new Random().nextInt(3), "", msg)
producer.send(record,new Callback() {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (null != e) {
println("发送消息失败=>"+msg)
}
}
})
}) producer.close() })
监控
系统部署上线以后,咱们没法保证系统 7x24 小时都正常运行,即便是在运行着,咱们也没法保证 Job 不堆积、是否及时处理 Kafka 中的数据;并且 Spark Streaming 系统自己就不很稳定。因此咱们须要实时地监控系统,包括监控Kafka 集群、Spark Streaming 程序。咱们全部的监控都是CDH自带监控管理和Ganglia以及nagios,一旦检测到异常,系统会本身先重试是否能够本身恢复,若是不行,就会给咱们发送报警邮件和打电话。