使用多线程机制异步执行业务方法

    开发过程当中,常常会遇到一键操做这样的功能,当数据量较少或者业务逻辑单一的时候没什么问题,可是当遇到数据量较大,并且业务逻辑较为复杂的状况,就比较棘手了,一键执行后,仿佛整个世界都在跟着转圈圈,直到请求超时,更有甚者,服务器直接驾崩。java

    最近,在帮客户作微信会员资料更新操做时就遇到这样的状况,最开始是最简单的遍历执行,发现行不通;因而又换用分页批量执行的方法,结果发现换汤不换药,仍是请求超时;最后没办发,只能经过使用多线程机制,经过开启多线程,以增长系统开销来节省请求时间,终于把问题解决了。也许会有更好的方法,可是目前实现功能要紧。话很少说,关门,放代码。。。git

    对此,我写了两套方案,其一是经过实现Callable接口异步执行业务方法,最后返回执行结果;github

package com.web.demo.thread;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 异步执行业务方法,实现Callable接口,返回执行结果
 * 
 * @author jiangyf
 */
public class AsyncTask implements Callable<List<Map<String, Object>>> {
	// 执行任务名称
	private String taskName;
	// 执行任务时间
	private long taskTime;
	// 线程同步辅助类,在完成一组正在其余线程中执行的操做以前,它容许一个或多个线程一直等待
	private CountDownLatch latch;

	// 任务执行结果
	List<Map<String, Object>> resultList;
	private Map<String, Object> resultMap;

	public AsyncTask(String taskName, long taskTime, CountDownLatch latch) {
		super();
		this.taskName = taskName;
		this.taskTime = taskTime;
		this.latch = latch;
	}

	@Override
	public List<Map<String, Object>> call() throws Exception {
		resultList = new ArrayList<Map<String, Object>>();
		resultMap = new HashMap<String, Object>();
		// 任务开始时间
		long begin = System.currentTimeMillis();

		System.out.println(taskName + " 任务开始....");
		
		// 执行具体业务
		Thread.sleep(taskTime * 1000);
		
		System.out.println(taskName + " 任务结束....");

		// 任务结束时间
		long end = System.currentTimeMillis();
		taskTime = (end - begin) / 1000;
		resultMap.put("taskName", taskName);
		resultMap.put("taskTime", taskTime);
		resultList.add(resultMap);
		System.out.println(taskName + "任务用时:" + taskTime + "秒");
		if (latch != null) {
			// 任务完成,计数器减一
			latch.countDown();
		}
		return resultList;
	}

	public static void main(String[] args) throws InterruptedException,
			ExecutionException {
		// 任务开始时间
		long begin = System.currentTimeMillis();

		// 初始化计数器
		CountDownLatch latch = new CountDownLatch(2);
		// 初始化线程池
		ExecutorService executorService = Executors.newFixedThreadPool(2);
		// 初始化线程
		Future<List<Map<String, Object>>> future = executorService
				.submit(new AsyncTask("running", 2, latch));
		Future<List<Map<String, Object>>> future2 = executorService
				.submit(new AsyncTask("walking", 5, latch));
		executorService.shutdown();
		// 所有任务执行完成前,会一直阻塞当前线程,直到计时器的值为0
		latch.await();
		List<Map<String, Object>> result = future.get();
		List<Map<String, Object>> result2 = future2.get();
		result.addAll(result2);
		System.out.println(result.size());
		
		// 任务结束时间
		long end = System.currentTimeMillis();
		System.out.println("任务总用时:" + ((end - begin) / 1000) + "秒");
	}

}

其二是经过继承Thread类异步执行业务方法,最后不返回执行结果。web

package com.web.demo.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

/**
 * 异步执行业务方法,继承Thread类,不返回执行结果
 * 
 * @author jiangyf
 */
public class AsyncJob extends Thread {
	// 执行任务名称
	private String jobName;
	// 执行任务时间
	private long jobTime;
	// 线程同步辅助类,在完成一组正在其余线程中执行的操做以前,它容许一个或多个线程一直等待
	private CountDownLatch latch;

	public AsyncJob(String jobName, long jobTime, CountDownLatch latch) {
		super();
		this.jobName = jobName;
		this.jobTime = jobTime;
		this.latch = latch;
	}

	public void run() {
		// 任务开始时间
		long begin = System.currentTimeMillis();

		System.out.println(jobName + " 任务开始....");

		// 执行具体业务
		try {
			Thread.sleep(jobTime * 1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		System.out.println(jobName + " 任务结束....");

		// 任务结束时间
		long end = System.currentTimeMillis();
		jobTime = (end - begin) / 1000;
		System.out.println(jobName + "任务用时:" + jobTime + "秒");
		if (latch != null) {
			// 任务完成,计数器减一
			latch.countDown();
		}

	}

	public static void main(String[] args) throws InterruptedException,
			ExecutionException {
		// 任务开始时间
		long begin = System.currentTimeMillis();

		// 初始化计数器
		CountDownLatch latch = new CountDownLatch(2);
		// 初始化线程
		AsyncJob job = new AsyncJob("running", 5, latch);
		AsyncJob job2 = new AsyncJob("walking", 2, latch);
		job.start();
		job2.start();
		// 所有任务执行完成前,会一直阻塞当前线程,直到计时器的值为0
		latch.await();
		
		// 任务结束时间
		long end = System.currentTimeMillis();
		System.out.println("任务总用时:" + ((end - begin) / 1000) + "秒");
	}

}

如下为业务代码实现示例:服务器

// 会员资料更新失败的卡号
	private static StringBuffer cardNoStr = new StringBuffer();

	/**
	 * 同步微信会员信息
	 */
	public String syncVipInfo(String weixinId) {
		log.info("-------------同步微信会员信息开始");
		String msg = "";
		setWeixinInfo(weixinId);
		try {
			List<VipInfo> vipInfos = vipInfoDao.getByWeixinId(weixinId, null,
					null);
			int totalRows = vipInfos.size();
			msg = "须要同步的微信会员数:" + totalRows;
			log.info(msg);
			if (totalRows == 0) {
				return msg;
			}
			Map<String, String> map = PropertiesUtil
					.propertiesToMap("syncvipinfo.properties");
			if (map.get("max_num") == null || map.get("max_thread") == null) {
				msg = "同步微信会员信息配置文件错误";
				log.info(msg);
				return msg;
			}
			int offset = 0;
			int rows = Integer.parseInt(map.get("max_num"));
			int threadNum = Integer.parseInt(map.get("max_thread"));
			int count = totalRows / rows;
			if ((totalRows % rows) > 0) {
				count += 1;
			}
			if (count > threadNum) {
				if ((totalRows % threadNum) > 0) {
					count = threadNum - 1;
					rows = totalRows / count;
					if ((totalRows % count) > 0) {
						count += 1;
					}
				} else {
					count = threadNum;
					rows = totalRows / count;
				}
			}

			log.info("须要开启线程数量:" + count);
			// 任务开始时间
			long begin = System.currentTimeMillis();

			// 初始化计数器
			CountDownLatch latch = new CountDownLatch(count);
			
			// 初始化线程池
			ExecutorService executorService = Executors
					.newFixedThreadPool(count);
			// 初始化线程
			for (int i = 0; i < count; i++) {
				Future<String> future = executorService.submit(new AsyncTask(
						latch, vipInfos, offset, rows));
				offset += rows;
			}
			executorService.shutdown();
			log.info("线程池是否已关闭:" + executorService.isShutdown());
			
			// 初始化线程
			/*
			for (int i = 0; i < count; i++) {
				AsyncJob job = new AsyncJob(latch, vipInfos, offset, rows);
				job.start();
				offset += rows;
			}
			*/
			if (count > 0) {
				// 所有任务执行完成前,会一直阻塞当前线程,直到计时器的值为0
				latch.await();
			}

			if (!"".equals(cardNoStr.toString())) {
				msg = "同步微信会员信息失败的会员卡号有:" + cardNoStr.toString().substring(0, cardNoStr.lastIndexOf(","));
			} else {
				msg = "同步微信会员信息成功";
			}
			// 任务结束时间
			long end = System.currentTimeMillis();
			log.info("同步微信会员信息任务用时:" + ((end - begin) / 1000) + "秒");
		} catch (Exception e) {
			msg = "同步微信会员信息出现异常";
			log.error(msg + e.getMessage());
		}
		log.info("执行结果-------" + msg + "-------");
		log.info("-------------同步微信会员信息结束");
		return msg;
	}

	/**
	 * 异步执行业务方法,实现Callable接口,返回执行结果
	 * 
	 * @author jiangyf
	 */
	static class AsyncTask implements Callable<String> {
		private Logger log = LoggerFactory.getLogger(WXWebService.class);
		// 线程同步辅助类,在完成一组正在其余线程中执行的操做以前,它容许一个或多个线程一直等待
		private CountDownLatch latch;
		private List<VipInfo> vipInfos;
		private int offset;
		private int rows;

		public AsyncTask(CountDownLatch latch, List<VipInfo> vipInfos,
				int offset, int rows) {
			super();
			this.latch = latch;
			this.vipInfos = vipInfos;
			this.offset = offset;
			this.rows = rows;
		}

		public String call() {
			// 任务开始时间
			long begin = System.currentTimeMillis();

			log.info("当前线程更新会员数量:------" + vipInfos.size());

			// 执行具体业务
			for (int i = 0; i < rows; i++) {
				int num = offset + i;
				if (num >= vipInfos.size()) {
					break;
				}
				VipInfo vipInfo = vipInfos.get(num);
				if (vipInfo != null) {
					String cardNo = vipInfo.getCardNo();
					WeixinUserInfo userInfo = getWeixinUserInfo(vipInfo
							.getOpenId());
					try {
						updateVipInfo(vipInfo, userInfo);
						log.info("会员卡号为" + cardNo + "的会员资料更新成功");
					} catch (SQLException e) {
						cardNoStr.append(cardNo + ",");
						log.info("会员卡号为" + cardNo + "的会员资料更新失败");
					}
				}
			}

			// 任务结束时间
			long end = System.currentTimeMillis();
			log.info("当前线程执行任务用时:" + ((end - begin) / 1000) + "秒");
			if (latch != null) {
				latch.countDown();// 任务完成,计数器减一
			}
			return cardNoStr.toString();
		}

	}

	static class AsyncJob extends Thread {
		private Logger log = LoggerFactory.getLogger(WXWebService.class);
		// 线程同步辅助类,在完成一组正在其余线程中执行的操做以前,它容许一个或多个线程一直等待
		private CountDownLatch latch;
		private List<VipInfo> vipInfos;
		private int offset;
		private int rows;

		public AsyncJob(CountDownLatch latch, List<VipInfo> vipInfos,
				int offset, int rows) {
			super();
			this.latch = latch;
			this.vipInfos = vipInfos;
			this.offset = offset;
			this.rows = rows;
		}

		public void run() {
			// 任务开始时间
			long begin = System.currentTimeMillis();

			log.info("当前线程更新会员数量:------" + vipInfos.size());

			// 执行具体业务
			for (int i = 0; i < rows; i++) {
				int num = offset + i;
				if (num >= vipInfos.size()) {
					break;
				}
				VipInfo vipInfo = vipInfos.get(num);
				if (vipInfo != null) {
					String cardNo = vipInfo.getCardNo();
					WeixinUserInfo userInfo = getWeixinUserInfo(vipInfo
							.getOpenId());
					try {
						updateVipInfo(vipInfo, userInfo);
						log.info("会员卡号为" + cardNo + "的会员资料更新成功");
					} catch (SQLException e) {
						cardNoStr.append(cardNo + ",");
						log.info("会员卡号为" + cardNo + "的会员资料更新失败");
					}
				}
			}

			// 任务结束时间
			long end = System.currentTimeMillis();
			log.info("当前线程执行任务用时:" + ((end - begin) / 1000) + "秒");
			if (latch != null) {
				latch.countDown();// 任务完成,计数器减一
			}

		}
	}

 

代码地址:https://github.com/github-jade/myweb/tree/develop/src/main/java/com/web/demo/thread微信

相关文章
相关标签/搜索