<font size=3 face="黑体" > 疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 -17【 博客园 总入口 】</font >html
[toc] 源码IDEA工程获取连接:Java 聊天室 实战 源码 java
你们好,我是做者尼恩。 目前和几个小伙伴一块儿,组织了一个高并发的实战社群【疯狂创客圈】。正在开始 高并发、亿级流程的 IM 聊天程序 学习和实战,此文是:面试
疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 -17编程
前面,已经完成一个高性能的 Java 聊天程序的四件大事:多线程
完成了协议选型,选择了性能更佳的 Protobuf协议。具体的文章为: Netty+Protobuf 整合一:实战案例,带源码架构
介绍了 通信消息数据包的几条设计准则。具体的文章为: Netty +Protobuf 整合二:protobuf 消息通信协议设计的几个准则并发
解决了一个很是基础的问题,这就是通信的 **粘包和半包问题。**具体的文章为:Netty 粘包/半包 全解 | 史上最全解读框架
前一篇文件,已经完成了 系统三大组成模块的组成介绍。 具体的文章为:Netty聊天程序(实战一):从0开始实战100w级流量应用异步
在设计客户端以前,发现一个很是重要的基础知识点,没有讲到。这个知识点就是异步回调。分布式
因为异步回调使用频率是如此之高,因此不得不停下来,详细介绍一下。
随着移动互联网的蓬勃发展,业务架构也随之变得错综复杂,业务系统愈来愈多。打个简单的比方:以前一个业务只须要调取一次第三方接口,现在,该业务需调取多个甚至N个不一样的第三方接口,获取N种上游数据。一般,咱们处理方法是异步去调取这些接口。
问题就来了,如何获取处理异步调用的结果呢 ?
或者说,异步线程执行完成后,如何与发起线程交互呢?
这就涉及到线程的异步回调问题,这也是大流量高并发不可回避的问题。
首先,了解下同步、异步、阻塞、非阻塞、回调等相关概念;
其次,简单介绍java future和guava future相关技术,并经过示例代码进一步对其进行理解;
最后,对java future和guava future进行比较。
写到这里,尼恩就想到了在中学8年级的语文课。在课本中,有一篇华罗庚的课文——《统筹方法》,课文介绍的是统筹方法,该方法的主要目的是合理安排工做流程中的各道工序。
里边举了一个泡茶的例子。列出了三种泡茶的工序模型。在文中的三种工序流程中,有多重排列组合的模式。
工序模型一:顺序模式
洗好水壶,灌上凉水,放在火上;
等水开,洗茶壶、洗茶杯;
洗完茶杯后,泡茶喝。
工序模型二:并发模式
洗好水壶,灌上凉水,放在火上;
在等待水开的时间里,洗茶壶、洗茶杯;
等水开了,泡茶喝。
《统筹方法》这篇文章中,忽略了一个很很重要的问题: 就是等水开是一段数量级最大的时间,这个时间,远远超过了准备水、准备茶杯的时间。
从实际出发,为了避免浪费等水开时间,尼恩在这里增长一个动做 —— 读书。而且,当水烧好后,通知做者中止读书,去泡茶喝。这就至关于回调模式。
工序模式三:回调模式
洗好水壶,灌上凉水,放在火上;
在等待水开的时间里,洗茶壶、洗茶杯;
在等水开的时间里,读书;
水开了,通知做者泡茶喝。
对比起来:顺序模式效率最低,回调模式效率最高。
以上三种模式泡茶喝的方式,使用Java,如何实现呢?
先来看一些基本的概念吧!
前面只是一个例子,对并发的主要模式进行形象的说明。
下面正式来讲下经常使用的几个和并发相关的概念。
一:同步
所谓同步,就是在发出一个功能调用时,在没有获得结果以前,该调用就不返回。也就是必须一件一件事作,等前一件作完了才能作下一件事。
单线程模式,就是绝对同步的。
二: 异步
异步首先必须是多线程模式。是指当前线程,向其余的异步线程发出调用指令。当前线程和异步线程,逻辑上同时执行。
三:阻塞
在异步的场景下,当前线程阻塞住,等待异步线程的执行结果。阻塞是指线程进入非可执行状态,在这个状态下,cpu不会给线程分配时间片,即线程暂停运行。
阻塞模式是效率比较低的,若是阻塞严重的话,至关于又回到了同步的时代。
四:非阻塞
非阻塞和阻塞的概念相对应,指在不能马上获得结果以前,当前线程不会阻塞住,而会继续向下执行。
回调就是一种非阻塞的异步模式。并发线程经过回调,能够将结果返回给发起线程。除了回调,还有其余的非阻塞异步模式,好比消息通信、信号量等等。
阻塞模式的泡茶模型,对应到前面的第二种泡茶喝的工序模型。
在阻塞模式泡茶喝的模型中,有三条线程,他们分别是:
线程一:烧水线程
洗好水壶,灌上凉水,放在火上;
线程二:清洗线程
洗茶壶、洗茶杯;
线程三:主线程
分别启动烧水线程、清洗线程。等水开了,等水杯洗好了,而后泡茶喝。
具体以下图:
前面提到,阻塞模式的效率不是最高的。
更高效率的是回调模式。主线程在等待的时间了,不是死等,而是去干读书的活儿。等其余两条线程完成后,经过回调方式,去完成泡茶的动做。
在回调模式泡茶喝的模型中,仍是三条线程,他们的工做稍微有些变更:
线程一:烧水线程
洗好水壶,灌上凉水,放在火上;烧好水后,去执行泡茶回调。
线程二:清洗线程
洗茶壶、洗茶杯;清洗完成后,也去执行一下泡茶的动做。
线程三:主线程
分别启动烧水线程、清洗线程。而后去读书。
具体以下图:
严格来讲,上图是经不起推敲的。
为啥呢? 那个泡茶喝回调方法,在执行的流程上,不属于主线程在执行。只是在业务逻辑上,泡茶喝这个动做与主线程上的其余动做,关联性更强。
上图,更好的理解方式是,尽可能站在业务流程的角度去理解。
回调不是惟一的非阻塞方式。
还有线程间通讯、信号量等等,不少的非阻塞方式。可是回调倒是一种最好用的、也是开发中用的最多的线程间非阻塞的交互方式。
下面,从最原始的阻塞模式讲起,起底整个异步回调模式。
Java中,线程有一个join操做,也叫线程的合并。
join操做的做用,就是完成异步阻塞的工做——阻塞当前的线程,直到异步的并发线程的执行完成。
若是线程A的执行过程当中,经过B.join操做,合并B线程,叫作线程的合并。合并的重要特色之一是,线程A进入阻塞模式,直到B线程执行完成。
为了方便表达,模拟一下包工头的甲方和乙方。
将发起合并的线程A叫作甲方线程,被发起的线程B为乙方线程。
简单的说,线程合并就是——甲方等待乙方执行完成。换句话说,甲方将乙方线程合并到甲方线程。
在泡茶喝的例子中,主线程经过join操做,等待烧水线程和清洗线程。这就是一种异步阻塞。
具体以下图:
先看实例,再看方法的详细介绍。
泡茶喝的异步阻塞版本,实现以下:
package com.crazymakercircle.coccurent; import com.crazymakercircle.util.Print; /** * Created by 尼恩 at 疯狂创客圈 */ public class JoinDemo { public static final int SLEEP_GAP = 500; public static String getCurThreadName() { return Thread.currentThread().getName(); } static class HotWarterThread extends Thread { public HotWarterThread() { super("** 烧水-Thread"); } public void run() { try { Print.tcfo("洗好水壶"); Print.tcfo("灌上凉水"); Print.tcfo("放在火上"); //线程睡眠一段时间,表明烧水中 Thread.sleep(SLEEP_GAP); Print.tcfo("水开了"); } catch (InterruptedException e) { Print.tcfo(" 发生异常被中断."); } Print.tcfo(" 运行结束."); } } static class WashThread extends Thread { public WashThread() { super("$$ 清洗-Thread"); } public void run() { try { Print.tcfo("洗茶壶"); Print.tcfo("洗茶杯"); Print.tcfo("拿茶叶"); //线程睡眠一段时间,表明清洗中 Thread.sleep(SLEEP_GAP); Print.tcfo("洗完了"); } catch (InterruptedException e) { Print.tcfo(" 发生异常被中断."); } Print.tcfo(" 运行结束."); } } public static void main(String args[]) { Thread hThread = new HotWarterThread(); Thread wThread = new WashThread(); hThread.start(); wThread.start(); try { // 合并烧水-线程 hThread.join(); // 合并清洗-线程 wThread.join(); Thread.currentThread().setName("主线程"); Print.tcfo("泡茶喝"); } catch (InterruptedException e) { Print.tcfo(getCurThreadName() + "发生异常被中断."); } Print.tcfo(getCurThreadName() + " 运行结束."); } }
演示程序中有三条线程:
一条是主线程main;
一条是烧水线程“hThread”;
一条是清洗线程“wThread”;
main线程,调用了hThread.join()实例方法,合并烧水线程,也调用了 wThread.join()实例方法,合并清洗线程。
另外说明一下:hThread是这里的烧水线程实例的句柄,"** 烧水-Thread"是烧水线程实例的线程名称,二者不能混淆。
join的方法应用场景:异步阻塞场景。
具体来讲:甲方(发起线程)的调用乙方(被发起线程)的join方法,等待乙方执行完成;若是乙方没有完成,甲方阻塞。
join是Thread类的一个实例方法,使用的方式大体以下:
// 合并烧水-线程 hThread.join(); // 合并清洗-线程 wThread.join();
实际上,join方法是有三个重载版本:
(1)void join(): 等待乙方线程执行结束,甲方线程重启执行。
(2)void join(long millis): 等待乙方线程执行一段时间,最长等待时间为 millis 毫秒。超过millis 毫秒后,不论乙方是否结束,甲方线程重启执行。
(3)void join(long millis, int nanos): 等待乙方线程执行一段时间,最长等待时间为 millis 毫秒,加nanos 纳秒。超过期间后,不论乙方是否结束,甲方线程重启执行。
强调一下容易混淆的几点:
(1)join方法是实例方法,须要使用线程句柄去调用,如thread.join();
(2)执行到join代码的时候,不是thread所指向的线程阻塞,而是当前线程阻塞;
(3)thread线程表明的是被合并线程(乙方),当前线程阻塞线程(甲方)。当前线程让出CPU,进入等待状态。
(4)只有等到thread线程执行完成,或者超时,当前线程才能启动执行。
join合并有一个很大的问题,就是没有返回值。
若是烧水线程的水有问题,或者烧水壶坏了,mian线程是没有办法知道的。
若是清洗线程的茶杯有问题,清洗不来了,mian线程是没有办法知道的。
形象的说,join线程就是一个闷葫芦。
仍是异步阻塞,可是须要得到结果,怎么办呢?
可使用java 的FutureTask 系列类。
FutureTask相关的类型,处于java.util.concurrent包中,不止一个类,是一个系列。同时,这也是Java语言在1.5 版本以后提供了一种的新的多线程使用方法。
咱们知道,异步线程的一个重要接口是Runnable,这里执行异步线程的业务代码。可是,Runnable的run方法有一个问题,它是没有返回的。
所以,Runnable不能用在须要有异步返回值的异步场景。
Java语言在1.5 版本以后从新定义了一个新的、相似Runnable的接口,Callable接口,将run方法改成了call方法,而且带上了返回值。
Callable的代码以下:
package java.util.concurrent; @FunctionalInterface public interface Callable<V> { V call() throws Exception; }
Callable接口位于java.util.concurrent包中,Callable接口是一个泛型接口。也是一个“函数式接口”。惟一的抽象方法call有返回值,返回值类型为泛型形参类型。call抽象方法还有一个Exception的异常声明,允许方法的实现版本内部的异常不通过捕获。
Callable接口相似于Runnable。不一样的是,Runnable的惟一抽象方法run没有返回值,也没有强制审查异常的异常声明。比较而言,Callable接口的功能更强大一些。
有一个异想天开的问题:
做为新版的Callable接口实例,可否做为Thread线程实例的target来使用呢?
答案是不能。
Callable接口与Runnable接口之间没有任何的继承关系,并且两者惟一方法在的名字上也不一样。Callable接口实例没有办法做为Thread线程实例的target来使用。
咱们知道,java里边的线程类型,就是Thread。Callable须要异步执行,就须要和Thread创建联系。java提供了一个搭桥的角色——FutureTask类。
顾名思义,这个是一个将来执行的任务,就至关于新线程所执行的操做。
FutureTask 类也位于 java.util.concurrent包。
FutureTask类 构造函数的参数为 Callable,而且间接的继承了Runnable接口。其构造器代码以下:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
到了这里,FutureTask类的做用就大体明白了。
若是还不明白,看一段实例代码:
Callable<Boolean> hJob = new HotWarterJob(); FutureTask<Boolean> hTask = new FutureTask<Boolean>(hJob); Thread hThread = new Thread(hTask, "** 烧水-Thread");
FutureTask就像一座位于Callable与Thread之间的桥。FutureTask 封装一个Callable,而后自身又做为Thread线程的target。
FutureTask还有一个十分重要的贡献。
Thread线程执行过程当中,异步线程的代码逻辑在Callable的call方法中,而call方法返回的结果,则须要经过 FutureTask 去获取。
好了,这下就应该基本清楚了。
总结一下FutureTask这个媒婆的做用:
(1)负责牵线
(2)经过媒婆取得结果
为了完成这个两个伟大的使命,FutureTask有个相对比较复杂的继承关系,具体以下图:
首先,FutureTask实现了一个接口——RunnableFuture接口,而该RunnableFuture接口继承了Runnable接口和Future接口。
Runnable接口咱们很熟悉,就是那个java 线程Runnable,表明异步线程的代码逻辑。
Future接口又是啥呢?
提早剧透下,这个接口,就是用来获取异步线程结果的。
Future接口和Runnable接口同样,都是牛气冲天的接口。 而FutureTask 间接的实现这个两大接口。
正由于FutureTask可以有两个很牛逼的爹,因此本身家才很牛逼。
FutureTask 既能当作一个Runnable 做为 target ,直接被Thread执行;也能做为Future用来去取得Callable的计算结果。
Future接口这个不是一个复杂的接口,梳理一下,主要提供了3大功能:
(1)获取并发的任务完成后的执行结果。
(2)可以取消并发执行中的任务;
(3)判断并发任务是否执行完成;
固然,第一点是最为经常使用的。也是这个接口的最初使命。
Future接口的代码以下:
package java.util.concurrent; public interface Future<V> { boolean cancel(boolean mayInterruptRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout,TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
对Future接口的方法,详细说明以下:
V get() :获取并发任务执行的结果。注意,这个方法是阻塞性的。若是并发任务没有执行完成,调用此方法的线程会一直阻塞,直到并发任务执行完成。
V get(Long timeout , TimeUnit unit) :获取并发任务执行的结果。也是阻塞性的,可是会有阻塞的时间限制,若是阻塞时间超过设定的timeout时间,该方法将抛出异常。
boolean isDone():获取并发任务的执行状态。若是任务执行结束,返回true。
boolean isCancelled():获取并发任务的取消状态。若是任务完成前被取消,则返回true。
boolean cancel(boolean mayInterruptRunning):取消并发任务的执行。
说完了FutureTask的两个爹,再来到FutureTask自身。
在FutureTask内部,又有哪些成员和方法,具体的执行并发任务、异步获取任务结果的呢?
首先,FutureTask内部有一个 Callable类型的成员:
private Callable<V> callable;
这个callable实例属性,是构造器传进来的。用来保存并发执行的 Callable<V>类型的任务。callable实例属性,是构造器强制性的,必需要在FutureTask实例构造的时候进行初始化。
其次,FutureTask内部有一个run方法。
这个run方法,是Runnable接口在FutureTask内部的实现。在这个run方法其中,会执行到callable成员的call方法。执行完成后,结果如何提供出去呢?这就是到了最后一点。
最后,FutureTask内部有另外一个 Object 类型的重要成员——outcome实例属性:
private Object outcome;
掐指一算,就知道这个outcome属性,是用来保存callable成员call方法的执行结果。FutureTask类run方法执行完成callable成员的call方法后,会将结果保存在outcome实例属性,供FutureTask类的get实例方法获取。
好了,重要将这个媒婆介绍完了。
若是尚未清楚,没关系,看一个实例就一目了然了。
回顾一下,前面的join闷葫芦合并阻塞有一个很大的问题,就是没有返回值。
若是烧水线程的水有问题,或者烧水壶坏了,mian线程是没有办法知道的。
若是清洗线程的茶杯有问题,清洗不来了,mian线程是没有办法知道的。
为了演示结果,给主类增长两个成员:
static boolean warterOk = false; static boolean cupOk =false;
表明烧水成功和清洗成功。初始值都为false。
烧水线程、清洗线程执行完后,都须要返回结果。 主线程获取后,保存在上面的两个主类成员中。
废话很少说,看代码:
package com.crazymakercircle.coccurent; import com.crazymakercircle.util.Print; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** * Created by 尼恩 at 疯狂创客圈 */ public class JavaFutureDemo { public static final int SLEEP_GAP = 500; public static String getCurThreadName() { return Thread.currentThread().getName(); } static class HotWarterJob implements Callable<Boolean> //① { @Override public Boolean call() throws Exception //② { try { Print.tcfo("洗好水壶"); Print.tcfo("灌上凉水"); Print.tcfo("放在火上"); //线程睡眠一段时间,表明烧水中 Thread.sleep(SLEEP_GAP); Print.tcfo("水开了"); } catch (InterruptedException e) { Print.tcfo(" 发生异常被中断."); return false; } Print.tcfo(" 运行结束."); return true; } } static class WashJob implements Callable<Boolean> { @Override public Boolean call() throws Exception { try { Print.tcfo("洗茶壶"); Print.tcfo("洗茶杯"); Print.tcfo("拿茶叶"); //线程睡眠一段时间,表明清洗中 Thread.sleep(SLEEP_GAP); Print.tcfo("洗完了"); } catch (InterruptedException e) { Print.tcfo(" 清洗工做 发生异常被中断."); return false; } Print.tcfo(" 清洗工做 运行结束."); return true; } } static boolean warterOk = false; static boolean cupOk =false; public static void drinkTea() { if (warterOk && cupOk) { Print.tcfo("泡茶喝"); } else if (!warterOk) { Print.tcfo("烧水失败,没有茶喝了"); } else if (!cupOk) { Print.tcfo("杯子洗不了,没有茶喝了"); } } public static void main(String args[]) { Callable<Boolean> hJob = new HotWarterJob();//③ FutureTask<Boolean> hTask = new FutureTask<Boolean>(hJob);//④ Thread hThread = new Thread(hTask, "** 烧水-Thread");//⑤ Callable<Boolean> wJob = new WashJob();//③ FutureTask<Boolean> wTask = new FutureTask<Boolean>(wJob);//④ Thread wThread = new Thread(wTask, "$$ 清洗-Thread");//⑤ hThread.start(); wThread.start(); Thread.currentThread().setName("主线程"); try { warterOk = hTask.get(); cupOk = wTask.get(); // hThread.join(); // wThread.join(); drinkTea(); } catch (InterruptedException e) { Print.tcfo(getCurThreadName() + "发生异常被中断."); } catch (ExecutionException e) { e.printStackTrace(); } Print.tcfo(getCurThreadName() + " 运行结束."); } }
借助上面的喝茶实例代码,说明一下经过FutureTask获取异步结果的流程步骤:
(1)异步代码逻辑须要继承Callable,经过call方法返回具体的值
static class WashJob implements Callable<Boolean> { @Override public Boolean call() throws Exception { //..业务代码,而且有返回值 }
(3)从异步逻辑到异步线程,须要媒婆类FutureTask搭桥
Callable<Boolean> hJob = new HotWarterJob();//异步逻辑 FutureTask<Boolean> hTask = new FutureTask<Boolean>(hJob);//媒婆实例 Thread hThread = new Thread(hTask, "** 烧水-Thread");//异步线程
FutureTask和Callable都是泛型类,泛型参数表示返回结果的类型。因此,在使用的时候,俩个类型的泛型参数必定须要一致的。
(3)取得异步线程的执行结果,也须要FutureTask 媒婆实例作下二传
warterOk = hTask.get();
经过FutureTask 实例的get方法,能够获取线程的执行结果。
三步至此,结果到手。
总结一下,FutureTask 比 join 线程合并高明,能取得异步线程的结果。
可是,也就未必高明到哪里去了。为啥呢?
由于,经过FutureTask的get方法,获取异步结果时,主线程也会被阻塞的。这一点,FutureTask和join也是一致的,他们俩都是异步阻塞模式。
异步阻塞的效率是比较低的,被阻塞的主线程,不能干任何事情,惟一能干的,就是在傻傻等待。
若是想提升效率,须要用到非阻塞模式。这里只讲回调模式的非阻塞,其余模式的非阻塞,请关注疯狂创客圈的后续文章。
原生Java,除了阻塞模式的获取结果,并无实现非阻塞模式的异步回调。若是须要用到异步回调,得引入一些额外的框架。
在很是著名的google 提供的扩展包 Guava中,提供了一种异步回调的解决方案。
为了实现异步回调,Guava 对Java的Future 异步模式进行能力导入:
(1)导入了一个新的接口 FutureCallback,表明回调执行的业务逻辑
(2)对Java并发包中的 Future 接口进行了扩展,将回调逻辑做为监听器绑定到异步线程
FutureCallback 是一个新增的接口,用来填写回调逻辑。这个接口,是在实际开发中编程使用到的。回调的代码,编写在它的实现类中。
FutureCallback拥有两个回调方法:
(1)onSuccess ,在异步线程执行成功回调
(2)onFailure,在异步线程抛出异常时回调
FutureCallback的源码以下:
public interface FutureCallback<V> { void onSuccess(@Nullable V var1); void onFailure(Throwable var1); }
若是将回调方法,绑定到异步线程去呢?
Guava中,有一个很是关键的角色,ListenableFuture。看名称,就能对应出它与Java 中的原生接口的亲戚关系。
若是没有猜错,这个接口是 Guava 对java 的Future接口的扩展。
来看看 ListenableFuture接口的源码,以下:
package com.google.common.util.concurrent; import java.util.concurrent.Executor; import java.util.concurrent.Future; public interface ListenableFuture<V> extends Future<V> { void addListener(Runnable var1, Executor var2); }
前面讲到,经过Java的Future接口,能够阻塞取得异步的结果。在这个基础上,ListenableFuture增长了一个方法 —— addListener 。
这个方法的做用,就是将前一小节的FutureCallback 回调逻辑,绑定到异步线程上。 能够是,addListener 不直接在实际编程中使用。这个方法只在Guava内部使用,若是对它感兴趣,能够查看Guava源码。
既然addListener 方法不能直接使用,那么,在实际编程中,如何将 FutureCallback 回调逻辑绑定到异步线程呢?
不慌,办法老是有的。
须要用到Guava的Futures 工具类。这个类有一个addCallback 静态方法,将ListenableFuture 的实例和FutureCallback 的回调实例,进行绑定。
绑定的示意代码以下:
Futures.addCallback( hFuture , new FutureCallback<Boolean>() { public void onSuccess(Boolean r) { //成功时候的回调逻辑 } public void onFailure(Throwable t) { //异常时候的回调逻辑 } });
从上文已知,原生java的Future接口的实例,一种方法是——直接构建媒婆类FutureTask的实例,就是Future接口的实例。
固然,还有第二种方法,就是经过线程池获取Future接口的实例。具体的作法是向Java线程池提交异步任务,包括Runnable或者Callable实例。
方法以下:
Future<Boolean> hTask = pool.submit(hJob); Future<Boolean> wTask = pool.submit(wJob);
注意,pool 是一个Java 线程池。
若是要获取Guava的ListenableFuture 实例,主要是经过相似上面的第二种方式——向线程池提交任务的异步任务的方式获取。不过,用到的线程池,是Guava的线程池,不是Java的线程池。
Guava线程池,而是对Java线程池的一种装饰。
两种线程池的建立代码,具体以下:
//java 线程池 ExecutorService jPool = Executors.*newFixedThreadPool*(10); //guava 线程池 ListeningExecutorService gPool = MoreExecutors.*listeningDecorator*(jPool);
有了Guava的线程池以后,就能够经过提交任务,来获取ListenableFuture 实例了。代码以下 :
ListenableFuture<Boolean> hFuture = gPool.submit(hJob);
关于Gava的线程池,请关注【疯狂创客圈】的线程池的博客文章。
总结一下,Guava异步回调的流程以下:
第一步:建立Java的 Callable的异步任务实例。实例以下:
Callable<Boolean> hJob = new HotWarterJob();//异步任务Callable<Boolean> wJob = new WashJob();//异步任务
异步任务也能够是Runnable类型。
第二步: 获取Guava线程池
//java 线程池 ExecutorService jPool = Executors.*newFixedThreadPool*(10); //guava 线程池 ListeningExecutorService gPool = MoreExecutors.*listeningDecorator*(jPool);
第三步: 提交异步任务到Guava线程池,获取ListenableFuture 实例
ListenableFuture<Boolean> hFuture = gPool.submit(hJob);
第四步:建立回调的 FutureCallback 实例,经过Futures.addCallback,将回调逻辑绑定到ListenableFuture 实例。
Futures.*addCallback*( hFuture , new FutureCallback<Boolean>() { public void onSuccess(Boolean r) { //成功时候的回调逻辑 } public void onFailure(Throwable t) { //异常时候的回调逻辑 } });
完成以上四步,当异步逻辑执行完成后,就会回调FutureCallback 实例中的回调代码。
已经对喝茶实例的代码很是熟悉下,下面是Guava的异步回调的演进版本,代码以下:
package com.crazymakercircle.coccurent; import com.crazymakercircle.util.Print; import com.google.common.util.concurrent.*; import java.util.concurrent.*; /** * Created by 尼恩 at 疯狂创客圈 */ public class GuavaFutureDemo { public static final int SLEEP_GAP = 500; public static String getCurThreadName() { return Thread.currentThread().getName(); } static class HotWarterJob implements Callable<Boolean> //① { @Override public Boolean call() throws Exception //② { try { Print.tcfo("洗好水壶"); Print.tcfo("灌上凉水"); Print.tcfo("放在火上"); //线程睡眠一段时间,表明烧水中 Thread.sleep(SLEEP_GAP); Print.tcfo("水开了"); } catch (InterruptedException e) { Print.tcfo(" 发生异常被中断."); return false; } Print.tcfo(" 运行结束."); return true; } } static class WashJob implements Callable<Boolean> { @Override public Boolean call() throws Exception { try { Print.tcfo("洗茶壶"); Print.tcfo("洗茶杯"); Print.tcfo("拿茶叶"); //线程睡眠一段时间,表明清洗中 Thread.sleep(SLEEP_GAP); Print.tcfo("洗完了"); } catch (InterruptedException e) { Print.tcfo(" 清洗工做 发生异常被中断."); return false; } Print.tcfo(" 清洗工做 运行结束."); return true; } } static boolean warterOk = false; static boolean cupOk = false; public synchronized static void drinkTea() { if (warterOk && cupOk) { Print.tcfo("泡茶喝"); } else if (!warterOk) { Print.tcfo("烧水失败,没有茶喝了"); } else if (!cupOk) { Print.tcfo("杯子洗不了,没有茶喝了"); } } public static void main(String args[]) { Thread.currentThread().setName("主线程"); Callable<Boolean> hJob = new HotWarterJob();//③ Callable<Boolean> wJob = new WashJob();//③ //java 线程池 ExecutorService jPool = Executors.newFixedThreadPool(10); //guava 线程池 ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool); ListenableFuture<Boolean> hFuture = gPool.submit(hJob); Futures.addCallback(hFuture, new FutureCallback<Boolean>() { public void onSuccess(Boolean r) { if (r) { warterOk = true; drinkTea(); } else { Print.tcfo("烧水失败,没有茶喝了"); } } public void onFailure(Throwable t) { Print.tcfo("烧水失败,没有茶喝了"); } }); ListenableFuture<Boolean> wFuture = gPool.submit(wJob); Futures.addCallback(wFuture, new FutureCallback<Boolean>() { public void onSuccess(Boolean r) { if (r) { cupOk = true; drinkTea(); } else { Print.tcfo("清洗失败,没有茶喝了"); } } public void onFailure(Throwable t) { Print.tcfo("杯子洗不了,没有茶喝了"); } }); try { Print.tcfo("读书中......"); Thread.sleep(100000); } catch (InterruptedException e) { Print.tcfo(getCurThreadName() + "发生异常被中断."); } Print.tcfo(getCurThreadName() + " 运行结束."); gPool.shutdown(); } }
本文已经太长,还有不少内容
未完待续
为何说异步回调是如此的重要呢 ? 由于高并发编程,处处都用到Future模式和Callback模式。
下一篇:Netty 中的Future 回调实现与线程池详解。这个也是一个很是重要的基础篇。
原文出处:https://www.cnblogs.com/crazymakercircle/p/10086062.html