(1)总论java
1.能够不用多线程最好不要用算法
2.若是能够不共享数据最好不要共享spring
3.服务器端最佳线程数量=((线程等待时间+线程cpu时间)/线程cpu时间) * cpu数量数据库
由于数据库访问等待形成线程等待时间长比较长见,下面的例子就是以数据库数据迁徙程序说明。编程
经常使用模式服务器
(2)分几个线程处理不一样数据多线程
适用场景:数据能够容易的分开处理ide
1 package me.jdk.thread; 2 3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 import org.slf4j.Logger; 8 import org.slf4j.LoggerFactory; 9 10 /** 11 * 多线程不一样数据 12 * 适用场景:数据能够容易的分开处理 13 * @author guanpanpan 14 * 15 */ 16 public class MulThrDiffData { 17 protected final static Logger log = LoggerFactory.getLogger(MulThrDiffData.class); 18 private static CountDownLatch latch; 19 20 public static void main(String[] args) { 21 int dbMax = 10; 22 int tableMax = 16; 23 //设置同时处理的线程数 24 ExecutorService executorService = Executors.newFixedThreadPool(40); 25 //设置等待计数器 26 latch = new CountDownLatch(dbMax * tableMax); 27 // 启动线程 28 for (int tableNo = 1; tableNo <= tableMax; tableNo++) { 29 for (int dbNo = 1; dbNo <= dbMax; dbNo++) { 30 executorService.execute(new UserinfoRunable(dbNo, tableNo)); 31 } 32 } 33 //再也不接受新线程 34 executorService.shutdown(); 35 //等待程序执行完 36 try { 37 latch.await(); 38 } catch (InterruptedException e1) { 39 log.error("threadLatch.await()", e1); 40 } 41 log.info("main End"); 42 } 43 44 /** 45 * 处理用户数据的线程 46 */ 47 static class UserinfoRunable implements Runnable { 48 int dbNo; 49 int tableNo; 50 51 public UserinfoRunable(int dbNo, int tableNo) { 52 this.dbNo = dbNo; 53 this.tableNo = tableNo; 54 } 55 56 @Override 57 public void run() { 58 System.out.println("do something" + dbNo + "-" + tableNo); 59 try { 60 Thread.sleep(1000); 61 } catch (InterruptedException e) { 62 // TODO Auto-generated catch block 63 e.printStackTrace(); 64 } 65 //计数器减去一 66 latch.countDown(); 67 } 68 } 69 }
(3)线程协做来处理同一批数据memcached
适用场景:数据处理是一个相似生产线状况,每一个生产过程费时不一样单元测试
1 package me.jdk.thread; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.concurrent.BlockingQueue; 6 import java.util.concurrent.CountDownLatch; 7 import java.util.concurrent.ExecutorService; 8 import java.util.concurrent.Executors; 9 import java.util.concurrent.LinkedBlockingQueue; 10 11 import me.util.BlockingQueueUtil; 12 import me.util.DateUtil; 13 14 import org.slf4j.Logger; 15 import org.slf4j.LoggerFactory; 16 import org.springframework.util.CollectionUtils; 17 18 /** 19 * 线程协做来处理同一批数据 20 * 本示例是针对只跑一次的job,若是须要一直在跑的woker,只须要作少量改动 21 * @author guanpanpan 22 * 23 */ 24 public class ProductConsumeTh { 25 protected final static Logger log = LoggerFactory.getLogger(ProductConsumeTh.class); 26 public static int productThSize = 2;//生产者线程数 27 public static int consumeThSize = 5;//消费者线程数 28 public static int maxDealSize = 10;//单个消费者线程每次最多处理数量 29 public static int maxQueueSize = 1000;//生产者队列最多数量 30 //用String做为示例,实际使用换成实际类型 31 public static BlockingQueue<String> consumeQueue = new LinkedBlockingQueue<String>(maxQueueSize); 32 private static CountDownLatch productLatch;//生产者拦截计数 33 private static CountDownLatch consumeLatch;//消费者拦截计数 34 35 private static boolean productRun;//生产者是否在生产 36 37 public static void main(String[] args) { 38 productRun = true; 39 ExecutorService executorService = Executors.newFixedThreadPool(consumeThSize + productThSize); 40 consumeLatch = new CountDownLatch(consumeThSize); 41 productLatch = new CountDownLatch(productThSize); 42 //开启读线程 43 for (int thNo = 1; thNo <= productThSize; thNo++) { 44 executorService.execute(new ProductRunable(thNo)); 45 } 46 DateUtil.sleepForOneSecond();//先读一会 47 //开启写线程 48 for (int thNo = 1; thNo <= consumeThSize; thNo++) { 49 executorService.execute(new ConsumeRunable(thNo)); 50 } 51 executorService.shutdown(); 52 // 等待写线程完 53 try { 54 productLatch.await(); 55 } catch (InterruptedException e) { 56 log.error("error in getLatch", e); 57 } 58 productRun = false; 59 // 等待写线程完 60 try { 61 consumeLatch.await(); 62 } catch (InterruptedException e) { 63 log.error("error in writeLatch", e); 64 } 65 System.out.println("main End"); 66 } 67 68 static class ProductRunable implements Runnable { 69 private int thNo; 70 71 public ProductRunable(int thNo) { 72 this.thNo = thNo; 73 74 } 75 76 @Override 77 public void run() { 78 for (int i = 0; i < 10; i++) { 79 //取数据,由于是示例因此直接内存构建 80 List<String> list = new ArrayList<String>(); 81 list.add("th" + thNo + " a" + i); 82 //加入队列 83 try { 84 BlockingQueueUtil.put(consumeQueue, list); 85 } catch (InterruptedException e) { 86 log.error("QueueUtil.put:", e); 87 } 88 //实际中能够若是取不到数据就休息一会,或者退出本线程,视程序是一直在跑的work,仍是只跑一次的job 89 System.out.println("geter " + thNo + " put" + list); 90 DateUtil.sleepForOneSecond(); 91 } 92 System.out.println("geter " + thNo + " end"); 93 productLatch.countDown(); 94 } 95 96 } 97 98 static class ConsumeRunable implements Runnable { 99 private int thNo; 100 101 public ConsumeRunable(int thNo) { 102 this.thNo = thNo; 103 } 104 105 @Override 106 public void run() { 107 int dealSize = 0; 108 //只有在生产者中止生产时,而且处理完全部数据才会退出 109 while (productRun || dealSize > 0) { 110 //获得当前线程要处理的数据 111 List<String> sourceDatas = new ArrayList<String>(); 112 consumeQueue.drainTo(sourceDatas, maxDealSize); 113 //进行处理 114 if (CollectionUtils.isEmpty(sourceDatas)) { 115 dealSize = 0; 116 continue; 117 } 118 dealSize = sourceDatas.size(); 119 try { 120 System.out.println("writer " + thNo + " deal:" + sourceDatas); 121 } catch (Exception e) { 122 e.printStackTrace(); 123 log.error("dtData", e); 124 } 125 //本处可考虑无数据处理时sleep休息一会 126 127 } 128 System.out.println("writer " + thNo + " end"); 129 consumeLatch.countDown(); 130 } 131 } 132 }
1 package me.util; 2 3 import java.util.Collection; 4 import java.util.concurrent.BlockingQueue; 5 6 /** 7 * 阻塞队列 8 * @author guanpanpan 9 * 10 */ 11 public class BlockingQueueUtil { 12 /** 13 * 向队列加入一组数据,若是对队已满会阻塞 14 */ 15 public static <T> void put(BlockingQueue<T> blockingQueue, Collection<T> collection) throws InterruptedException { 16 for (T object : collection) { 17 blockingQueue.put(object); 18 } 19 } 20 21 /** 22 * 向队列加入单个数据,若是对队已满会阻塞 23 */ 24 public static <T> void put(BlockingQueue<T> blockingQueue, T object) throws InterruptedException { 25 blockingQueue.put(object); 26 } 27 }
(3)使用取模来实现多线程处理不一样数据
下面代码顺带演示了下,线程编程和单元测试的一些关系,有时须要为单元测试改变一些原代码
1 package me.jdk.thread; 2 3 import java.util.ArrayList; 4 import java.util.Collection; 5 import java.util.List; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 9 import me.util.DateUtil; 10 import me.util.ModeUtil; 11 12 import org.slf4j.Logger; 13 import org.slf4j.LoggerFactory; 14 15 /** 16 * 多线程处理同一批数据 17 * 好处:不一样线程处理固定数据,不会有重复取的问题 18 * @author guanpanpan 19 * 20 */ 21 public class MulThreadSameData_Mod { 22 protected final static Logger log = LoggerFactory.getLogger(MulThreadSameData_Mod.class); 23 //若是想多个线程拥有启动,中止等操做就持有它 24 private static List<DataDealRunable> jobRunables = new ArrayList<DataDealRunable>(); 25 private static int threadSize = 10; 26 27 public static void main(String[] args) { 28 29 //初始化相应线程 30 for (int thNo = 1; thNo <= threadSize; thNo++) { 31 jobRunables.add(new DataDealRunable(thNo)); 32 } 33 //启动相应服务线程 34 ExecutorService executorService = Executors.newFixedThreadPool(threadSize); 35 for (DataDealRunable jobRunable : jobRunables) { 36 executorService.execute(jobRunable); 37 } 38 executorService.shutdown(); 39 //在单元测试时可使用,在本处只为展现,移植到例子以前是没有的 40 sleepStopAndWait(); 41 log.info("main End"); 42 } 43 44 static class DataDealRunable implements Runnable { 45 protected boolean run = true;//控制执行 46 public boolean runing = true;//执行状态,用于单元测试 47 protected int threadId; 48 49 public DataDealRunable(int threadId) { 50 this.threadId = threadId; 51 } 52 53 public void stop() { 54 this.run = false; 55 } 56 57 @Override 58 public void run() { 59 runing = true; 60 while (run) { 61 //示意从数据库取到数据 62 List<String> datas = getDatasFromDb(); 63 //获得当前线程要处理的数据 64 Collection<String> datasToDeal = getDealData(datas, threadSize, threadId); 65 System.out.println("th" + threadId + "do something" + datasToDeal.size()); 66 DateUtil.sleepForOneSecond(); 67 } 68 runing = false; 69 } 70 } 71 72 public static void stopService() { 73 for (DataDealRunable jobRunable : jobRunables) { 74 jobRunable.stop(); 75 } 76 77 } 78 79 public static void sleepStopAndWait() { 80 //先让执行一会,使线程获得执行 81 DateUtil.sleepForOneSecond(); 82 //中止线程 83 stopService(); 84 //等待线程结束,只因此未使用latch来作,是由于本程序是一直在跑的worker,只有在集成测试时才须要中止 85 waitStop(); 86 } 87 88 /** 89 * 在stopService后调用,等待线程退出,用于测试 90 */ 91 private static void waitStop() { 92 boolean running = true; 93 while (running) { 94 running = false; 95 for (DataDealRunable jobRunable : jobRunables) { 96 if (jobRunable.runing) { 97 running = true; 98 } 99 } 100 if (running) { 101 DateUtil.sleepForOneSecond(); 102 } 103 104 } 105 106 } 107 108 public static Collection<String> getDealData(Collection<String> datas, int modeSize, int modeNo) { 109 Collection<String> modedatas = new ArrayList<String>(); 110 for (String data : datas) { 111 if (modeNo == ModeUtil.getRandMode(data, modeSize)) { 112 modedatas.add(data); 113 } 114 } 115 return modedatas; 116 } 117 118 /** 119 * 模拟从数据库取得数据 120 */ 121 private static List<String> getDatasFromDb() { 122 /**要处理的数据*/ 123 List<String> datas = new ArrayList<String>(); 124 for (int i = 0; i < 1000; i++) { 125 datas.add("data" + i); 126 } 127 return datas; 128 } 129 }
1 package me.util; 2 3 import me.arithmetic.hash.HashAlgorithm; 4 5 /** 6 * 分库分表算法 7 * 计算库和表的hash种子不能同样,避免分库和分表奇偶问题形成的不平均 8 * @author guanpanpan 9 * 10 */ 11 public class ModeUtil { 12 /** 13 * hash算法的实现类,采用的是memcached的实现类。 14 */ 15 private static HashAlgorithm ketemaHash = HashAlgorithm.KETAMA_HASH; 16 17 /** 18 * 计算一个字符串的hash码,原理为先计算md5码,再计算md5码得hash。 这种算法能保证hash数据的均匀分布 19 * 在计算hash前,传入的Pin会被转成小写 20 * 21 * @param pin 22 * 用户登录名(也能够是其余字符串) 23 * @return long型的hashcode 24 */ 25 public static long getHash(String pin) { 26 String lcasePin = pin.toLowerCase().trim(); 27 long hashCode = Math.abs(ketemaHash.hash(lcasePin)); 28 return hashCode; 29 } 30 31 public static int getRandMode(String pin, int modeSize) { 32 return (int) (getHash(pin) % modeSize + 1); 33 } 34 35 }