Flume 在有赞的大数据业务中一直扮演着一个稳定可靠的日志数据“搬运工” 的角色。本文主要讲一下有赞大数据部门在 Flume 的应用实践,同时也穿插着咱们对 Flume 的一些理解。mysql
认识 Flume 对事件投递的可靠性保证是很是重要的,它每每是咱们是否使用 Flume 来解决问题的决定因素之一。web
消息投递的可靠保证有三种:sql
基本上全部工具的使用用户都但愿工具框架能保证消息 Exactly-once ,这样就没必要在设计实现上考虑消息的丢失或者重复的处理场景。可是事实上不多有工具和框架能作到这一点,真正能作到这一点所付出的成本每每很大,或者带来的额外影响反而让你以为不值得。假设 Flume 真的作到了 Exactly-once ,那势必下降了稳定性和吞吐量,因此 Flume 选择的策略是 At-least-once 。json
固然这里的 At-least-once 须要加上引号,并非说用上 Flume 的随便哪一个组件组成一个实例,运行过程当中就能保存消息不会丢失。事实上 At-least-once 原则只是说的是 Source
、 Channel
和 Sink
三者之间上下投递消息的保证。而当你选择 MemoryChannel
时,实例若是异常挂了再重启,在 channel 中的未被 sink 所消费的残留数据也就丢失了,从而没办法保证整条链路的 At-least-once。网络
Flume 的 At-least-once 保证的实现基础是创建了自身的 Transaction
机制。Flume 的 Transaction
有4个生命周期函数,分别是 start
、 commit
、rollback
和 close
。 当 Source
往 Channel
批量投递事件时首先调用 start
开启事务,批量 put 完事件后经过 commit 来提交事务,若是 commit
异常则 rollback
,而后 close
事务,最后 Source
将刚才提交的一批消息事件向源服务 ack(好比 kafka 提交新的 offset )。Sink
消费 Channel
也是相同的模式,惟一的区别就是 Sink 须要在向目标源完成写入以后才对事务进行 commit
。两个组件的相同作法都是只有向下游成功投递了消息才会向上游 ack
,从而保证了数据能 At-least-once 向下投递。架构
基于 mysql binlog
的数仓增量同步(datay
业务)是大数据这边使用 Flume 中一个比较经典的应用场景,datay 具体业务不详细说明,须要强调的是它对Flume的一个要求是必须保证在 nsq
(消息队列)的 binlog
消息能可靠的落地到 hdfs
,不容许一条消息的丢失,须要绝对的 At-least-once。框架
Flume 模型自己是基于 At-least-once 原则来传递事件,因此须要须要考虑是在各类异常状况(好比进程异常挂了)下的 At-least-once 保证。显然 MemoryChannel
没法知足,因此咱们用 FlieChannel
作代替。因为公司目前是使用 nsq 做为 binlog 的消息中转服务,故咱们没有办法使用现有的 KafkaSource
,因此基于公司的 nsq sdk 扩展了 NsqSource
。这里须要说明的是为了保证 At-least-once,Source
源必须支持消息接收的 ack 机制,好比 kafka 客户端只有认为消费了消息后,才对 offset 进行提交,否则就须要接受重复的消息。分布式
因而咱们第一个版本上线了,看上去颇有保障了,即便进程异常挂了重启也不会丢数据。ide
可能有同窗想到一个关键性的问题:若是某一天磁盘坏了而进程异常退出,而 FileChannel
恰好又有未被消费的事件数据,这个时候不就丢数据了吗?虽然磁盘坏了是一个极低的几率,但这确实是一个须要考虑的问题。函数
在 Flume 现有组件中比 FlieChannel
更可靠的,可能想到的是 KafkaChannel
,kafka 能够对消息保留多个副本,从而加强了数据的可靠性。可是咱们第二版本的方案没有选择它,而是直接扩展出 NsqChannel 。因而第二个版本就有了。
Source
+
Channel
+
Sink
三个组件的固有模式,事实上咱们不必定要三个组件都使用上。另外直接
NsqChannel
到
HDFSEventSink
的有几个好处:
Flume 在各个组件的扩展性支持具备很是好的设计考虑。
当没法知足咱们的自定义需求,咱们能够选择合适的组件上进行扩展。下面就讲讲咱们扩展的一些内容。
NsqSource
。在 Flume 定制化一个 Source
比较简单,继承一个已有通用实现的抽象类,实现相对几个生命周期方法便可。这里说明注意的是 Flume 组件的生命周期在可能被会调用屡次,好比 Flume 具备自动发现实例配置发生变化并 restart
各个组件,这种状况须要考虑资源的正确释放。
HdfsEventSink
扩展配置。它自己就具备 role file 功能,好比当 Sink 是按小时生成文件,有这一个小时的第一个事件建立新的文件,而后通过固定的 role 配置时间(好比一小时)关闭文件。这里存在的问题就是若是源平时的数据量不大,好比8点这个小时的第一个事件是在8点25分来临,那就是说须要9点25才能关闭这个文件。因为没有关闭的tmp文件会被离线数据任务的计算引擎所忽略,在小时级的数据离线任务就没办法获得实时的数据。而咱们作的改造就是 roll file 基于整点时间,而不是第一次事件的时间,好比固定的05分关闭上一次小时的文件,而离线任务调度时间设置在每小时的05分以后就能解决这个问题。最终的效果给下图:
MetricsReportServer
。当咱们须要收集 Flume 实例运行时的各个组件 counter metric ,就须要开启 MonitorService
服务。自定义了一个按期发生 http 请求汇报 metric 到一个集中的 web 服务。原生的 HTTPMetricsServer
也是基于 http 服务,区别在于它将 Flume 做为 http 服务端,而咱们将不少实例部署在一台机器上,端口分配成了比较头疼的问题。
当咱们收集到如下的 counter metric 时,就能够利用它来实现一些监控报警。
{
"identity":"olap_offline_daily_olap_druid_test_timezone_4@49",
"startTime":1544287799839,
"reportCount":4933,
"metrics":{
"SINK.olap_offline_daily_olap_druid_test_timezone_4_snk":{
"ConnectionCreatedCount":"9",
"ConnectionClosedCount":"8",
"Type":"SINK",
"BatchCompleteCount":"6335",
"BatchEmptyCount":"2",
"EventDrainAttemptCount":"686278",
"StartTime":"1544287799837",
"EventDrainSuccessCount":"686267",
"BatchUnderflowCount":"5269",
"StopTime":"0",
"ConnectionFailedCount":"48460"
},
"SOURCE.olap_offline_daily_olap_druid_test_timezone_4_src":{
"KafkaEventGetTimer":"26344146",
"AppendBatchAcceptedCount":"0",
"EventAcceptedCount":"686278",
"AppendReceivedCount":"0",
"StartTime":"1544287800219",
"AppendBatchReceivedCount":"0",
"KafkaCommitTimer":"14295",
"EventReceivedCount":"15882278",
"Type":"SOURCE",
"OpenConnectionCount":"0",
"AppendAcceptedCount":"0",
"KafkaEmptyCount":"0",
"StopTime":"0"
},
"CHANNEL.olap_offline_daily_olap_druid_test_timezone_4_cha":{
"ChannelCapacity":"10000",
"ChannelFillPercentage":"0.11",
"Type":"CHANNEL",
"ChannelSize":"11",
"EventTakeSuccessCount":"686267",
"StartTime":"1544287799332",
"EventTakeAttemptCount":"715780",
"EventPutAttemptCount":"15882278",
"EventPutSuccessCount":"686278",
"StopTime":"0"
}
}
}
复制代码
HdfsEventSink
的时候不能使用系统时间来计算文件目录,而是应该基于消息内容中的某个时间戳字段。这个能够经过扩展 Interceptor
来解决。 Interceptor
用于在 Source
投递事件给 Channel
前的一个拦截处理,通常都是用来对事件丰富 header
信息。强烈不建议在 Source
中直接处理,实现一个 Interceptor
能够知足其余 Source
相似需求的复用性。Flume 实例进行性能调优最多见的配置是事务 batch
和 Channel Capacity
。
Source
对 Channel
进行 put 或者 Sink
对 Channel
进行 take 都是经过开启事务来操做,因此调大两个组件的 batch 配置能够下降 cpu 消耗,减小网络 IO 等待等。Channel
的 capacity 大小直接影响着 source 和 sink 两端的事件生产和消费。capacity 越大,吞吐量越好,可是其余因素制约着不能设置的很大。好比 MemoryChannel
,直接表现着对内存的消耗,以及进程异常退出所丢失的事件数量。不一样的 Channel
须要不一样的考虑,最终 trade-off 是不免的。Flume 是一个很是稳定的服务,这一点在咱们生产环境中获得充分验证。 同时它的模型设计也很是清晰易懂,每一种组件类型都有不少现成的实现,同时特考虑到各个扩展点,因此咱们很容易找到或者定制化咱们所须要的数据管道的解决方案。
随着用户愈来愈多,须要有一个统一的平台来集中管理全部的 Flume 实例。 有如下几点好处:
固然这一步咱们也才刚启动,但愿它将来的价值变得愈来愈大。