数据采集系统的优化实战

1 概述

在历时2个月的不断优化过程当中,将数据采集系统的处理能力(kafka一个topic)从2.5万提高到了10万,基本符合对下一次峰值的要求了。前端

全部日志中,其中广告日志和做品日志量是最大的,因此本次的优化也是针对这两块进行优化。nginx

广告日志接口TPS从以前的不到1k/s,提高到如今的2.1w/s,提高了20倍。 spring

做品日志接口TPS从以前的不到1k/s,提高到如今的1.4w/s,提高了13倍。apache

在数据采集的优化过程当中,设计到不少地方,包含代码的优化,框架的优化,服务的优化,如今将对吞吐率的提升有明显左右的优化点记录下来。后端

2 面向对象

技术负责人,后端服务开发工程师服务器

3 撰写时间

2020年04月03日app

4 技术框架图

arti1.png

5 后端日志ETL程序LogServer的优化

广告日志接口TPS从以前的不到1k/s,提高到如今的2.1w/s,提高了将近20倍。
做品日志接口TPS从以前的不到1k/s,提高到如今的1.4w/s,提高了13倍。 框架

1.广告日志接口的压测结果部分截图maven

arti2.png

2.做品日志接口的压测结果部分截图tcp

arti3.png

如下TPS的提高是一个大概的值。

5.1 删除代码中没必要要的打印日志

例如

System.out.println
    System.out.println
    logger.info

TPS 1k -> 3k

5.2 将logback.xml文件中的打印日志关闭

例如

<appender-ref ref="action"/>
    <appender-ref ref="console"/>

TPS 3k -> 5k

5.3 对获取kafka相关的logger的代码优化

例如
以前的代码

public synchronized static Logger getLogger(String topic) {
    Logger logger = loggers.get(topic);
    try {
    if (logger == null) {
        logger = LoggerFactory.getLogger(topic);
        loggers.put(topic, logger);
    }
    return logger;
 }

优化后的代码

public  static Logger getLogger(String topic) {
    if (logger == null) {
        synchronized(KafkaLoggerFactory.class){
            if(logger == null){
                logger = LoggerFactory.getLogger(topic);
                loggers.put(topic, logger);
            }
        }
    }
}

TPS 5k -> 9k

5.4 对流量广告的逻辑进行简化

以前的作法:
将广告数据当成普通的日志数据来处理,会经历全部的日志判断逻辑,最后通过验证,数据没问题,才会发送到kafka,整个逻辑链条比较长。

如今的作法:
先看代码

ip: String ip = request.getIp();
    collection.put("ip", ip);
    // 国家、地区、城市: collection.putAll(Constant.getRegionInfo(ip));
    server_host: collection.put("srh", Constant.serverHost);
    server_time: collection.put("s_t", System.currentTimeMillis());

    if( "traffic_view".equals(collection.get("product")) ){
        parseAdRecord(collection);
        return Constant.RESPONSE_CODE_NORMAL;
    }

    ...

    public void parseAdRecord(Map<String, Object> collection){
        try {
            collection = Constant.clearAdCollection(collection);
            log2kafka(Constant.eventTopic, JSONObject.toJSONString(collection));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

从上边的代码能够看出,将广告的逻辑单独处理进行处理,整个链路短了不少,大概总共分3步:
1 必须的公共字段处理
2 判断是否广告日志
3 将广告日志发送到kafka

TPS 9k -> 1.2w

5.5 对广告日志中的字段进行精简

将HDFS上广告日志总共的85个字段,如今精简到45个左右。这一步虽然对LogServer的吞吐率没有太多的提高。可是却能够将kafka的吞吐量提高将近一倍。

5.6 升级和简化依赖
  1. 首先将非必要的maven依赖所有删除掉了,使得依赖的数量从217个减小到了51个。
  2. 将maven依赖升级成比较新的版本。
  3. 有些依赖被删掉了,相关的类也作了调整,好比StringUtils.isEmpty()已经从spring类
org.springframework.util.StringUtils

调整成了commons-lang3包内的org.apache.commons.lang3.StringUtils

<dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency>

6 服务器硬件层面

从以前的4核8G服务器迁移到8核16G的服务器上。
而且对服务器内核参数作了如下优化:

net.core.somaxconn = 10240
net.core.netdev_max_backlog =262144
net.ipv4.tcp_keepalive_intvl = 5
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_tw_reuse = 1
net.ipv4.ip_local_port_range = 1024     60999
net.ipv4.tcp_fin_timeout = 30
net.ipv4.tcp_syn_retries = 1
net.ipv4.tcp_synack_retries = 1

1.2w -> 2w

7 对前端SDK的优化

通过对kafka的写入压测,当日志大小为1024字节的时候的QPS是2048的接近2倍。

arti4.png

1 减小前端上报日志字段数量,将暂时未用的的字段删掉。前端sdk上报的日志字段从71个删减到48个字段,减小了32%的字段数量。
2 没必要要的日志再也不上报,主要是经过修改前端日志上报的逻辑来实现。

8 对Nginx的优化:

对Nginx的优化主要有2个方面:

  1. 服务器层面的优化,如上述第5项
  2. Nginx自己的配置优化
  3. 增长了ip防刷的机制
8.1 Nginx一部分配置的优化。

worker_connections从20480上调到了102400,调大了5倍。调大以后nginx的吞吐量从2w/s,提高到3.5w/s,设置的时候最好结合业务和服务器的性能先作个压测。

worker_processes 默认是1,官方建议和cpu的核数相同,或者直接设置成auto。有人建议设置成cpu核数的2倍,从个人测试状况来看,不会有明显的提升,也多是场景覆盖的有限。

worker_cpu_affinity Nginx默认没有开启利用多核cpu,经过worker_cpu_affinity开启nginx利用多核cpu,并将worker绑定到指定的线程上,以此来提升nginx的性能。

multi_accept 默认Nginx没有开启multi_accept。multi_accept可让nginx worker进程尽量多地接受请求。它的做用是让worker进程一次性地接受监听队列里的全部请求,而后处理。若是multi_accept的值设为off,那么worker进程必须一个一个地接受监听队列里的请求。

worker_processes  8;
worker_cpu_affinity 00000001 00000010 00000100 00001000 00010000 00100000 01000000 10000000;

worker_connections  102400;
multi_accept on;

优化完以后QPS从1万左右上升到3.5万。

8.2 ip防刷

在conf/module/定义了一个blacklist文件:

map $http_x_forwarded_for $ip_action{
    default            0;
    ~123\.123\.29      1;
}

在nginx.conf中增长ip过滤的配置:

location /log.gif {
        if ($ip_action) {
            return 403;
        }
        proxy_pass         http://big-da/log-server/push;
        proxy_set_header   Host             $host;
        proxy_set_header   X-Real-IP        $remote_addr;
        proxy_set_header   X-Forwarded-For  $proxy_add_x_forwarded_for;
        client_max_body_size       128k;
        client_body_buffer_size    32k;
        proxy_connect_timeout      5;
        proxy_send_timeout         5;
        proxy_read_timeout         5;
        proxy_http_version           1.1;
        proxy_set_header Connection "";
    }

若是是黑名单中的ip,则直接拒绝请求。

9 对kafka的优化

1.将全部重要topic的Replication从1改为了2,这样保证在kafka有1个节点故障的时候,topic也能正常工做。

arti5.png

2.为每一个节点的kafka设置了独用的SSD硬盘。
3.topic分区的分区数量根据业务需求设置,咱们设置了6个分区。
3.在producer端写kafka的使用使用snappy压缩格式
4.在producer端合理设置batch.size

batch.size用来控制producer在将消息发送到kafka以前须要攒够多少本身的数据。默认是16kB,通过测试,在32kB的状况下,吞吐量和压测都在接受的接受的范围内。

5.在producer端合理设置linger.ms

默认没有设置,只要有数据就会当即发送。能够将linger.ms设置为100,在流量比较大的时候,能够减小发送请求的数量,从而提升吞吐量。

6.升级版本,将kafka从0.10升到了2.2.1

做者:易企秀工程师 Peace -> 我的主页

相关文章
相关标签/搜索