需求:做一个定时扫描的任务,每隔一段时间去扫描某个路径下的xml文件。
分析:项目启动时,就应该启动定时扫描任务,在这里用到了spring集成的Quartz定时器。
我这里采取继承QuartzJobBean的方式,任务类如下:
#1. XmlScanImportTask.java
package com.scan; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.scheduling.quartz.QuartzJobBean; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.List; /** * Created by Administrator on 2017/3/9 0009. */ public class XmlScanImportTask extends QuartzJobBean{ private boolean runningFlag=false; @Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { //指定须要执行的任务 System.out.println("-------系统-------->当前时间:"+ new Date()); if(runningFlag){ LogUtil.info("扫描xml任务正在执行中..."); return; } runningFlag = true; try { //这里实现本身的业务 }catch (Exception e){ e.printStackTrace(); LogUtil.error("扫描入库出现异常"); }finally { runningFlag = false; LogUtil.info("扫描任务执行完毕"); } } }
#2. xml配置如下
scan.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Quartz wiring for the XML scan job: job detail -> cron trigger -> scheduler.
  Schema locations are version-less so they resolve against whichever Spring
  version is on the classpath (the original mixed 3.2 and 4.0 XSDs and listed
  spring-context twice).
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
           http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- Job bean: tells Quartz which job class to instantiate. -->
    <bean id="jobDetailFactoryBean" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
        <property name="jobClass" value="com.scan.XmlScanImportTask"/>
    </bean>

    <!-- Trigger bean: defines the schedule for the job. -->
    <bean id="cronTriggerFactoryBean" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
        <property name="jobDetail" ref="jobDetailFactoryBean"/>
        <!--
          Fire once a minute, at second 0. The original "0/60" used an increment
          of 60, which is outside the 0-59 range of the seconds field and is
          rejected by some Quartz versions.
        -->
        <property name="cronExpression" value="0 * * * * ?"/>
    </bean>

    <!-- Scheduler factory bean: registers the trigger and starts Quartz. -->
    <bean id="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <list>
                <ref bean="cronTriggerFactoryBean"/>
            </list>
        </property>
    </bean>
</beans>
#3. 线程池管理工具类如下
package com.scan; import com.zving.framework.utility.LogUtil; import java.util.Collections; import java.util.LinkedList; import java.util.List; /** * Created by Administrator on 2017/3/10 0010. */ public class ThreadPoolManager { //消息任务ID private static long _next_task_id = 0; //默认初始化线程数 public static final int DEFAULT_THREAD_NUM = 20; //默认最大并发线程数 public static final int MAX_THREAD_NUM = 20; //当前使用最大并发线程数 public int _cur_thread_num = 0; //线程池状态 public boolean _is_closed = true; //线程任务列表 public List<ThreadTask> taskQueue = Collections.synchronizedList(new LinkedList<ThreadTask>()); //线程数组集合 public WorkThread[] threads; //线程对象 private static ThreadPoolManager _instance = null; //默认构造 private ThreadPoolManager(){ _cur_thread_num = DEFAULT_THREAD_NUM; threads = new WorkThread[_cur_thread_num]; for(int i = 0; i < _cur_thread_num; ++ i){ threads[i] = new WorkThread(i); } } //自定义线程数构造 public ThreadPoolManager(int thread_num){ _cur_thread_num = thread_num; threads = new WorkThread[_cur_thread_num]; for(int i = 0; i < _cur_thread_num; ++ i){ threads[i] = new WorkThread(i); } } /* singleton */ public static ThreadPoolManager getInstance(){ if(_instance == null){ synchronized(ThreadPoolManager.class){ if(_instance == null){ _instance = new ThreadPoolManager(); } } } return _instance; } public static synchronized long generateTaskId(){ // _next_task_id += (_next_task_id + 1) / 1000000; // if(_next_task_id == 0) _next_task_id ++; return _next_task_id++; } //开启执行 public synchronized void start(){ if(!_is_closed){ LogUtil.info("线程池已经被初始化过..."); return ; } _is_closed = false; // LogUtil.info(String.format("ThreadPool Initializing----")); for(int i = 0; i < _cur_thread_num; ++ i){ threads[i].start(); // LogUtil.info(String.format("thread [%d] start!", i)); } //LogUtil.info(String.format("ThreadPool Initialized----,init "+_cur_thread_num+" thread")); } //关闭 public void close(){ if(!_is_closed){ waitforfinish(); _is_closed = true; taskQueue.clear(); } LogUtil.info("Thread pool 
close!"); } public void waitforfinish(){ synchronized(this){ _is_closed = true; notifyAll(); } for(int i = 0; i < _cur_thread_num; ++ i){ threads[i].stopThread(); LogUtil.info(String.format("Thread [%d] stop!", i)); } } //增长新任务 public void addTask(ThreadTask new_task){ synchronized(taskQueue){ if(_is_closed){ start(); } if(new_task != null){ taskQueue.add(new_task); taskQueue.notifyAll(); //taskQueue.notify(); } } } public int getTaskCount(){ return taskQueue.size(); } private class WorkThread extends Thread{ private int _index; private boolean _is_running = true; public WorkThread(int index){ _index = index; } public void run(){ while(_is_running){ ThreadTask t = getTask(); if(t != null){ t.run(); // System.out.println(_index); }else{ // 结束线程 LogUtil.info(String.format("thread [%d] exit", _index)); return; } } } public ThreadTask getTask(){ if(_is_closed) return null; ThreadTask r = null; synchronized (taskQueue) { while (taskQueue.isEmpty()) { try { /* 任务队列为空,则等待有新任务加入从而被唤醒 */ taskQueue.wait(); } catch (InterruptedException e) { LogUtil.error(e.getStackTrace()); } } /* 取出任务执行 */ r = (ThreadTask) taskQueue.remove(0); return r; } } public void stopThread(){ _is_running = false; try{ join(); }catch(InterruptedException ex){ LogUtil.error(ex.getStackTrace()); } } } }
#3. 任务线程如下
package com.scan; /** * Created by Administrator on 2017/3/10 0010. */ public abstract class ThreadTask implements Runnable{ public long taskId = 0; public ThreadTask() { } public ThreadTask(long taskId) { this.taskId = taskId; } public long getTaskId() { return taskId; } public void setTaskId(long taskId) { this.taskId = ThreadPoolManager.generateTaskId(); } }
#4. 实际业务任务线程如下
XmlScanImportThread.java
public class XmlScanImportThread extends ThreadTask{ String abfilePathTmp; Long siteIdTmp; public XmlScanImportThread(String abfilePathTmp, Long siteIdTmp) { this.abfilePathTmp = abfilePathTmp; this.siteIdTmp = siteIdTmp; } @Override public void run() { try { //实际业务 }catch (Exception e){ LogUtil.info("["+abfilePathTmp+"]解析失败"); } } }