JavaShuo
栏目
标签
基于Flume的美团日志收集系统(二)改进和优化
时间 2019-11-17
标签
基于
flume
团日
收集
系统
改进
优化
栏目
日志分析
繁體版
原文
原文链接
问题导读:
1.Flume的存在些什么问题?
2.基于开源的Flume美团增长了哪些功能?
3.Flume系统如何调优?
在《
基于Flume的美团日志收集系统(一)架构和设计
》中,咱们详述了基于Flume的美团日志收集系统的架构设计,以及为何作这样的设计。在本节中,咱们将会讲述在实际部署和使用过程当中遇到的问题,对Flume的功能改进和对系统作的优化。
1 Flume的问题总结
在Flume的使用过程当中,遇到的主要问题以下:
a. Channel“水土不服”:使用固定大小的MemoryChannel在日志高峰时常报队列大小不够的异常;使用FileChannel又致使IO繁忙的问题;
b. HdfsSink的性能问题:使用HdfsSink向Hdfs写日志,在高峰时间速度较慢;
c. 系统的管理问题:配置升级,模块重启等;
2 Flume的功能改进和优化点
从上面的问题中能够看到,有一些需求是原生Flume没法知足的,所以,基于开源的Flume咱们增长了许多功能,修改了一些Bug,而且进行一些调优。下面将对一些主要的方面作一些说明。
2.1 增长Zabbix monitor服务
一方面,Flume自己提供了http, ganglia的监控服务,而咱们目前主要使用zabbix作监控。所以,咱们为Flume添加了zabbix监控模块,和sa的监控服务无缝融合。
另外一方面,净化Flume的metrics。只将咱们须要的metrics发送给zabbix,避免 zabbix server形成压力。目前咱们最为关心的是Flume可否及时把应用端发送过来的日志写到Hdfs上, 对应关注的metrics为:
Source : 接收的event数和处理的event数
Channel : Channel中拥堵的event数
Sink : 已经处理的event数
2.2 为HdfsSink增长自动建立index功能
首先,咱们的HdfsSink写到hadoop的文件采用lzo压缩存储。 HdfsSink能够读取hadoop配置文件中提供的编码类列表,而后经过配置的方式获取使用何种压缩编码,咱们目前使用lzo压缩数据。采用lzo压缩而非bz2压缩,是基于如下测试数据:
event大小(Byte)
sink.batch-size
hdfs.batchSize
压缩格式
总数据大小(G)
耗时(s)
平均events/s
压缩后大小(G)
544
300
10000
bz2
9.1
2448
6833
1.36
544
300
10000
lzo
9.1
612
27333
3.49
其次,咱们的HdfsSink增长了建立lzo文件后自动建立index功能。Hadoop提供了对lzo建立索引,使得压缩文件是可切分的,这样 Hadoop Job能够并行处理数据文件。HdfsSink自己lzo压缩,但写完lzo文件并不会建索引,咱们在close文件以后添加了建索引功能。
/**
* Rename bucketPath file from .tmp to permanent location.
*/
private void renameBucket() throws IOException, InterruptedException {
if(bucketPath.equals(targetPath)) {
return;
}
final Path srcPath = new Path(bucketPath);
final Path dstPath = new Path(targetPath);
callWithTimeout(new CallRunner<Object>() {
@Override
public Object call() throws Exception {
if(fileSystem.exists(srcPath)) { // could block
LOG.info("Renaming " + srcPath + " to " + dstPath);
fileSystem.rename(srcPath, dstPath); // could block
//index the dstPath lzo file
if (codeC != null && ".lzo".equals(codeC.getDefaultExtension()) ) {
LzoIndexer lzoIndexer = new LzoIndexer(new Configuration());
lzoIndexer.index(dstPath);
}
}
return null;
}
});
}
复制代码
2.3 增长HdfsSink的开关
咱们在HdfsSink和DualChannel中增长开关,当开关打开的状况下,HdfsSink再也不往Hdfs上写数据,而且数据只写向DualChannel中的FileChannel。以此策略来防止Hdfs的正常停机维护。
2.4 增长DualChannel
Flume自己提供了MemoryChannel和FileChannel。MemoryChannel处理速度快,但缓存大小有限,且没有持久 化;FileChannel则恰好相反。咱们但愿利用二者的优点,在Sink处理速度够快,Channel没有缓存过多日志的时候,就使用 MemoryChannel,当Sink处理速度跟不上,又须要Channel可以缓存下应用端发送过来的日志时,就使用FileChannel,由此我 们开发了DualChannel,可以智能的在两个Channel之间切换。
其具体的逻辑以下:
/***
* putToMemChannel indicate put event to memChannel or fileChannel
* takeFromMemChannel indicate take event from memChannel or fileChannel
* */
private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);
void doPut(Event event) {
if (switchon && putToMemChannel.get()) {
//往memChannel中写数据
memTransaction.put(event);
if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {
putToMemChannel.set(false);
}
} else {
//往fileChannel中写数据
fileTransaction.put(event);
}
}
Event doTake() {
Event event = null;
if ( takeFromMemChannel.get() ) {
//从memChannel中取数据
event = memTransaction.take();
if (event == null) {
takeFromMemChannel.set(false);
}
} else {
//从fileChannel中取数据
event = fileTransaction.take();
if (event == null) {
takeFromMemChannel.set(true);
putToMemChannel.set(true);
}
}
return event;
}
复制代码
2.5 增长NullChannel
Flume提供了NullSink,能够把不须要的日志经过NullSink直接丢弃,不进行存储。然而,Source须要先将events存放到 Channel中,NullSink再将events取出扔掉。为了提高性能,咱们把这一步移到了Channel里面作,因此开发了 NullChannel。
2.6 增长KafkaSink
为支持向Storm提供实时数据流,咱们增长了KafkaSink用来向Kafka写实时数据流。其基本的逻辑以下:
public class KafkaSink extends AbstractSink implements Configurable {
private String zkConnect;
private Integer zkTimeout;
private Integer batchSize;
private Integer queueSize;
private String serializerClass;
private String producerType;
private String topicPrefix;
private Producer<String, String> producer;
public void configure(Context context) {
//读取配置,并检查配置
}
@Override
public synchronized void start() {
//初始化producer
}
@Override
public synchronized void stop() {
//关闭producer
}
@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Channel channel = getChannel();
Transaction tx = channel.getTransaction();
try {
tx.begin();
//将日志按category分队列存放
Map<String, List<String>> topic2EventList = new HashMap<String, List<String>>();
//从channel中取batchSize大小的日志,从header中获取category,生成topic,并存放于上述的Map中;
//将Map中的数据经过producer发送给kafka
tx.commit();
} catch (Exception e) {
tx.rollback();
throw new EventDeliveryException(e);
} finally {
tx.close();
}
return status;
}
}
复制代码
2.7 修复和scribe的兼容问题
Scribed在经过ScribeSource发送数据包给Flume时,大于4096字节的包,会先发送一个Dummy包检查服务器的反应,而 Flume的ScribeSource对于logentry.size()=0的包返回TRY_LATER,此时Scribed就认为出错,断开链接。这 样循环反复尝试,没法真正发送数据。如今在ScribeSource的Thrift接口中,对size为0的状况返回OK,保证后续正常发送数据。
3. Flume系统调优经验总结
3.1 基础参数调优经验
HdfsSink中默认的serializer会每写一行在行尾添加一个换行符,咱们日志自己带有换行符,这样会致使每条日志后面多一个空行,修改配置不要自动添加换行符;
lc.sinks.sink_hdfs.serializer.appendNewline = false
复制代码
调大MemoryChannel的capacity,尽可能利用MemoryChannel快速的处理能力;
调大HdfsSink的batchSize,增长吞吐量,减小hdfs的flush次数;
适当调大HdfsSink的callTimeout,避免没必要要的超时错误;
3.2 HdfsSink获取Filename的优化
HdfsSink的path参数指明了日志被写到Hdfs的位置,该参数中能够引用格式化的参数,将日志写到一个动态的目录中。这方便了日志的管理。例如咱们能够将日志写到category分类的目录,而且按天和按小时存放:
lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%{category}/dt=%Y%m%d/hour=%H
复制代码
HdfsS ink中处理每条event时,都要根据配置获取此event应该写入的Hdfs path和filename,默认的获取方法是经过正则表达式替换配置中的变量,获取真实的path和filename。由于此过程是每条event都要 作的操做,耗时很长。经过咱们的测试,20万条日志,这个操做要耗时6-8s左右。
因为咱们目前的path和filename有固定的模式,能够经过字符串拼接得到。然后者比正则匹配快几十倍。拼接定符串的方式,20万条日志的操做只须要几百毫秒。
3.3 HdfsSink的b/m/s优化
在咱们初始的设计中,全部的日志都经过一个Channel和一个HdfsSink写到Hdfs上。咱们来看一看这样作有什么问题。
首先,咱们来看一下HdfsSink在发送数据的逻辑:
//从Channel中取batchSize大小的events
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
//对每条日志根据category append到相应的bucketWriter上;
bucketWriter.append(event);
}
for (BucketWriter bucketWriter : writers) {
//而后对每个bucketWriter调用相应的flush方法将数据flush到Hdfs上
bucketWriter.flush();
}
复制代码
假设咱们的系统中有100个category,batchSize大小设置为20万。则每20万条数据,就须要对100个文件进行append或者flush操做。
其次,对于咱们的日志来讲,基本符合80/20原则。即20%的category产生了系统80%的日志量。这样对大部分日志来讲,每20万条可能只包含几条日志,也须要往Hdfs上flush一次。
上述的状况会致使HdfsSink写Hdfs的效率极差。下图是单Channel的状况下每小时的发送量和写hdfs的时间趋势图。
鉴于这种实际应用场景,咱们把日志进行了大小归类,分为big, middle和small三类,这样能够有效的避免小日志跟着大日志一块儿频繁的flush,提高效果明显。下图是分队列后big队列的每小时的发送量和写hdfs的时间趋势图。
相关文章
1.
基于Flume的美团日志收集系统(二)改进和优化
2.
基于Flume的美团日志收集系统改进和优化
3.
基于Flume的美团日志收集系统(一)
4.
10044---基于Flume的美团日志收集系统(一)架构和设计
5.
基于Flume的美团日志收集系统(一)架构和设计
6.
转:基于Flume的美团日志收集系统(一)架构和设计
7.
Flume---日志收集系统
8.
日志收集系统 flume
9.
基于Flume的野狗实时日志系统的演进和优化
10.
基于Flume的日志收集系统方案参考
更多相关文章...
•
系统定义的TypeHandler
-
MyBatis教程
•
基于ARP协议进行扫描
-
TCP/IP教程
•
☆基于Java Instrument的Agent实现
•
Docker容器实战(七) - 容器眼光下的文件系统
相关标签/搜索
系统日志笔记二
我的日志
美团
优化系列
优美
集团
日志
优于
改进
和美
日志分析
XLink 和 XPointer 教程
MySQL教程
MyBatis教程
文件系统
代码格式化
0
分享到微博
分享到微信
分享到QQ
每日一句
每一个你不满意的现在,都有一个你没有努力的曾经。
最新文章
1.
「插件」Runner更新Pro版,帮助设计师远离996
2.
错误 707 Could not load file or assembly ‘Newtonsoft.Json, Version=12.0.0.0, Culture=neutral, PublicKe
3.
Jenkins 2018 报告速览,Kubernetes使用率跃升235%!
4.
TVI-Android技术篇之注解Annotation
5.
android studio启动项目
6.
Android的ADIL
7.
Android卡顿的检测及优化方法汇总(线下+线上)
8.
登录注册的业务逻辑流程梳理
9.
NDK(1)创建自己的C/C++文件
10.
小菜的系统框架界面设计-你的评估是我的决策
本站公众号
欢迎关注本站公众号,获取更多信息
相关文章
1.
基于Flume的美团日志收集系统(二)改进和优化
2.
基于Flume的美团日志收集系统改进和优化
3.
基于Flume的美团日志收集系统(一)
4.
10044---基于Flume的美团日志收集系统(一)架构和设计
5.
基于Flume的美团日志收集系统(一)架构和设计
6.
转:基于Flume的美团日志收集系统(一)架构和设计
7.
Flume---日志收集系统
8.
日志收集系统 flume
9.
基于Flume的野狗实时日志系统的演进和优化
10.
基于Flume的日志收集系统方案参考
>>更多相关文章<<