在处理业务的时候,有时候须要根据状况使用不一样的线程处理模型来处理业务逻辑,这里演示一下常见的线程模型使用技巧。html
一、Future模型java
前面的章节中提到过Future模型,该模型一般在使用的时候须要结合Callable接口配合使用。Future:将来的、未来的,再结合Callable大概能够明白其功能。缓存
Future是把结果放在未来获取,当前主线程并不急于获取处理结果。容许子线程先进行处理一段时间,处理结束以后就把结果保存下来,当主线程须要使用的时候再向子线程索取。多线程
Callable是相似于Runnable的接口,其中call方法相似于run方法,所不一样的是run方法不能抛出受检异常没有返回值,而call方法则能够抛出受检异常并可设置返回值。二者的方法体都是线程执行体。框架
/** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception;
注意这里,没法抛出受检异常不等于没法捕获线程中throws的异常。run方法执行体中抛出异常是能够被捕获的,前提是使用Future来处理,后面会有说明。dom
若是有一种场景须要一个线程处理一段业务,处理结束以后主线程将会使用处理结果进行后续处理。这样,按照普通逻辑,就须要使用到一个全局变量来保存子线程处理以后的结果。子线程处理结束以后,把结果保存在全局变量中供主线程进行调用。一旦涉及到全局能量便存在着多线程读写全局变量错误的风险。而使用Future模式即可以省去全局变量的使用,直接从线程中获取子线程处理结果。下面看一下使用示例;ide
package thread.blogs.threadmodel; /** * Created by PerkinsZhu on 2017/9/1 15:34. */ public class AbstractModel { protected static void sleep(int time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } protected static void println(Object info) { System.out.println(info); } }
package thread.blogs.threadmodel; import java.util.concurrent.*; /** * Created by PerkinsZhu on 2017/9/1 15:32. */ public class FutureModel extends AbstractModel { public static void main(String[] args) { testFuture(); } /** * 区别: CallAble 能够有返回值 能够抛出受检异常 * Runnable 没有返回值 没法抛出受检异常但可捕获线程中发生的异常。 * 者均可经过对future.get()进行try cathch捕获异常 */ private static void testFuture() { MyCallable myCallable = new MyCallable(); MyRunnable myRunnable = new MyRunnable(); ExecutorService executorService = Executors.newFixedThreadPool(5); Future<?> future = executorService.submit(myCallable); sleep(2000); try { //String data = future.get(2000, TimeUnit.MILLISECONDS);//能够指定超时时间 Object data = future.get();//当执行Runnable的时候,这里返回的为nul。此时若是有run方法体中有异常异常抛出,能够在此捕获到,虽然Run方法没有显示的抛出受检异常。 println(data + "---" + data.getClass().toString()); } catch (InterruptedException e) { println(e.getMessage()); } catch (ExecutionException e) { println(e.getMessage()); } catch (Exception e) { println(e.getMessage()); } executorService.shutdown(); } static class MyCallable implements Callable<String> { @Override public String call() throws Exception { sleep(500); println("I am Callable..."); //int num = 10/0; //throw new RuntimeException("异常"); return "hello"; } } static class MyRunnable implements Runnable { @Override public void run() {//不支持返回值,没法对线程捕获异常。 sleep(500); println("I am Runnable..."); // int num = 10/0; //throw new RuntimeException("异常"); } } }
能够取消注释 分别测试 myCallable 和myRunnable 对异常捕获和结果获取进行测试。post
二、fork&join 模型
该模型是jdk中提供的线程模型。该模型包含递归思想和回溯思想,递归用来拆分任务,回溯用合并结果。 能够用来处理一些能够进行拆分的大任务。其主要是把一个大任务逐级拆分为多个子任务,而后分别在子线程中执行,当每一个子线程执行结束以后逐级回溯,返回结果进行汇总合并,最终得出想要的结果。这里模拟一个摘苹果的场景:有100棵苹果树,每棵苹果树有10个苹果,如今要把他们摘下来。为了节约时间,规定每一个线程最多只能摘10棵苹树以便于节约时间。各个线程摘完以后汇总计算总苹果树。代码实现以下:测试
package thread.blogs.threadmodel; import scala.Console; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; /** * Created by PerkinsZhu on 2017/9/5 13:05. */ public class ForkJoin { public static void main(String[] args) { testAcation(); } private static void testAcation() { ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Integer> future = pool.submit(new ResultTask(100));//共100棵苹果树 try { Console.println(future.get()); pool.awaitTermination(1000, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } pool.shutdown(); } } class ResultTask extends RecursiveTask<Integer> { //也可继承自RecursiveAction抽象类,区别在于compute方法没有返回值,若是只须要执行动做则可使用该接口 private int treeNum; public ResultTask(int num) { this.treeNum = num; } @Override protected Integer compute() { if (treeNum < 10) {//每一个线程最多只能摘10棵苹果树 return getAppleNum(treeNum); } else { //对任务进行拆分,注意这里不只仅能够一分为二进行拆分,也能够拆为多个子任务 int temp = treeNum / 2; ResultTask left = new ResultTask(temp); ResultTask right = new ResultTask(treeNum - temp); left.fork(); right.fork(); //对子任务处理的结果进行合并 int result = left.join() + right.join(); Console.println("========" + Thread.currentThread().getName() + "=========" + result); return result; } } public Integer getAppleNum(int treeNum) { return treeNum * 10;//每棵树上10个苹果 } }
这里须要看一下执行结果,主要是为了明白在拆分子任务的时候并非无限制开启线程,而是使用了线程池ForkJoinPool复用线程。注意下面输出的线程名称!ui
========ForkJoinPool-1-worker-3=========120 ========ForkJoinPool-1-worker-7=========120 ========ForkJoinPool-1-worker-0=========120 ========ForkJoinPool-1-worker-5=========120 ========ForkJoinPool-1-worker-1=========130 ========ForkJoinPool-1-worker-11=========130 ========ForkJoinPool-1-worker-4=========250 ========ForkJoinPool-1-worker-7=========130 ========ForkJoinPool-1-worker-7=========250 ========ForkJoinPool-1-worker-3=========130 ========ForkJoinPool-1-worker-5=========250 ========ForkJoinPool-1-worker-6=========250 ========ForkJoinPool-1-worker-2=========500 ========ForkJoinPool-1-worker-3=========500 ========ForkJoinPool-1-worker-1=========1000 1000
三、actor消息模型
actor模型属于一种基于消息传递机制并行任务处理思想,它以消息的形式来进行线程间数据传输,避免了全局变量的使用,进而避免了数据同步错误的隐患。actor在接受到消息以后能够本身进行处理,也能够继续传递(分发)给其它actor进行处理。在使用actor模型的时候须要使用第三方Akka提供的框架点击查看。这里使用scala进行演示,若是须要看java使用方法则能够查阅官方文档:actor for java 使用。
package thread.blogs.threadmodel import akka.actor.{Actor, ActorSystem, Props} /** * Created by PerkinsZhu on 2017/9/21 18:58. */ object ActorTest { def main(args: Array[String]): Unit = { val actorSystem = ActorSystem("MyActor") val actor = actorSystem.actorOf(Props[MyActor], "MyActor") actor ! "很高兴认识你!"//发送消息给actor } } class MyActor extends Actor { override def receive: Receive = {//接收消息,根据消息类型进行case匹配,能够在此actor进行处理,也能够继续传递给其它actor进行处理(参考master-worker)。 case str: String => println(str) } }
四、生产者消费者模型
生产者消费者模型都比较熟悉,其核心是使用一个缓存来保存任务。开启一个/多个线程来生产任务,而后再开启一个/多个来从缓存中取出任务进行处理。这样的好处是任务的生成和处理分隔开,生产者不须要处理任务,只负责向生成任务而后保存到缓存。而消费者只须要从缓存中取出任务进行处理。使用的时候能够根据任务的生成状况和处理状况开启不一样的线程来处理。好比,生成的任务速度较快,那么就能够灵活的多开启几个消费者线程进行处理,这样就能够避免任务的处理响应缓慢的问题。使用示例以下:
package thread.blogs.threadmodel; import java.util.LinkedList; import java.util.Queue; import java.util.UUID; /** * Created by PerkinsZhu on 2017/9/22 8:58. */ public class PCModel { public static void main(String[] args) { testPCModel(); } private static Queue<String> queue = new LinkedList<String>();//任务缓存,这里保存简单的字符串模拟任务 private static void testPCModel() { new Thread(() -> {//生产者线程 while (true) { String uuid = UUID.randomUUID().toString(); queue.add(uuid); sleep(100); } }).start(); for (int i = 0; i < 10; i++) {//开启10消费者处理任务,保证生产者产生的任务可以被及时处理 new Thread(() -> { while (true) { doWork(queue); } }).start(); } } private static void doWork(Queue<String> queue) { sleep(1000); synchronized (queue) { if (queue.size() > 0) { sleep(10); System.out.println(queue.poll() + "----" + queue.size()); } } } private static void sleep(int time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
五、master-worker模型
master-worker模型相似于任务分发策略,开启一个master线程接收任务,而后在master中根据任务的具体状况进行分发给其它worker子线程,而后由子线程处理任务。如需返回结果,则worker处理结束以后把处理结果返回给master。下面的代码示例是使用akka actor for scala演示。使用的时候也可使用java Thread来实现该模型。
package thread.blogs.threadmodel import akka.actor.{Actor, ActorSystem, Props} /** * Created by PerkinsZhu on 2017/9/21 18:58. */ object ActorTest { val actorSystem = ActorSystem("Master") def main(args: Array[String]): Unit = { val actor = actorSystem.actorOf(Props[Master], "Master") var taskNum = 0; while (true) { taskNum = taskNum + 1; actor ! Task("作做业! --" + taskNum) //发送消息给actor Thread.sleep(100) } } } class Master extends Actor { val actorSystem = ActorSystem("worker") var num = 0; override def receive: Receive = { case task: Task => { num = num + 1; //接收到任务以后分发给其它worker线程。可使用worker池 复用actor actorSystem.actorOf(Props[Worker], "worker" + num) ! task } case any: Any => println(any) } } class Worker extends Actor { def doWork(task: Task): Unit = println(task.name) override def receive: Receive = { case task: Task => doWork(task) //worker处理接受到的任务 case any: Any => println(any) } } case class Task(name: String)
这里若是须要worker返回处理结果,则只须要在worker中调用sender 发送处理结果便可。
=========================================
原文连接:多线程(八)经常使用的线程模型转载请注明出处!
=========================================
---end