多线程访问限流接口: html
/** * 多线程同步代码 * */ protected Integer syncXXX(List<String> erps, Date startTime, Date endTime) throws Exception { int total = 0; if(CollectionUtils.isEmpty(erps)) { return total; } //TPS(Transaction Per Second) 每秒钟系统可以处理的交易或事务的数量 int tps = SystemConfigs.getInt(SysConfigKeys.XXX_TPS); int tSize = (int) Math.ceil(tps / 5.0);// 预计一个线程每秒5次请求,计算线程数,向上取整 int slen = (int) Math.ceil(erps.size() / (tSize + 0.0));//单个线程计算的erp个数,向上取整 int stps = (int) Math.floor(tps / (tSize + 0.0)); // 预计单个线程的访问频率,向下取整 ExecutorService executorService = Executors.newFixedThreadPool(tSize);//创建ExecutorService线程池 List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); int sindex = 0; while(sindex < erps.size()) {//将erps分配给每一个线程 int limit = sindex + slen; List<String> serps = erps.subList(sindex, limit < erps.size() ? limit : erps.size());//单个线程计算的ERP list Future<Integer> future = executorService.submit(new XXXeSyncTask(serps, startTime, endTime, stps)); futureList.add(future); sindex = limit; } for(Future<Integer> future : futureList) { total += future.get(); } executorService.shutdown(); return total; } class XXXSyncTask implements Callable<Integer> { // erp子级,代码行汇总 private List<String> erps; private Date start; private Date end; // 容许最大访问频率 private int tps; public XXXSyncTask(List<String> erps, Date start, Date end, int tps) { this.erps = erps; this.start = start; this.end = end; this.tps = tps; } @Override public Integer call() throws Exception { long startMillis = System.currentTimeMillis(); long duration = 0; int count = 0; List<XXX> xxxList = new ArrayList<XXX>(); for (int i = 0; i < erps.size(); i++) { String erp = erps.get(i); while (start.before(end)) { // 预计耗时(ms) long predict = i * 1000 / tps; if (predict > duration) { // 实际耗时小于预计耗时,sleep long waitting = predict - duration; Thread.sleep(waitting); LOG.debug("代码行同步休眠{}ms. ", waitting); } duration = System.currentTimeMillis() - startMillis; List<XXX> tempList = queryData(erp, start); xxxList.addAll(tempList); if (xxxList.size() > 100) { xxxDao.batch(xxxList, SQL_OPTION.INSERT); count += xxxList.size(); xxxList.clear(); } start = DateUtils.addDay(start, 1); } } if (!xxxList.isEmpty()) { xxxDao.batch(codeColumnList, SQL_OPTION.INSERT); count += codeColumnList.size(); xxxList.clear(); } return count; } }
相关文章: java
http://www.oschina.net/translate/java-util-concurrent-future-basics 多线程
http://www.cnblogs.com/whgw/archive/2011/09/28/2194760.html ide