最近须要作一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程当中有一些须要注意的地方,记录以下。java
说到java中的定时功能,首先想到的Timer和ScheduledThreadPoolExecutor,可是相比之下Timer能够排除,主要缘由有如下几点:redis
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new
ThreadPoolExecutor.AbortPolicy());
//从消息中取出延迟时间及相关信息的代码略
int delayTime = 0;
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
//具体操做逻辑
}},0,delayTime, TimeUnit.SECONDS);
复制代码
其中NamedThreadFactory是我自定义的一个线程工厂,主要给线程池定义名称及相关日志打印便于后续的问题分析,这里就很少作介绍了。拒绝策略也是采用默认的拒绝策略。 而后测试了一下,知足目标需求的功能,能够作到延迟指定时间后执行,至此彷佛功能就被完成了。 你们可能疑问,这也太简单了有什么好说的,可是这种方式实现简单是简单可是存在一个潜在的问题,问题在哪呢,让咱们看一下ScheduledThreadPoolExecutor的源码:api
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0,
TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}
复制代码
ScheduledThreadPoolExecutor 因为它自身的延时和周期的特性,默认使用了DelayWorkQueue,而并不像咱们平时使用的SingleThreadExecutor等构造是可使用本身定义的LinkedBlockingQueue而且设置队列大小,问题就出在这里。DelayWrokQueue是一个无界队列,而咱们的目标数据源是kafka,也就是一个高并发高吞吐的消息队列,很大可能在某一时间段有大量的消息过来从而致使OOM,在使用多线程时咱们是确定要考虑到OOM的可能性的,由于OOM带来的后果每每比较严重,系统OOM临时的解决办法通常只能是重启,可能会致使用户数据丢失等不可能挽回的问题,因此从编码设计阶段要采用尽量稳妥的手段来避免这些问题。安全
//添加元素
ZADD key score member [[score member] [score member] …]
//根据分值及限制数量查询
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
//从zset中删除指定成员
ZREM key member [member …]
复制代码
咱们采用redis基础数据结构的zset结构,采用score来存储咱们目标发送时间的数值,总体处理流程以下:bash
public void onMessage(String topic, String message) {
String orderId;
int delayTime = 0;
try {
Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {
}.getType());
if (msgMap.isEmpty()) {
return;
}
LOGGER.info("onMessage kafka content:{}", msgMap.toString());
orderId = msgMap.get("orderId");
if(StringUtils.isNotEmpty(orderId)){
delayTime = Integer.parseInt(msgMap.get("delayTime"));
Calendar calendar = Calendar.getInstance();
//计算出预计发送时间
calendar.add(Calendar.MINUTE, delayTime);
long sendTime = calendar.getTimeInMillis();
RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);
LOGGER.info("orderId:{}---放入redis中等待发送---sendTime:{}", ---orderId:{}, sendTime);
}
} catch (Exception e) {
LOGGER.info("onMessage 延时发送异常:{}", e);
}
}
复制代码
public void run(){
//获取批量大小
int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));
try {
//批量获取离发送时间最近的orderNum条数据
Calendar calendar = Calendar.getInstance();
long now = calendar.getTimeInMillis();
//获取无限早到如今的事件key(防止上次批量数量小于放入数量,存在历史数据未消费状况)
Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);
LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));
if (CollectionUtils.isNotEmpty(orders)){
//删除key 防止重复发送
for (String orderId : orderIds) {
RedisUtils.getInstance().zrem(Constant.DELAY, orderId);
}
//接下来执行发送等业务逻辑
}
} catch (Exception e) {
LOGGER.warn("task.run exception:{}", e);
}
}
复制代码
至此完成了依赖redis和线程完成了延时发送的功能。数据结构
那么对上面两种不一样的实现方式进行一下优缺点比较:多线程
第一种方式实现简单,不依赖外部组件,可以快速的实现目标功能,但缺点也很明显,须要在特定的场景下使用,若是是我这种消息量大的状况下使用极可能是有问题,固然在数据源消息很少的状况下不失为好的选择。并发
第二种方式实现稍微复杂一点,可是可以适应消息量大的场景,采用redis的zset做为了“中间件”的效果,而且帮助咱们进行延时的功能实现可以较好的适应高并发场景,缺点在于在编写的过程当中须要考虑实际的因素较多,例如线程的执行周期时间,发送可能会有必定时间的延迟,批量数据大小的设置等等。ide
综上是本人此次延时功能的实现过程的两种实现方式的总结,具体采用哪一种方式还需你们根据实际状况选择,但愿能给你们带来帮助。ps:因为本人的技术能力有限,文章中可能出现技术描述不许确或者错误的状况恳请各位大佬指出,我立马进行改正,避免误导你们,谢谢!高并发