保护性暂挂模式,也称为Guarded Suspension模式,指的是当前线程在执行某个任务以前,须要检查某一条件,只有在该条件成立的状况下,当前线程才能够继续往下执行当前任务。顾名思义,保护性暂挂模式是一种广义的概念,其主要载体有两个:预备条件和任务,在任何须要使用预先检查的状况中均可以使用保护性暂挂模式。java
以下是类图中各个角色定位的描述:服务器
好比咱们会遇到这种场景,在进行某些操做时,好比经过elasticsearch服务器进行查询或更新操做,咱们须要链接es服务器,而在es服务器链接上以前,全部的查询和更新操做都是须要被阻塞的。即便在服务器链接上以后,咱们也须要常常对服务器进行心跳测试,以检查与服务器的链接是否还存活在,若是不存活,则仍是须要继续阻塞其他的操做,而且尝试从新链接es服务器,这种状况咱们就可使用到保护性暂挂模式。保护性条件便是与es服务器的链接还存活在,若是不存活则须要挂起全部尝试链接服务器执行任务的线程,而且当前线程会尝试链接服务器。以下是示例代码:elasticsearch
public class ElasticSearchAgent { private volatile boolean connectedToServer = false; private final Predicate agentConnected = () -> connectedToServer; private final Blocker blocker = new ConditionVarBlocker(); private final Timer heartbeatTimer = new Timer(true); public void update(final UpdateCondition condition) throws Exception { GuardedAction<Void> guardedAction = new GuardedAction<Void>(agentConnected) { @Override public Void call() { doUpdate(condition); return null; } }; blocker.callWithGuard(guardedAction); } private void doUpdate(UpdateCondition condition) { try { TimeUnit.MICROSECONDS.sleep(20); // 模拟进行更新 } catch (InterruptedException e) { e.printStackTrace(); } } public void init() { Thread connectingThread = new Thread(new ConnectingTask()); connectingThread.start(); heartbeatTimer.schedule(new HeartBeatTask(), 60000, 2000); } public void disconnect() { connectedToServer = false; } protected void onConnected() { try { blocker.signalAfter(() -> { connectedToServer = true; return Boolean.TRUE; }); } catch (Exception e) { e.printStackTrace(); } } protected void onDisconnected() { connectedToServer = false; } private class ConnectingTask implements Runnable { @Override public void run() { try { Thread.sleep(100); } catch (InterruptedException e) {} onConnected(); } } private class HeartBeatTask extends TimerTask { @Override public void run() { if (!testConnection()) { onDisconnected(); reconnect(); } } private boolean testConnection() { return true; } private void reconnect() { ConnectingTask connectingTask = new ConnectingTask(); connectingTask.run(); } } }
能够看到,在进行update()操做时,首先会建立一个GuardedAction对象,真正的更新操做是在该对象中进行的,这里的保护性条件是经过一个volatile类型的变量connectedToServer来控制的,若是当前与es服务器的链接还存活在,则该变量置为true。HeartBeatTask是一个定时任务,在60s延迟以后每隔2s会向服务器发送心跳测试,以检查链接是否存活,若是不存活,则会将connectedToServer变量置为false,而且会尝试链接服务器。在init()方法中首先会建立一个链接服务器的任务,以保证服务器链接在初始时的可用状态,而且其还会启动心跳测试的定时任务。以下是Blocker和ConditionVarBlocker的实现代码:ide
public interface Blocker { <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception; void signalAfter(Callable<Boolean> stateOperation) throws Exception; void signal() throws InterruptedException; void broadcastAfter(Callable<Boolean> stateOperation) throws Exception; }
public class ConditionVarBlocker implements Blocker { private final Lock lock; private final Condition condition; public ConditionVarBlocker(Lock lock) { this.lock = lock; this.condition = lock.newCondition(); } public ConditionVarBlocker() { this.lock = new ReentrantLock(); this.condition = lock.newCondition(); } @Override public <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception { lock.lockInterruptibly(); try { final Predicate guard = guardedAction.guard; while (!guard.evaluate()) { condition.await(); } return guardedAction.call(); } finally { lock.unlock(); } } @Override public void signalAfter(Callable<Boolean> stateOperation) throws Exception { lock.lockInterruptibly(); try { if (stateOperation.call()) { condition.signal(); } } finally { lock.unlock(); } } @Override public void signal() throws InterruptedException { lock.lockInterruptibly(); try { condition.signal(); } finally { lock.unlock(); } } @Override public void broadcastAfter(Callable<Boolean> stateOperation) throws Exception { lock.lockInterruptibly(); try { if (stateOperation.call()) { condition.signalAll(); } } finally { lock.unlock(); } } }
能够看到,ConditionVarBlocker中基本上都是模板代码,其声明了一个Lock对象和一个Condition对象,Lock对象用于对当前的先验条件检查过程进行同步处理,Condition对象则用于在先验条件不知足的状况下阻塞当前线程的。测试
在callWithGuard()方法中,首先会在一个循环中检查当前的先验条件是否知足,若是不知足,则使当前线程进入等待状态,若是知足,则当前线程继续执行其任务。这里须要注意的是,咱们使用了while()循环用于判断先验条件是否知足,由于有可能当前线程被意外的唤醒,或者说被唤醒以后先验条件仍是不知足,于是这里使用循环判断,以使当前线程在先验条件不知足的状况下继续等待。this
在signalAfter()方法中,其首先调用stateOperation.call()方法,判断当前的先验条件是否知足,只有在先验条件知足的状况下才会唤醒一个等待的线程。这里stateOperation是ElasticSearchAgent传入的用于判断当前是否处于链接状态的一个条件载体。lua
以下是GuardedAction的实现代码:线程
public abstract class GuardedAction<V> implements Callable<V> { protected final Predicate guard; public GuardedAction(Predicate guard) { this.guard = guard; } }
这里GuardedAction是一个抽象类,其主要封装了一个Predicate属性。GuardedAction的主要实如今ElasticSearchAgent.guardedMethod()方法中生成的,由于具体须要执行的任务须要调用方生成,这里只是提供了一个模板方法。以下是Predicate的代码:code
@FunctionalInterface public interface Predicate { boolean evaluate(); }
这里Predicate也只是一个声明而已,其具体的实现也是在ElasticSearchAgent中,本例中主要是判断connectedToServer是否为true,即处于链接服务器的状态。对象
public ConditionVarBlocker(Lock lock) { this.lock = lock; this.condition = lock.newCondition(); }
该方法用于防止ElasticSearchAgent因为某种缘由而须要加锁时可能会形成嵌套监视器锁死的问题的。所谓的嵌套监视器锁死的问题指的是,若是某个线程执行依次获取了两个锁,而因为先验条件不知足,从而致使当前线程释放了内层锁从而进入等待状态,而另外的线程为了检查当前的先验条件须要获取到外层锁,这就致使了锁循环等待的问题,在等待先验条件知足的线程持有外层锁,其没法释放,而尝试改变先验条件的线程正在尝试获取外层锁,但其一直没法获取到,从而形成了死锁。这种状况下就提供了该构造方法,若是ElasticSearchAgent须要对其方法进行加锁,那么其须要经过该构造方法将锁传递给ConditionVarBlocker,这样当前线程在释放锁的时候就会将外层锁和内层锁同时释放了(由于都是同一个锁)。