日志服务做为一站式的日志的采集与分析平台,提供了各类用户场景的日志采集能力,经过日志服务提供的各类与·与SDK,采集客户端(Logtail),Producer,用户能够很是容易的把各类数据源中的数据采集到日志服务的Logstore中。同时为了便于用户对日志进行处理,提供了各类支持流式消费的SDK,如各类语言的消费组,与 Spark,Flink,Storm 等各类流计算技术无缝对接的Connector,以便于用户根据本身的业务场景很是便捷的处理海量日志。html
从最先的Spark Streaming到最新的Stuctured Streaming,Spark 一直是最流行的流计算框架之一。使用日志服务的Spark SDK,能够很是方便的在Spark 中消费日志服务中的数据,同时也支持将 Spark 的计算结果写入日志服务。git
日志服务的存储层是一个相似Kafka的Append only的FIFO消息队列,包含以下基本概念:github
1)添加Maven依赖:sql
<dependency> <groupId>com.aliyun.emr</groupId> <artifactId>emr-logservice_2.11</artifactId> <version>1.9.0</version> </dependency>
Github源码下载。
2)计划消费的日志服务project,logstore以及对应的endpoint。
3)用于访问日志服务Open API的Access Key。apache
Spark Streaming是Spark最先推出的流计算技术,如今已经进入维护状态,再也不会增长新的功能。可是考虑到Spark Streaming 的使用仍然很是普遍,咱们先从Spark Streaming开始介绍。Spark Streaming 提供了一个DStream 的数据模型抽象,本质是把无界数据集拆分红一个一个的RDD,转化为有界数据集的流式计算。每一个批次处理的数据就是这段时间内从日志服务消费到的数据。并发
Spark Streaming 从日志服务消费支持 Receiver 和 Direct 两种消费方式。app
Receivers的实现内部实现基于日志服务的消费组(Consumer Library)。数据拉取与处理彻底分离。消费组自动均匀分配Logstore内的全部shard到全部的Receiver,而且自动提交checkpoint到SLS。这就意味着Logstore内的shard个数与Spark 实际的并发没有对应关系。
对于全部的Receiver,接收到的数据默认会保存在Spark Executors中,因此Failover的时候有可能形成数据丢失,这个时候就须要开启WAL日志,Failover的时候能够从WAL中恢复,防止丢失数据。框架
SDK将SLS中的每行日志解析为JSON字符串形式,Receiver使用示例以下所示:ide
object SLSReceiverSample { def main(args: Array[String]): Unit = { val project = "your project" val logstore = "your logstore" val consumerGroup = "consumer group" val endpoint = "your endpoint" val accessKeyId = "access key id" val accessKeySecret = "access key secret" val batchInterval = Milliseconds(5 * 1000) val conf = new SparkConf().setAppName("Test SLS Loghub") val ssc = new StreamingContext(conf, batchInterval) val stream = LoghubUtils.createStream( ssc, project, logstore, consumerGroup, endpoint, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK, LogHubCursorPosition.END_CURSOR) stream.checkpoint(batchInterval * 2).foreachRDD(rdd => rdd.map(bytes => new String(bytes)).top(10).foreach(println) ) ssc.checkpoint("hdfs:///tmp/spark/streaming") ssc.start() ssc.awaitTermination() } }
除Project,Logstore,Access Key 这些基础配置外,还能够指定StorageLevel,消费开始位置等。函数
Direct模式再也不须要Receiver,也不依赖于消费组,而是使用日志服务的低级API,在每一个批次内直接从服务端拉取数据处理。对于Logstore中的每一个Shard来讲,每一个批次都会读取指定位置范围内的数据。为了保证一致性,只有在每一个批次确认正常结束以后才能把每一个Shard的消费结束位置(checkpoint)保存到服务端。
为了实现Direct模式,SDK依赖一个本地的ZooKeeper,每一个shard的checkpoint会临时保存到本地的ZooKeeper,等用户手动提交checkpoint时,再从ZooKeeper中同步到服务端。Failover时也是先从本地ZooKeeper中尝试读上一次的checkpoint,若是没有读到再从服务端获取。
object SLSDirectSample { def main(args: Array[String]): Unit = { val project = "your project" val logstore = "your logstore" val consumerGroup = "consumerGroup" val endpoint = "endpoint" val accessKeyId = "access key id" val accessKeySecret = "access key secret" val batchInterval = Milliseconds(5 * 1000) val zkAddress = "localhost:2181" val conf = new SparkConf().setAppName("Test Direct SLS Loghub") val ssc = new StreamingContext(conf, batchInterval) val zkParas = Map("zookeeper.connect" -> zkAddress) val loghubStream = LoghubUtils.createDirectStream( ssc, project, logstore, consumerGroup, accessKeyId, accessKeySecret, endpoint, zkParas, LogHubCursorPosition.END_CURSOR) loghubStream.checkpoint(batchInterval).foreachRDD(rdd => { println(s"count by key: ${rdd.map(s => { s.sorted (s.length, s) }).countByKey().size}") // 手动更新checkpoint loghubStream.asInstanceOf[CanCommitOffsets].commitAsync() }) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc.start() ssc.awaitTermination() } }
Direct模式示例
在Receiver中,若是须要限制消费速度,咱们只须要调整 Consumer Library 自己的参数便可。而Direct方式是在每一个批次开始时从SLS拉取数据,这就涉及到一个问题:一个批次内拉取多少数据才合适。若是太多,一个批次内处理不完,形成处理延时。若是太少会导worker空闲,工做不饱和,消费延时。这个时候咱们就须要合理配置拉取的速度和行数,实现一个批次尽量多处理又能及时完成的目标。理想状态下Spark 消费的总体速率应该与SLS采集速率一致,才能实现真正的实时处理。
因为SLS的数据模型是以LogGroup做为读写的基本单位,而一个LogGroup中可能包含上万行日志,这就意味着Spark中直接限制每一个批次的行数难以实现。所以,Direct限流涉及到两个配置参数:
参数 | 说明 | 默认值 |
---|---|---|
spark.streaming.loghub.maxRatePerShard | 每一个批次每一个Shard读取行数,决定了限流的下限 | 10000 |
spark.loghub.batchGet.step | 每次请求读取LogGroup个数,决定了限流的粒度 | 100 |
能够经过适当缩小spark.loghub.batchGet.step来控制限流的精度,可是即使如此,在某些状况下仍是会存在较大偏差,如一个LogGroup中存在10000行日志,spark.streaming.loghub.maxRatePerShard设置为100,spark.loghub.batchGet.step设置为1,那一个批次内该shard仍是会拉取10000行日志。
和Receiver相比,Direct有以下的优点:
可是也存在一些缺点:
与消费SLS相反,Spark Streaming的处理结果也能够直接写入SLS。使用示例:
... val lines = loghubStream.map(x => x) // 转换函数把结果中每条记录转为一行日志 def transformFunc(x: String): LogItem = { val r = new LogItem() r.PushBack("key", x) r } val callback = new Callback with Serializable { override def onCompletion(result: Result): Unit = { println(s"Send result ${result.isSuccessful}") } } // SLS producer config val producerConfig = Map( "sls.project" -> loghubProject, "sls.logstore" -> targetLogstore, "access.key.id" -> accessKeyId, "access.key.secret" -> accessKeySecret, "sls.endpoint" -> endpoint, "sls.ioThreadCount" -> "2" ) lines.writeToLoghub( producerConfig, "topic", "streaming", transformFunc, Option.apply(callback)) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc.start() ssc.awaitTermination()
Structured Streaming 并非最近才出现的技术,而是早在16年就已经出现,可是直到 Spark 2.2.0 才正式推出。其数据模型是基于无界表的概念,流数据至关于往一个表上不断追加行。
与Spark Streaming相比,Structured Streaming主要有以下特色:
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{StringType, StructField, StructType} object StructuredStreamingDemo { def main(args: Array[String]) { val spark = SparkSession .builder .appName("StructuredLoghubWordCount") .master("local") .getOrCreate() import spark.implicits._ val schema = new StructType( Array(StructField("content", StringType))) val lines = spark .readStream .format("loghub") .schema(schema) .option("sls.project", "your project") .option("sls.store", "your logstore") .option("access.key.id", "your access key id") .option("access.key.secret", "your access key secret") .option("endpoint", "your endpoint") .option("startingoffsets", "latest") .load() .select("content") .as[String] val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count() val query = wordCounts.writeStream .outputMode("complete") .format("loghub") .option("sls.project", "sink project") .option("sls.store", "sink logstore") .option("access.key.id", "your access key id") .option("access.key.secret", "your access key secret") .option("endpoint", "your endpoint") .option("checkpointLocation", "your checkpoint dir") .start() query.awaitTermination() } }
代码解释:
1)schema 声明了咱们须要的字段,除了日志中的字段外,还有以下的内部字段:
__logProject__ __logStore__ __shard__ __time__ __topic__ __source__ __sequence_number__ // 每行日志惟一id
若是没有指定schema,SDK默认提供一个__value__字段,其内容为由全部字段组成的一个JSON字符串。
2)lines 定义了一个流。
startingoffsets:开始位置,支持:
maxOffsetsPerTrigger:批次读取行数,SDK中默认是64*1024 。
3)结果写入到日志服务
format 指定为Loghub便可。
官方文档:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
SLS SDK例子:https://github.com/aliyun/aliyun-emapreduce-sdk/tree/master-2.x/examples/src/main/scala/com/aliyun/emr/examples/sql/streaming
日志服务实时消费:https://help.aliyun.com/document_detail/28998.html
本文做者:liketic
本文为阿里云内容,未经容许不得转载。