基于Redis实现分布式定时任务调度

 

项目开发过程当中,不免会有许多定时任务的需求进来。若是项目中尚未引入quarzt框架的状况下,咱们一般会使用Spring的@Schedule(cron="* * * * *")注解html

样例以下:java

package com.slowcity.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;

public class SentMailTask {
    private static final Logger log = LoggerFactory.getLogger(SentMailTask.class);
   /**
    * 定时任务
    */
    @Scheduled(cron = "0 0/1 * * * ? ")
    public void closeOrderTaskV1() {
        log.info(".........schedule task start.........");
        
        sentMailToCustomer();
        
        log.info(".........schedule task end.........");
    }
     
    public void sentMailToCustomer() {
        log.info(".........sent mail to customer.........");
    }
}
 

这样实现天然是没有什么问题,对于单台机器部署,任务每一分钟执行一次。部署多台机器时,同一个任务会执行屡次redis

在咱们的项目当中,使用定时任务是避免不了的,咱们在部署定时任务时,一般只部署一台机器,此时可用性又没法保证现实状况是独立的应用服务一般会部署在两台及以上机器的时候,假若有3台机器,则会出现同一时间3台机器都会触发的状况,结果就是会向客户发送三封如出一辙的邮件,真让人头疼。若是使用quarzt,就不存在这个状况了。spring

这种并发的问题,简单点说是锁的问题,具体点是分布式锁的问题,因此在这段代码上加个分布式锁就能够了。分布式锁,首先想到的是redis,毕竟轮子都是现成的。缓存

package com.slowcity.redis;

import java.util.Collections;
import redis.clients.jedis.Jedis;

public class RedisPool {
    private static final String LOCK_SUCCESS="OK";
    private static final String SET_IF_NOT_EXIST="NX";
    private static final String SET_WITH_EXPIRE_TIME="PX";
    private static final Long RELEASE_SUCCESS=1L;
    
    /**
     * 获取分布式锁
     * @param jedis
     * @param lockKey
     * @param requestID
     * @param expireTime
     * @return
     */
    public static boolean getDistributedLock(Jedis jedis,String lockKey,String requestId,int expireTime) {
        String result = jedis.set(lockKey,requestId,SET_IF_NOT_EXIST,SET_WITH_EXPIRE_TIME,expireTime);
        if(LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
        
    }
    /**
     * 释放分布式锁
     * @param jedis
     * @param lockKey
     * @param requestId
     * @return
     */
    public static boolean releaseDistributedLock(Jedis jedis,String lockKey,String requestId) {
        String script = "if redis.call('get',KEYS[1])== ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
        Object result = jedis.eval(script,Collections.singletonList(lockKey),Collections.singletonList(requestId));
        if(RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }
}

改造一下定时任务,增长分布式锁springboot

package com.slowcity.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;

import redis.clients.jedis.Jedis;

public class SentMailTask {
    private static final Logger log = LoggerFactory.getLogger(SentMailTask.class);
   /**
    * 定时任务
    */
    @Scheduled(cron = "0 0/1 * * * ? ")
    public void closeOrderTaskV1() {
        log.info(".........schedule task start.........");
        Jedis jedis = new Jedis("10.2.1.17",6379);
        boolean locked = RedisPool.getDistributedLock(jedis, "", "", 10*1000);
        if(locked) {
            sentMailToCustomer();
        }
        RedisPool.releaseDistributedLock(jedis, "", "");
        jedis.close();
        log.info(".........schedule task end.........");
    }
     
    public void sentMailToCustomer() {
        log.info(".........sent mail to customer.........");
    }
}
 

再执行定时任务,多台机器部署,只执行一次。服务器

关于jedis对象的获取,通常都是springboot自动化配置的,全部会想到工厂方法。优化以下:并发

package com.slowcity.redis;

import java.lang.reflect.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.ReflectionUtils;
import redis.clients.jedis.Jedis;

public class SentMailTask {
    private static final Logger log = LoggerFactory.getLogger(SentMailTask.class);
   
    @Autowired
    private RedisConnectionFactory redisConectionFactory;
    
    /**
    * 定时任务
    */
    @Scheduled(cron = "0 0/1 * * * ? ")
    public void closeOrderTaskV1() {
        log.info(".........schedule task start.........");
        
        RedisConnection redisConnection = redisConectionFactory.getConnection();
        Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis");
        Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, redisConnection);
       
        boolean locked = RedisPool.getDistributedLock(jedis, "lockKey", "requestId", 10*1000);
        if(locked) {
            sentMailToCustomer();
        }
        RedisPool.releaseDistributedLock(jedis, "", "");
        jedis.close();
        log.info(".........schedule task end.........");
    }
     
    public void sentMailToCustomer() {
        log.info(".........sent mail to customer.........");
    }
}
  

不再用担忧,应用服务多台机器部署,每台机器都触发的尴尬了。若是定时任务不少,最好的仍是老老实实写个任务调度中心,一来方便管理,二来方便维护。框架

补充部分:分布式

 一些关于lua脚本的解释

String script = "if redis.call('get',KEYS[1])== ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Object result = jedis.eval(script,Collections.singletonList(lockKey),Collections.singletonList(requestId));        

若是一个请求更新缓存的时间比较长,甚至比锁的有效期还要长,致使在缓存更新的过程当中,锁就失效了,此时另外一个请求就会获取锁,但前一个请求在缓存更新完毕的时候,若是不加以判断就直接删除锁,就会出现误删除其它请求建立的锁的状况。

【end】

 

一点补充的话,写完这篇博客后来看其余博客,也有一种redis锁是关联主机ip的,思路上是可行的,不失一个方法点,主要描述以下:

每一个定时任务都在Redis中设置一个Key-Value,Key为自定义的每一个定时任务的名字(如task1:redis:lock),Value为服务器Ip,同时设置合适的过时时间(例如设置为5min)。

每一个节点在执行时,都要进行如下操做:

  • 1.是否存在Key,若不存在,则设置Key-Value,Value为当前节点的IP
  • 2.若存在Key,则比较Value是不是当前Ip,如果则继续执行定时任务,若不是,则不往下执行。
相关文章
相关标签/搜索