LTS任务调度使用

LTS(light-task-scheduler)主要用于解决分布式任务调度问题,支持实时任务,定时任务和Cron任务。有较好的伸缩性,扩展性,健壮稳定性而被多家公司使用。java

项目主页 https://github.com/ltsopensource/light-task-schedulernode

原来项目使用Quartz做为定时器解决方案,可是Quartz没有可视化的任务运行时调度和监控(有数据库,可是须要本身开发界面),每次想要修改定时器配置都比较麻烦,因此引入了LTSgit

该框架主要有四个节点:github

  • JobClient:主要负责提交任务, 并接收任务执行反馈结果。
  • JobTracker:负责接收并分配任务,任务调度。
  • TaskTracker:负责执行任务,执行完反馈给JobTracker。
  • Monitor:(管理后台)主要负责节点管理,任务队列管理,监控管理等。

因为目前系统所须要的任务都是固定任务,所已其中JobClient所有在页面中进行提交,因此不部署
Monitor使用官方提供页面(war),直接部署到tomcat,默认用户名密码均为adminspring

JobTracker部署

clone项目的源码,运行根目录下的sh build.sh或build.cmd脚本,会在dist目录下生成lts-{version}-bin文件夹。
或者直接解压dist下面原有的lts-1.6.8-beta1-bin.zip,也能实现一样效果,本人使用了第二种方法。数据库

修改生成的或解压的文件夹中conf/zoo文件夹下的配置文件,修改成实际使用ZooKeeper和MySQL的配置apache

在生成的或解压的文件夹中执行 sh jobtracker.sh zoo start 便可启动JobTrackertomcat

TaskTracker使用

tasktracker须要在业务中实现代码,简单说是本身编写一个任务执行类,实现JobRunner接口,在run方法中实现本身的逻辑便可。markdown

贴一个官方提供的TaskTracker例子app

public class MyJobRunner implements JobRunner {
    @Override
    public Result run(JobContext jobContext) throws Throwable {
        try {
            // TODO 业务逻辑
            // 会发送到 LTS (JobTracker上)
            jobContext.getBizLogger().info("测试,业务日志啊啊啊啊啊");

        } catch (Exception e) {
            return new Result(Action.EXECUTE_FAILED, e.getMessage());
        }
        return new Result(Action.EXECUTE_SUCCESS, "执行成功了,哈哈");
    }
}

因为原来的项目使用的了Spring,因此直接写配置文件便可暴露节点,一样贴上官方例子:

<bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
    <property name="jobRunnerClass" value="com.github.ltsopensource.example.support.MyJobRunner"/>
    <property name="bizLoggerLevel" value="INFO"/>
    <property name="clusterName" value="test_cluster"/>
    <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
    <property name="nodeGroup" value="test_trade_TaskTracker"/>
    <property name="workThreads" value="20"/>
    <property name="configs">
        <props>
            <prop key="job.fail.store">leveldb</prop>
        </props>
    </property>
</bean>

启动项目,节点就暴露成功了。

一个TaskTracker执行多种任务

通常一个系统中每每不止一个任务,须要使用LTS对多个任务进行调度,一开始本人的想法是在一个项目中启动多个任务节点来接受任务调度。后来发现一个JVM中使用多个TaskTracker实例比较浪费资源,在正式项目运行过程当中出现了文件句柄过多致使任务大量失败的状况,因此已经弃用。参考了LTS做者提供的文档之后,对任务进行了简单封装,以知足这种需求。

  1. 建立一个集中的任务调度器,该bean在启动时将IOC容器中全部JobRunner取出放入一个MAP中,当该JOB被LTS调用时根据参数判断具体调度那个任务
* @author ElinZhou
 * @version $Id: JobRunnerDispatcher.java , v 0.1 2016/6/24 16:39 ElinZhou Exp $
 */
public class JobRunnerDispatcher implements JobRunner, ApplicationContextAware {

    private static final ConcurrentHashMap<String/*type*/, JobRunner> JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();

    @Override
    public Result run(JobContext jobContext) throws Throwable {

        //根据type选择对应的JobRunner运行
        Job job = jobContext.getJob();
        String type = job.getParam("type");
        return JOB_RUNNER_MAP.get(type.toUpperCase()).run(jobContext);

    }

    /** * 从IOC容器中将JobRunner类型的bean放入map中 * @param applicationContext * @throws BeansException */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, JobRunner> map = applicationContext.getBeansOfType(JobRunner.class);
        for (String type : map.keySet()) {
            JOB_RUNNER_MAP.put(type.toUpperCase(), map.get(type));
        }
    }
}
  1. 将本身的实现了JobRunner的任务如上文的MyJobRunner 类注入IOC容器中,例如直接在任务类前面加入@Component注解
  2. 在提交任务时在参数中指定具体要调度的任务,如:{“type”:”MyJobRunner”}
  3. 暴露集中任务调度器JobRunnerDispatcher
<bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
        <property name="jobRunnerClass" value="com.elin4it.util.taskSchedule.JobRunnerDispatcher"/>
        <property name="bizLoggerLevel" value="INFO"/>
        <!--集群名,在auto-config配置并从配置中心获取-->
        <property name="clusterName" value="${task.clusterName}"/>
        <!--zookeeper地址,在auto-config配置并从配置中心获取-->
        <property name="registryAddress" value="${zookeeper.url}"/>
        <!--节点组名称,在部署集群时同一个系统须要统一,在auto-config配置-->
        <property name="nodeGroup" value="${system.name}-DISPATCHER"/>
        <!--工做线程数,根据本身任务定义,要求不小于本系统最大任务数-->
        <property name="workThreads" value="10"/>
        <property name="configs">
            <props>
                <prop key="job.fail.store">leveldb</prop>
            </props>
        </property>
    </bean>

因为当初写这篇文章的时候对LTS的原理认识不足以及能力尚浅,因此后面的内容的代码有严重问题,请各位看官不要继续使用

多个任务节点暴露

LTS原生仅支持一个nodegroup就要写一套配置,如上所示的XML配置文件,这样写很啰嗦,因此我对他进行了一次封装,先贴上代码:

1.精简后的任务配置Bean

import java.util.Properties;

import com.github.ltsopensource.tasktracker.runner.JobRunner;

/** * @author ElinZhou * @version $Id: TaskTrackerCustom.java , v 0.1 2016/5/31 17:19 ElinZhou Exp $ */
public class TaskTrackerCustom {
    /** * 任务运行类 */
    private Class<? extends JobRunner> jobRunnerClass;

    /** * 节点组(默认为系统名称-运行类名) */
    private String                     nodeGroup;

    /** * 线程数(默认为1) */
    private int                        workThreads;
    /** * 额外参数配置 */
    private Properties                 configs = new Properties();

    public Class<? extends JobRunner> getJobRunnerClass() {
        return jobRunnerClass;
    }

    public void setJobRunnerClass(Class<? extends JobRunner> jobRunnerClass) {
        this.jobRunnerClass = jobRunnerClass;
    }

    public String getNodeGroup() {
        return nodeGroup;
    }

    public void setNodeGroup(String nodeGroup) {
        this.nodeGroup = nodeGroup;
    }

    public int getWorkThreads() {
        return workThreads;
    }

    public void setWorkThreads(int workThreads) {
        this.workThreads = workThreads;
    }

    public Properties getConfigs() {
        return configs;
    }

    public void setConfigs(Properties configs) {
        this.configs = configs;
    }
}

2.异常类

/** * 任务执行器异常 * * @author ElinZhou * @version $Id: TaskTrackerException.java , v 0.1 2016/5/31 17:33 ElinZhou Exp $ */
public class TaskTrackerException extends RuntimeException {

    public TaskTrackerException(String detail) {
        super(detail);
    }
}

三、本身封装工厂Bean

import java.util.*;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;

import com.github.ltsopensource.spring.tasktracker.JobDispatcher;
import com.github.ltsopensource.tasktracker.TaskTracker;
import com.github.ltsopensource.tasktracker.runner.JobRunner;
import com.github.ltsopensource.tasktracker.runner.RunnerFactory;


/** * 任务执行器XML配置工厂 * * @author ElinZhou * @version $Id: TaskTrackerXmlFactoryBean.java , v 0.1 2016/5/31 17:18 ElinZhou Exp $ */
public class TaskTrackerXmlFactoryBean implements InitializingBean, ApplicationContextAware {

    /** * 任务执行器 */
    private List<TaskTrackerCustom> taskTrackerCustoms = new ArrayList<TaskTrackerCustom>();

    /** * 任务执行器 */
    private List<String>            jobRunnersName     = new ArrayList<String>();

    /** * 注册中心地址 */
    private String                  registryAddress;

    /** * 集群名称 */
    private String                  clusterName;

    /** * 系统名称 */
    private String                  systemName;

    private ApplicationContext      applicationContext;

    @Override
    public void afterPropertiesSet() throws Exception {
        if (StringUtils.isBlank(registryAddress)) {
            throw new TaskTrackerException("注册中心地址不能为空");
        }
        if (StringUtils.isBlank(clusterName)) {
            throw new TaskTrackerException("集群名称不能为空");
        }
        if (StringUtils.isBlank(systemName)) {

            throw new TaskTrackerException("本系统名称不能为空");

        }
    }

    public void start() {

        //若是直接配置了JobRunner,则转换后加入taskTrackerCustoms
        TaskTrackerCustom custom;
        for (String jobRunnerName : jobRunnersName) {
            Class clazz;
            try {
                clazz = Class.forName(jobRunnerName);
            } catch (Exception e) {
                throw new TaskTrackerException(jobRunnerName + " 不存在");
            }
            //判断该类是否实现了JobRunner
            if (!new HashSet<Class>(Arrays.asList(clazz.getInterfaces())).contains(JobRunner.class)) {
                throw new TaskTrackerException(jobRunnerName + " 没有实现JobRunner接口");
            }
            custom = new TaskTrackerCustom();
            custom.setNodeGroup(clazz.getSimpleName());
            custom.setJobRunnerClass(clazz);
            custom.setWorkThreads(1);
            taskTrackerCustoms.add(custom);
        }

        //将自定义任务执行器转换为框架任务执行器
        TaskTracker taskTracker;
        for (TaskTrackerCustom taskTrackerCustom : taskTrackerCustoms) {
            taskTracker = new TaskTracker();
            // 任务执行类,实现JobRunner 接口
            if (taskTrackerCustom.getJobRunnerClass() == null) {
                throw new TaskTrackerException("任务执行类不能为空");
            }
            final String beanName = registerRunnerBeanDefinition(taskTrackerCustom
                .getJobRunnerClass());
            taskTracker.setRunnerFactory(new RunnerFactory() {
                @Override
                public JobRunner newRunner() {
                    return (JobRunner) applicationContext.getBean(beanName);
                }
            });
            taskTracker.setRegistryAddress(registryAddress);
            if (StringUtils.isBlank(taskTrackerCustom.getNodeGroup())) {
                taskTrackerCustom.setNodeGroup(taskTrackerCustom.getJobRunnerClass()
                    .getSimpleName());
            }
            taskTracker.setNodeGroup(systemName + "-" + taskTrackerCustom.getNodeGroup()); // 同一个TaskTracker集群这个名字相同
            taskTracker.setClusterName(clusterName);
            if (taskTrackerCustom.getWorkThreads() == 0) {
                taskTracker.setWorkThreads(1);
            } else {
                taskTracker.setWorkThreads(taskTrackerCustom.getWorkThreads());
            }

            Iterator<Map.Entry<Object, Object>> it = taskTrackerCustom.getConfigs().entrySet()
                .iterator();
            while (it.hasNext()) {
                Map.Entry<Object, Object> entry = it.next();
                Object key = entry.getKey();
                Object value = entry.getValue();
                taskTracker.addConfig((String) key, (String) value);
            }
            taskTracker.start();
        }
    }

    /** * 将 JobRunner 生成Bean放入spring容器中管理 * 采用原型 scope, 因此能够在JobRunner中使用@Autowired */
    private String registerRunnerBeanDefinition(Class jobRunnerClass) {
        DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) ((ConfigurableApplicationContext) applicationContext)
            .getBeanFactory();
        String jobRunnerBeanName = "LTS_".concat(jobRunnerClass.getSimpleName());
        if (!beanFactory.containsBean(jobRunnerBeanName)) {
            BeanDefinitionBuilder builder = BeanDefinitionBuilder
                .rootBeanDefinition(jobRunnerClass);
            if (jobRunnerClass == JobDispatcher.class) {
                builder.setScope(BeanDefinition.SCOPE_SINGLETON);
                builder.setLazyInit(false);
                builder.getBeanDefinition().getPropertyValues()
                    .addPropertyValue("shardField", null);
            } else {
                builder.setScope(BeanDefinition.SCOPE_PROTOTYPE);
            }
            beanFactory.registerBeanDefinition(jobRunnerBeanName, builder.getBeanDefinition());
        }
        return jobRunnerBeanName;
    }

    public List<TaskTrackerCustom> getTaskTrackerCustoms() {
        return taskTrackerCustoms;
    }

    public void setTaskTrackerCustoms(List<TaskTrackerCustom> taskTrackerCustoms) {
        this.taskTrackerCustoms = taskTrackerCustoms;
    }

    public List<String> getJobRunnersName() {
        return jobRunnersName;
    }

    public void setJobRunnersName(List<String> jobRunnersName) {
        this.jobRunnersName = jobRunnersName;
    }

    public String getRegistryAddress() {
        return registryAddress;
    }

    public void setRegistryAddress(String registryAddress) {
        this.registryAddress = registryAddress;
    }

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public String getSystemName() {
        return systemName;
    }

    public void setSystemName(String systemName) {
        this.systemName = systemName;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

以上代码实现了只须要配置一次registryAddress、clusterName等信息,但能开启多个任务节点。Spring配置时有两种方式,一个是将JobRunner实现类全类名组成的List注入jobRunnersName字段中,其中nodegroup将使用‘系统名称’-‘类型’,线程数设置为1;另外一种方式建立TaskTrackerCustom ,在其中设置该任务的一些配置,而后将TaskTrackerCustom 组成的List注入到taskTrackerCustoms字段中。
两种方式能够同时使用,所配置的任务将都会启动。

Spring配置代码:

<bean id="taskTrackerXmlFactoryBean" class="com.elin4it.common.util.taskSchedule.TaskTrackerXmlFactoryBean" init-method="start">
        <!--ZooKeeper地址-->
        <property name="registryAddress" value="zookeeper://cc.yumei.cn:2181"/>
        <!--集群名称-->
        <property name="clusterName" value="elin_cluster"/>
        <!--经过任务执行类名建立任务,其他配置均使用默认值-->
        <property name="jobRunnersName">
            <list>
                <value>com.elin4it.biz.daemon.task.withdraw.SingleWithdrawSenderRunner</value>
            </list>
        </property>

        <!--经过自定义封装的配置类建立任务,可自定义配置-->
        <property name="taskTrackerCustoms">
            <list>
                <ref bean="taskTrackerCustom"/>
            </list>
        </property>
    </bean>


    <bean id="taskTrackerCustom" class="com.elin4it.common.util.taskSchedule.TaskTrackerCustom">
        <property name="jobRunnerClass" value="com.elin4it.biz.daemon.task.DepositQueryRunner"/>
        <property name="nodeGroup" value="DepositQueryRunner"/>
        <property name="workThreads" value="1"/>
    </bean>

接下来在界面上配置任务就能够运行任务了。

相关文章
相关标签/搜索