并行设计模式属于设计优化的一部分,它是对一些经常使用的多线程结构的总结和抽象。与串行程序相比,并行程序的结构一般更为复杂,所以合理的使用并行模式在多线程开发中更具备意义,在这里主要介绍==Future==、==Master-Worker==和==生产者-消费者==模型数据库
Future模式有点相似于商品订单。好比在网购时,当看中某一件商品时,就能够提交订单,当订单处理完成后,在家等待商品送货上门便可。或者说更形象的,咱们发送Ajax请求的时候,页面是异步的进行后台处理,用户无需一直等待请求的结果,能够继续浏览或操做其余内容。
设计模式
public class Main { public static void main(String[] args) { FutureClient futureClient = new FutureClient(); Date date = futureClient.request("date"); System.out.println("请求已经被处理..."); System.out.println("去作其余操做..."); System.out.println("结果为:" + date.getRequest()); } } public class FutureClient { public Date request(final String queryStr) { //1.想要一个代理对象(Date接口的实现类)先返回给发送请求的客户端,告诉她请求已经被接收到,能够作其余事情 final FutureDate futureDate = new FutureDate(); //2.启动一个新的线程,去加载真实数据,传递给这个代理对象 new Thread(new Runnable() { @Override public void run() { //3.这个新的线程能够去加载真实对象,而后传递给代理对象 RealDate realDate = new RealDate(queryStr); futureDate.setRealDate(realDate); } }).start();; return futureDate; } } public interface Date { String getRequest(); } public class FutureDate implements Date{ private RealDate realDate; private Boolean isReady = false; @Override public synchronized String getRequest() { while (!isReady) { try { //若是没有装载完毕,程序一直处于阻塞状态 wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //装载好直接获取数据 return this.realDate.getRequest(); } public synchronized void setRealDate(RealDate realDate) { while (isReady) { //若是已经加载完毕,就直接返回 return; } //若是没有,就进行装载真实对象 this.realDate = realDate; this.isReady = true; //通知 notify(); } } public class RealDate implements Date{ private String realDate; public RealDate(String realDate) { System.out.println("根据" + realDate + "进行查询,这是一个很耗时的操做..."); try { Thread.sleep(5000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("操做完毕,获取结果..."); this.realDate = "查询结果"; } @Override public String getRequest() { // TODO Auto-generated method stub return this.realDate; } }
运行结果:
请求已经被处理...
去作其余操做...
根据date进行查询,这是一个很耗时的操做...
操做完毕,获取结果...
结果为:查询结果缓存
Master-Worker模式是经常使用的并行设计模式。它的核心思想是系统由两类进程协做工做:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。当各个Worker进程处理完成后,会将结果返回给Master,由Master作概括和总结。其好处是能将一个大任务分解成若干个小任务,并行执行,从而提升系统的吞吐量。
多线程
public class Task { private int id; private String name; private int price; public Task(int id, String name, int price) { super(); this.id = id; this.name = name; this.price = price; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } } public class Master { //1.有一个承装任务的集合ConcurrentLinkedQueue private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<>(); //2.使用普通的HashMap承装全部的Worker对象 private HashMap<String, Thread> workers = new HashMap<>(); //3.使用一个容器承装每个Worker并发执行任务的结果集 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<>(); //4.构造方法 public Master(Worker worker, int workerCount) { //每个Worker对象都须要有Master的引用workQueue用于任务的领取,resultMap用于任务的提交 worker.setWorkerQueue(this.workQueue); worker.setResultMap(this.resultMap); for (int i = 1; i <= workerCount; i++) { //key表示每一个Worker的名字,value表示线程执行对象 this.workers.put("子节点" + Integer.toString(i), new Thread(worker)); } } //5.提交方法 public void submit(Task task) { this.workQueue.add(task); } //6.须要执行方法,让全部Worker工做 public void execute() { for(Map.Entry<String,Thread> entry : workers.entrySet()) { System.out.println("Worker:" + entry.getKey() + "开始执行..."); entry.getValue().start(); } } //7.判断线程是否已经执行完毕 public boolean isComplete() { for(Map.Entry<String,Thread> entry : workers.entrySet()) { if(entry.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } //8.返回结果集数据 public int getResult() { int ret = 0; for(Map.Entry<String,Object> entry : resultMap.entrySet()) { ret += (Integer)entry.getValue(); } return ret; } } public class Worker implements Runnable{ private ConcurrentLinkedQueue<Task> workQueue; private ConcurrentHashMap<String, Object> resultMap; public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) { this.workQueue = workQueue; } public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { this.resultMap = resultMap; } @Override public void run() { while(true) { Task input = this.workQueue.poll(); if(input == null) break; //handle真正处理业务的方法 Object ouput = hanle(input); this.resultMap.put(Integer.toString(input.getId()), ouput); } } private Object hanle(Task input) { Object output = null; try { //表示处理业务的耗时,多是数据的加工也多是操做数据库 Thread.sleep(500); output = input.getPrice(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return output; } } public class Main { public static void main(String[] args) { //Master master = new Master(new Worker(), Runtime.getRuntime().availableProcessors()); //当前及其可用线程数 Master master = new Master(new Worker(), 20); Random price = new Random(); for (int i = 0; i < 100 ; i++) { Task t = new Task(i, "任务" + i, price.nextInt(1000)); master.submit(t); } master.execute(); long start = System.currentTimeMillis(); while(true) { if(master.isComplete()) { long end = System.currentTimeMillis(); int result = master.getResult(); System.out.println("最终结果:" + result + ",耗时:" + (end - start)); break; } } } }
运行结果:
Worker:子节点8开始执行...
Worker:子节点7开始执行...
Worker:子节点9开始执行...
Worker:子节点16开始执行...
Worker:子节点17开始执行...
Worker:子节点2开始执行...
Worker:子节点18开始执行...
Worker:子节点1开始执行...
Worker:子节点19开始执行...
Worker:子节点4开始执行...
Worker:子节点12开始执行...
Worker:子节点3开始执行...
Worker:子节点13开始执行...
Worker:子节点6开始执行...
Worker:子节点14开始执行...
Worker:子节点5开始执行...
Worker:子节点15开始执行...
Worker:子节点20开始执行...
Worker:子节点10开始执行...
Worker:子节点11开始执行...
最终结果:50179,耗时:2505并发
生产者和消费者也是一个很是经典的多线程模式,咱们在实际开发中应用很是普遍的思想理念。在生产-消费者模式中:一般由两类线程,即若干个生产者的线程和若干个消费者的线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务,在生产者和消费者之间经过共享内存缓存区进行通讯。
MQ(Message Queue)消息队列中间件使用了生产者-消费者模式dom
public class Data { private String id; private String data; public Data(String id, String data) { super(); this.id = id; this.data = data; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getData() { return data; } public void setData(String data) { this.data = data; } } public class Provider implements Runnable{ private LinkedBlockingQueue<Data> queue; private AtomicInteger count = new AtomicInteger(0); private volatile boolean isRunning = true; private Random random = new Random(); public Provider(LinkedBlockingQueue<Data> queue) { super(); this.queue = queue; } @Override public void run() { while(this.isRunning) { try { Thread.sleep(random.nextInt(1000)); int id = count.incrementAndGet(); Data data = new Data(Integer.toString(id), "数据" + id); System.out.println("当前生产线程:" + Thread.currentThread().getName() + ",获取了数据,id为:" + id + ",进行装载到公共缓冲区..."); if(!this.queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println("提交缓冲区失败..."); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public void stop() { this.isRunning = false; } } public class Consumer implements Runnable{ private LinkedBlockingQueue<Data> queue; public Consumer(LinkedBlockingQueue<Data> queue) { super(); this.queue = queue; } @Override public void run() { while(true) { try { Data data = this.queue.take(); Thread.sleep(1000); System.out.println("当前消费线程为:" + Thread.currentThread().getName() + ",消费成功,消费数据为id:" + data.getId()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public class Main { public static void main(String[] args) { //内存缓冲区 LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<>(); //生产者 Provider p1 = new Provider(queue); Provider p2 = new Provider(queue); Provider p3 = new Provider(queue); //消费者 Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); //建立线程池运行,这是一个缓存的线程池,能够建立无穷大的线程,没有任务的时候不建立线程,空闲线程存活时间为60s(默认) ExecutorService cachePool = Executors.newCachedThreadPool(); cachePool.execute(p1); cachePool.execute(p2); cachePool.execute(p3); cachePool.execute(c1); cachePool.execute(c2); cachePool.execute(c3); try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } p1.stop(); p2.stop(); p3.stop(); try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //cachePool.shutdown(); } }
运行结果:
当前生产线程:pool-1-thread-3,获取了数据,id为:1,进行装载到公共缓冲区...
当前生产线程:pool-1-thread-1,获取了数据,id为:1,进行装载到公共缓冲区...
当前生产线程:pool-1-thread-3,获取了数据,id为:2,进行装载到公共缓冲区...
当前生产线程:pool-1-thread-2,获取了数据,id为:1,进行装载到公共缓冲区...
当前生产线程:pool-1-thread-1,获取了数据,id为:2,进行装载到公共缓冲区...
当前消费线程为:pool-1-thread-4,消费成功,消费数据为id:1
当前消费线程为:pool-1-thread-6,消费成功,消费数据为id:1
当前生产线程:pool-1-thread-1,获取了数据,id为:3,进行装载到公共缓冲区...
当前生产线程:pool-1-thread-2,获取了数据,id为:2,进行装载到公共缓冲区...
当前生产线程:pool-1-thread-3,获取了数据,id为:3,进行装载到公共缓冲区...
当前消费线程为:pool-1-thread-5,消费成功,消费数据为id:2
当前消费线程为:pool-1-thread-4,消费成功,消费数据为id:1
当前消费线程为:pool-1-thread-6,消费成功,消费数据为id:2
当前生产线程:pool-1-thread-1,获取了数据,id为:4,进行装载到公共缓冲区...
当前生产线程:pool-1-thread-2,获取了数据,id为:3,进行装载到公共缓冲区...
当前生产线程:pool-1-thread-3,获取了数据,id为:4,进行装载到公共缓冲区...
当前生产线程:pool-1-thread-3,获取了数据,id为:5,进行装载到公共缓冲区...
当前生产线程:pool-1-thread-1,获取了数据,id为:5,进行装载到公共缓冲区...
当前消费线程为:pool-1-thread-5,消费成功,消费数据为id:3
当前生产线程:pool-1-thread-3,获取了数据,id为:6,进行装载到公共缓冲区...
当前生产线程:pool-1-thread-2,获取了数据,id为:4,进行装载到公共缓冲区...
当前消费线程为:pool-1-thread-4,消费成功,消费数据为id:2
当前消费线程为:pool-1-thread-6,消费成功,消费数据为id:3
当前生产线程:pool-1-thread-1,获取了数据,id为:6,进行装载到公共缓冲区...
当前消费线程为:pool-1-thread-5,消费成功,消费数据为id:4
当前消费线程为:pool-1-thread-4,消费成功,消费数据为id:3
当前消费线程为:pool-1-thread-6,消费成功,消费数据为id:4
当前消费线程为:pool-1-thread-5,消费成功,消费数据为id:5
当前消费线程为:pool-1-thread-4,消费成功,消费数据为id:5
当前消费线程为:pool-1-thread-6,消费成功,消费数据为id:6
当前消费线程为:pool-1-thread-5,消费成功,消费数据为id:4
当前消费线程为:pool-1-thread-4,消费成功,消费数据为id:6异步