引言
带宽不够用,靠这个方法我让数据压缩率达到了80%以上apache
如何在有限的资源下解决性能瓶颈问题是运维永恒的痛点。这期文章,Mr.Tech 邀请了在性能优化方面有着丰富经验的个推高级运维工程师白子画,为你们分享宽带优化之Flume Avro在个推的实践。性能优化
在异地日志数据互传的场景下,咱们从传输数据着手,借助Avro的特性使数据压缩率达80%以上,解决了个推在实际生产过程当中遇到的带宽不够用的问题。本文咱们将向你们介绍Flume Avro在数据传输过程当中所承担的不一样角色,以及如何保证数据的完整性和传输的高效性,并分享在实际业务中取得的优化效果。服务器
背景
个推做为专业的数据智能服务商,已经成功服务了数十万APP,每日的消息下发量达百亿级别,由此产生了海量日志数据。为了应对业务上的各类需求,咱们须要采集并集中化日志进行计算,为此个推选用了高可用的、高可靠的、分布式的Flume系统以对海量日志进行采集、聚合和传输。此外,个推也不断对Flume进行迭代升级,以实现本身对日志的特定需求。网络
原有的异地机房日志汇聚方式,整个流程相对来讲比较简单,A机房业务产生的日志经过多种方式写入该机房Kafka集群,而后B机房的Flume经过网络专线实时消费A机房Kafka的日志数据后写入本机房的Kafka集群,全部机房的数据就是经过相同方式在B机房Kakfa集群中集中化管理。如图一所示:数据结构
图一:原有异地日志传输模式运维
可是随着业务量的不断增长,日志数据在逐渐增多的过程当中对带宽要求变高,带宽的瓶颈问题日益凸显。按照1G的专线带宽成本2~3w/月来计算,一个异地机房一年仅专线带宽扩容成本就高达30w以上。对此,如何找到一种成本更加低廉且符合当前业务预期的传输方案呢?Avro有快速压缩的二进制数据形式,并能有效节约数据存储空间和网络传输带宽,从而成为优选方案。分布式
优化思路
Avro简介工具
Avro是一个数据序列化系统。它是Hadoop的一个子项目,也是Apache的一个独立的项目,其主要特色以下:
● 丰富的数据结构;
● 可压缩、快速的二进制数据类型;
● 可持久化存储的文件类型;
● 远程过程调用(RPC);
● 提供的机制使动态语言能够方便地处理数据。
具体可参考官方网站:http://avro.apache.org/oop
Flume Avro方案性能
Flume的RPC Source是Avro Source,它被设计为高扩展的RPC服务端,能从其余Flume Agent 的Avro Sink或者Flume SDK客户端,接收数据到Flume Agent中,具体流程如图二所示:
图二:Avro Source流程
针对该模式,咱们的日志传输方案计划变动为A机房部署Avro Sink用以消费该机房Kafka集群的日志数据,压缩后发送到B机房的Avro Source,而后解压写入B机房的Kafka集群,具体的传输模式如图三所示:
图三:Flume Avro传输模式
可能存在的问题
咱们预估可能存在的问题主要有如下三点:
● 当专线故障的时候,数据是否能保证完整性;
● 该模式下CPU和内存等硬件的消耗评估;
● 传输性能问题。
验证状况
针对以上的几个问题,咱们作了几项对比实验。
环境准备状况说明:
原有Flume模式验证(非Avro)
监控Kafka消费状况:
81流量统计:
82流量统计:
消费所有消息耗时:20min
消费总日志条数统计:129,748,260
总流量:13.5G
Avro模式验证
配置说明:
Avro Sink配置:
kafkatokafka.sources = kafka_dmc_bullet
kafkatokafka.channels = channel_dmc_bullet
kafkatokafka.sinks = kafkasink_dmc_bullet
kafkatokafka.sources.kafka_dmc_bullet.type = org.apache.flume.source.kafka.KafkaSource
kafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bullet
kafkatokafka.sources.kafka_dmc_bullet.zookeeperConnect = 192.168.10.81:2181
kafkatokafka.sources.kafka_dmc_bullet.topic = topicA
kafkatokafka.sources.kafka_dmc_bullet.kafka.zookeeper.connection.timeout.ms = 150000
kafkatokafka.sources.kafka_dmc_bullet.kafka.consumer.timeout.ms = 10000
kafkatokafka.sources.kafka_dmc_bullet.kafka.group.id = flumeavro
kafkatokafka.sources.kafka_dmc_bullet.batchSize = 5000
kafkatokafka.sinks.kafkasink_dmc_bullet.type = org.apache.flume.sink.AvroSink
kafkatokafka.sinks.kafkasink_dmc_bullet.hostname = 192.168.10.82
kafkatokafka.sinks.kafkasink_dmc_bullet.port = 55555 //与source的rpc端口一一对应
kafkatokafka.sinks.kafkasink_dmc_bullet.compression-type = deflate //压缩模式
kafkatokafka.sinks.kafkasink_dmc_bullet.compression-level = 6 //压缩率1~9
kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bullet
kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bullet
kafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks = 1
kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize = 5000
kafkatokafka.channels.channel_dmc_bullet.type = memory
kafkatokafka.channels.channel_dmc_bullet.capacity = 100000
kafkatokafka.channels.channel_dmc_bullet.transactionCapacity = 5000
kafkatokafka.channels.channel_dmc_bullet.keep-alive = 60
Avro Source配置:
kafkatokafka.sources = kafka_dmc_bullet
kafkatokafka.channels = channel_dmc_bullet
kafkatokafka.sinks = kafkasink_dmc_bullet
kafkatokafka.sources.kafka_dmc_bullet.type = avro
kafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bullet
kafkatokafka.sources.kafka_dmc_bullet.bind = 0.0.0.0
kafkatokafka.sources.kafka_dmc_bullet.port = 55555 //rpc端口绑定
kafkatokafka.sources.kafka_dmc_bullet.compression-type = deflate //压缩模式
kafkatokafka.sources.kafka_dmc_bullet.batchSize = 100
kafkatokafka.sinks.kafkasink_dmc_bullet.type = org.apache.flume.sink.kafka.KafkaSink
kafkatokafka.sinks.kafkasink_dmc_bullet.kafka.partitioner.class = com.gexin.rp.base.kafka.SimplePartitioner
kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bullet
kafkatokafka.sinks.kafkasink_dmc_bullet.topic = topicB
kafkatokafka.sinks.kafkasink_dmc_bullet.brokerList = 192.168.10.82:9091,192.168.10.82:9092,192.168.10.82:9093
kafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks = 1
kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize = 500
kafkatokafka.channels.channel_dmc_bullet.type = memory
kafkatokafka.channels.channel_dmc_bullet.capacity = 100000
kafkatokafka.channels.channel_dmc_bullet.transactionCapacity = 1000
监控Kafka消费状况
81流量统计:
82流量统计:
消费所有消息耗时:26min
消费总日志条数统计:129,748,260
总流量:1.69G
故障模拟
结论
生产环境实施结果
实施结果以下:
全文总结
Flume做为个推日志传输的主要工具之一,Source的类型选择尤其重要(如avro、thrif、exec、kafka和spooling directory等等)。不管选择哪一种Source,都是为了实现日志数据的高效传输。本文经过Avro的方式,解决了带宽资源瓶颈的问题。
将来,咱们但愿与更多开发者一块儿探索如何用更多的技术手段来节约控制成本,并知足更多的业务场景需求。