在线上环境,因为业务场景须要,要求程序可以在普通的4G机器中依然正常运行。 而原来的环境配置为8核16G,微服务部署,一共有6个功能模块。而如今要求在一台4核4G的设备上正常运行。html
@Configuration @Bean @Controller @Service @Repository
等注解中没有指定Bean实例的名称。现有的处理流程以下:java
项目采用SpringBoot构建,引入 spring-boot-stater-redis
1. 经过HTTP接收到异步事件,存储到Redis;
2. 存储的同时,将事件经过Redis的发布订阅发送到不一样的处理单元进行处理;
3. 每一个事件处理单元经过Redis订阅,而后处理事件;
4. 起一个定时器,每秒钟从Redis中查询一个时间窗口的事件,构建索引,而后bulkIndex到ES
复制代码
1. Redis的订阅发布,内部会维护一个container线程,此线程会一直存在;
2. 每次订阅,都会产生一个新的前缀为RedisListeningContainer-的线程处理;
3. 经过jvisualvm.exe 查看线程数,该类线程数一直在飙升
复制代码
程序中的实现以下:node
@Bean
RedisMessageListenerContainer manageContainer(
RedisConnectionFactory factory, MessageListener listener) {
RedisMessageListenerContainer manageContainer =
new RedisMessageListenerContainer ();
manageContainer.setConnectionFactory(factory);
// manageContainer.setTaskExecutor();
}
复制代码
代码中被注释掉的那一行,实际代码中是没有该行的,也就是没有设置taskExecutor
linux
listener-container
的说明,默认的task-executor
和subscription-task-executor
使用的是SimpleAsyncTaskExecutor
。RedisMessageListenerContainer.classredis
...
protected TaskExecutor createDefaultTaskExecutor() {
String threadNamePrefix = (beanName != null ? beanName + "-" :
DEFAULT_THREAD_NAME_PREFIX) ;
return new SimpleAsyncTaskExecutor(threadNamePrefix);
}
...
复制代码
SimpleAsyncTaskExecutor.classspring
...
protected void doExecute(Runnable task) {
Thread thread =
(this.threadFactory != null
? this.threadFactory,newThread(task)
: createThread(task));
thread.start();
}
...
复制代码
SimpleAsyncTaskExecutor
的execute()方法,是很无耻的new Thread()
,调用thread.start()
来执行任务找到问题的产生缘由,主要的解决思路有三种:缓存
配置manageContainer.setTaskExecutor();
而后选择本身建立的线程池;bash
去掉一部分发布订阅,改用Spring
提供的观察者模式,将绝大部分事件处理的场景,经过此方式完成发布。 SpringUtils.getApplicationContext() .publihEvent(newEventOperation(eventList));
服务器
采用Rector
模式实现事件的异步高效处理;网络
建立了2个线程组(参考netty的底层实现):
1. 一个用于处理事件接收 “event-recv-executor-”
coreSize = N * 2,CPU密集型
2. 一个用于事件的异步处理 “event-task-executor-”
coreSize = N / 0.1,IO密集型
复制代码
事件处理逻辑
@Override
public void onApplicationEvent (EventOperation event) {
eventTaskExecutor.execute(() -> {
doDealEventOperation(event);
});
}
复制代码
现有的处理流程以下:
项目采用SpringBoot构建,引入 spring-boot-stater-redis
1. 后台维护了一个定时器,每秒钟从Redis中查询一个时间窗口的事件
复制代码
在后台定位日志输出,正常状况下,应该是每秒钟执行一次定时,
但实际是,系统并不保证必定能每隔1S执行一次,
因为系统中线程比较多,CPU的切换频繁,
致使定时有可能1S执行几回或者每隔几秒执行一次
复制代码
因为定时并没有法保证执行,而定时任务获取事件时,是按照时间窗口截取,
经过redisTemplate.opsForZSet().rangeByScore(key, minScore, maxScore)实现,
势必会形成有数据没法被加载到程序中,而一直保存在Redis中,没法获取,也没法删除
复制代码
找到问题的产生缘由,主要的解决思路有两种:
加大容错率,将时间窗口拉大,原来是相隔1S的时间窗口,修改成相隔1MIN 【治标不治本,极端状况下,仍有可能形成该问题】;
采用MQ消费,此方法须要额外部署MQ服务器,在集群配置高的状况下,能够采用,在配置低的机器下不合适;
采用阻塞队列,利用Lock.newCondition()
或者最普通的网络监听模式while()
均可以;
本次问题中采用的是第三种形式。起一个单独的线程,阻塞监听。
1. 事件接收后,直接塞到一个BlockingQueue中;
2. 当BlockingQueue有数据时,While循环不阻塞,逐条读取队列中的信息;
3. 每隔1000条数据,或者每隔1S,将数据写入ES,并分发其余处理流程
复制代码
在4G的机器下,发现通过一段时间的发包处理后,系统cache增加的很是快,最后几近于所有占满:
大概每秒钟10M的涨幅
复制代码
1. 由于对于ES的了解,插入数据时,先写缓存,后fsync到磁盘上,所以怀疑ES可能存在问题;
2. 项目中日志使用log4j2不当:
* 日志输出过多,
* 日志没有加判断:if (log.isInfoEnabled())
* 日志文件append过大,没有按照大小切分等(本项目此问题以前已解决)
复制代码
通过隔段分析,将有可能出现问题的地方,分别屏蔽后,进行测试。
最终定位到,在ES批量写入数据时,才会出现cache大量增加的现象
复制代码
用命令查看内存free -m
,
buffer
: 做为buffer cache
的内存,是块设备的读写缓冲区cached
表示page cache的内存
和文件系统的cache
ES操做数据的底层机制:
数据写入时,ES内存缓慢上升,是由于小文件过多(ES自己会在index时候创建大量的小文件),linux dentry
和 inode cache
会增长。 能够参考:ES内存持续上升问题定位
本问题其实并无彻底解决,只是在必定程度上用性能换取缓存。
echo 10000 > /proc/sys/vm/vfs_cache_pressure;
复制代码
## 这些参数是以前优化的
threadpool.bulk.type: fixed
threadpool.bulk.min: 10
threadpool.bulk.max: 10
threadpool.bulk.queue_size: 2000
threadpool.index.type: fixed
threadpool.index.size: 100
threadpool.index.queue_size: 1000
index.max_result_window: 1000000
index.query.bool.max_clause_count: 1024000
# 如下的参数为本次优化中添加的:
# 设置ES最大缓存数据条数和缓存失效时间
index.cache.field.max_size: 20000
index.cache.field.expire: 1m
# 当内存不足时,对查询结果数据缓存进行回收
index.cache.field.type: soft
# 当内存达到必定比例时,触发GC。默认为JVM的70%[内存使用最大值]
#indices.breaker.total.limit: 70%
# 用于fielddata缓存的内存数量,
# 主要用于当使用排序操做时,ES会将一些热点数据加载到内存中来提供客户端访问
indices.fielddata.cache.expire: 20m
indices.fielddata.cache.size: 10%
# 一个节点索引缓冲区的大小[max 默认无限制]
#indices.memory.index_buffer_size: 10%
#indices.memory.min_index_buffer_size: 48M
#indices.memory.max_index_buffer_size: 100M
# 执行数据过滤时的数据缓存,默认为10%
#indices.cache.filter.size: 10%
#indices.cache.filter.expire: 20m
# 当tranlog的大小达到此值时,会进行一次flush操做,默认是512M
index.translog.flush_threshold_size: 100m
# 在指定时间间隔内若是没有进行进行flush操做,会进行一次强制的flush操做,默认是30分钟
index.translog.flush_threshold_period: 1m
# 多长时间进行一次的磁盘操做,默认是5S
index.gateway.local.sync: 1s
复制代码
若是文中有描述失误内容,或者没有描述清楚的,能够将问题发我邮箱,harveytuan@163.com
, 若是有其余问题,也能够联系我,你们一块儿共同讨论。