Java并发(基础知识)——显示锁和同步工具类

显示锁                                                                                    java

      Lock接口是Java 5.0新增的接口,该接口的定义以下:sql

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

  与内置加锁机制不一样的是,Lock提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操做,全部加锁和解锁的方法都是显示的。ReentrantLock实现了Lock接口,与内置锁相比,ReentrantLock有如下优点:能够中断获取锁操做,获取锁时候能够设置超时时间。如下代码给出了Lock接口的标准使用形式:安全

Lock lock = new ReentrantLock();
...
lock.lock();
try{
	...
} finally {
	lock.unlock();

1.一、轮询锁与定时锁并发

      可定时的与可轮询的锁获取方式是由tryLock方法实现的,与无条件的锁获取方式相比,它具备跟完善的错误回复机制。tryLock方法的说明以下:app

boolean tryLock():仅在调用时锁为空闲状态才获取该锁。若是锁可用,则获取锁,并当即返回值 true。若是锁不可用,则此方法将当即返回值 false。

boolean tryLock(long time, TimeUnit unit) throws InterruptedException:
  若是锁在给定的等待时间内空闲,而且当前线程未被中断,则获取锁。

  若是锁可用,则此方法将当即返回值 true。若是锁不可用,出于线程调度目的,将禁用当前线程,而且在发生如下三种状况之一前,该线程将一直处于休眠状态:

  锁由当前线程得到;或者
  其余某个线程中断当前线程,而且支持对锁获取的中断;或者
  已超过指定的等待时间
  若是得到了锁,则返回值 true。

  若是当前线程:

  在进入此方法时已经设置了该线程的中断状态;或者
  在获取锁时被中断,而且支持对锁获取的中断,
  则将抛出 InterruptedException,并会清除当前线程的已中断状态。
  若是超过了指定的等待时间,则将返回值 false。若是 time 小于等于 0,该方法将彻底不等待。

  在内置锁中,死锁是一个严重的问题,恢复程序的惟一方法是从新启动程序,而防止死锁的惟一方法就是在构造程序时避免出现不一致的锁顺序,可定时的与可轮询的锁提供了另外一种选择:先用tryLock()尝试获取全部的锁,若是不能获取全部须要的锁,那么释放已经获取的锁,而后从新尝试获取全部的锁,如下例子演示了使用tryLock避免死锁的方法:先用tryLock来获取两个锁,若是不能同时获取,那么就回退并从新尝试。异步

    public boolean transferMoney(Account fromAcct, Account toAcct, DollarAmount amount,  long timeout, TimeUnit unit) throws InsufficientFundsException, InterruptedException {
        long fixedDelay = 1;
        long randMod = 2;
        long stopTime = System.nanoTime() + unit.toNanos(timeout);

        while (true) {
            if (fromAcct.lock.tryLock()) {
                try {
                    if (toAcct.lock.tryLock()) {
                        try {
                            if (fromAcct.getBalance().compareTo(amount) < 0)
                                throw new InsufficientFundsException();
                            else {
                                fromAcct.debit(amount);
                                toAcct.credit(amount);
                                return true;
                            }
                        } finally {
                            toAcct.lock.unlock();
                        }
                    }
                } finally {
                    fromAcct.lock.unlock();
                }
            }
            if (System.nanoTime() < stopTime)
                return false;
            NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
        }
    }

1.二、可中断的锁获取操做工具

      lockInterruptibly方法可以在得到锁的同时保持对中断的响应,该方法说明以下:ui

void lockInterruptibly() throws InterruptedException:
若是当前线程未被中断,则获取锁。

若是锁可用,则获取锁,并当即返回。

若是锁不可用,出于线程调度目的,将禁用当前线程,而且在发生如下两种状况之一之前,该线程将一直处于休眠状态:

锁由当前线程得到;或者
其余某个线程中断当前线程,而且支持对锁获取的中断。

若是当前线程:
在进入此方法时已经设置了该线程的中断状态;或者
在获取锁时被中断,而且支持对锁获取的中断,
则将抛出 InterruptedException,并清除当前线程的已中断状态。

1.三、读-写锁this

      Java 5除了增长了Lock接口,还增长了ReadWriteLock接口,即读写锁,该接口定义以下:spa

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

  读写锁容许多个读线程并发执行,可是不容许写线程与读线程并发执行,也不容许写线程与写线程并发执行。下面的例子使用了ReentrantReadWriteLock包装Map,从而使他可以在多个线程之间安全的共享:

public class ReadWriteMap <K,V> {
    private final Map<K, V> map;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock r = lock.readLock();
    private final Lock w = lock.writeLock();

    public ReadWriteMap(Map<K, V> map) {
        this.map = map;
    }

    public V put(K key, V value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }

    public V remove(Object key) {
        w.lock();
        try {
            return map.remove(key);
        } finally {
            w.unlock();
        }
    }

    public void putAll(Map<? extends K, ? extends V> m) {
        w.lock();
        try {
            map.putAll(m);
        } finally {
            w.unlock();
        }
    }

    public void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }

    public V get(Object key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }

    public int size() {
        r.lock();
        try {
            return map.size();
        } finally {
            r.unlock();
        }
    }

    public boolean isEmpty() {
        r.lock();
        try {
            return map.isEmpty();
        } finally {
            r.unlock();
        }
    }

    public boolean containsKey(Object key) {
        r.lock();
        try {
            return map.containsKey(key);
        } finally {
            r.unlock();
        }
    }

    public boolean containsValue(Object value) {
        r.lock();
        try {
            return map.containsValue(value);
        } finally {
            r.unlock();
        }
    }
}

同步工具类                                                                            

2.一、闭锁

      闭锁是一个同步辅助类,在完成一组正在其余线程中执行的操做以前,它容许一个或多个线程一直等待。

      用给定的计数初始化 CountDownLatch。因为调用了 countDown() 方法,因此在当前计数到达零以前,await 方法会一直受阻塞。以后,会释放全部等待的线程,await 的全部后续调用都将当即返回。这种现象只出现一次——计数没法被重置。若是须要重置计数,请考虑使用 CyclicBarrier。

      下例给出了闭锁的常见用法,TestHarness建立必定数量的线程,利用它们并发的执行指定的任务,它使用两个闭锁,分别表示"起始门"和"结束门"。每一个线程首先要作的就是在启动门上等待,从而确保全部线程都就绪后才开始执行,而每一个线程要作的最后一件事是将调用结束门的countDown方法减1,这能使主线程高效地等待直到全部工做线程都执行完毕,所以能够统计所消耗的时间:

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
            throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) {
                    }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }
}

2.二、FutureTask

      FutureTask表示可取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;若是计算还没有完成,则阻塞 get 方法。一旦计算完成,就不能再从新开始或取消计算。FutureTask的方法摘要以下:

boolean	cancel(boolean mayInterruptIfRunning) 
	试图取消对此任务的执行。

protected  void	done() 
	当此任务转换到状态 isDone(不论是正常地仍是经过取消)时,调用受保护的方法。

V get()  throws InterruptedException, ExecutionException
	若有必要,等待计算完成,而后获取其结果。

V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
	若有必要,最多等待为使计算完成所给定的时间以后,获取其结果(若是结果可用)。

boolean	isCancelled() 
	若是在任务正常完成前将其取消,则返回 true。

boolean	isDone() 
	若是任务已完成,则返回 true。

void	run() 
	除非已将此 Future 取消,不然将其设置为其计算的结果。

protected  boolean	runAndReset() 
	执行计算而不设置其结果,而后将此 Future 重置为初始状态,若是计算遇到异常或已取消,则该操做失败。

protected  void	set(V v) 
	除非已经设置了此 Future 或已将其取消,不然将其结果设置为给定的值。

protected  void	setException(Throwable t) 
	除非已经设置了此 Future 或已将其取消,不然它将报告一个 ExecutionException,并将给定的 throwable 做为其缘由。

      FutureTask能够用来表示一些时间较长的计算,这些计算能够在使用计算结果以前启动,如下代码就是模拟一个高开销的计算,咱们能够先调用start()方法开始计算,而后在须要结果时,再调用get获得结果:

public class Preloader {
	ProductInfo loadProductInfo() throws DataLoadException {
		return null;
	}

	private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
			new Callable<ProductInfo>() {
				public ProductInfo call() throws DataLoadException {
					return loadProductInfo();
				}
			});
	private final Thread thread = new Thread(future);

	public void start() {
		thread.start();
	}

	public ProductInfo get() throws DataLoadException, InterruptedException {
		try {
			return future.get();
		} catch (ExecutionException e) {
			Throwable cause = e.getCause();
			if (cause instanceof DataLoadException)
				throw (DataLoadException) cause;
			else
				throw new RuntimeException(e);
		}
	}

	interface ProductInfo {
	}
}

class DataLoadException extends Exception {
}

2.三、信号量

      从概念上讲,信号量维护了一个许可集。若有必要,在许可可用前会阻塞每个 acquire(),而后等待获取许可。每一个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。可是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采起相应的行动。

      Semaphore 一般用于限制能够访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:

class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
   }

   // Not a particularly efficient data structure; just for demo
   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }
 }

   得到一项前,每一个线程必须从信号量获取许可,从而保证可使用该项。该线程结束后,将项返回到池中并将许可返回到该信号量,从而容许其余线程获取该项。注意,调用 acquire() 时没法保持同步锁,由于这会阻止将项返回到池中。信号量封装所需的同步,以限制对池的访问,这同维持该池自己一致性所需的同步是分开的。

      将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用做一个相互排斥的锁。这一般也称为二进制信号量,由于它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具备某种属性(与不少 Lock 实现不一样),便可以由线程释放“锁”,而不是由全部者(由于信号量没有全部权的概念)。在某些专门的上下文(如死锁恢复)中这会颇有用。

      Semaphore的构造方法可选地接受一个公平 参数。当设置为 false 时,此类不对线程获取许可的顺序作任何保证。特别地,闯入 是容许的,也就是说能够在已经等待的线程前为调用 acquire() 的线程分配一个许可,从逻辑上说,就是新线程将本身置于等待线程队列的头部。当公平设置为 true时,信号量保证对于任何调用获取方法的线程而言,都按照处理它们调用这些方法的顺序(即先进先出;FIFO)来选择线程、得到许可。注意,FIFO 排序必然应用到这些方法内的指定内部执行点。因此,可能某个线程先于另外一个线程调用了acquire,可是却在该线程以后到达排序点,而且从方法返回时也相似。还要注意,非同步的tryAcquire 方法不使用公平设置,而是使用任意可用的许可。

      一般,应该将用于控制资源访问的信号量初始化为公平的,以确保全部线程均可访问资源。为其余的种类的同步控制使用信号量时,非公平排序的吞吐量优点一般要比公平考虑更为重要。

      Semaphore还提供便捷的方法来同时 acquire 和释放多个许可。当心,在未将公平设置为 true 时使用这些方法会增长不肯定延期的风险。

      内存一致性效果:线程中调用“释放”方法(好比 release())以前的操做 happen-before 另外一线程中紧跟在成功的“获取”方法(好比 acquire())以后的操做。

2.四、栅栏

      CyclicBarrier是一个同步辅助类,它容许一组线程互相等待,直到到达某个公共屏障点。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier颇有用。由于该 barrier在释放等待线程后能够重用,因此称它为循环的barrier。

      CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达以后(但在释放全部线程以前),该命令只在每一个屏障点运行一次。若在继续全部参与线程以前更新共享状态,此屏障操做颇有用。

      示例用法:下面是一个在并行分解设计中使用barrier的例子:

class Solver {
	final int N;
	final float[][] data;
	final CyclicBarrier barrier;

	class Worker implements Runnable {
		int myRow;

		Worker(int row) {
			myRow = row;
		}

		public void run() {
			while (!done()) {
				processRow(myRow);

				try {
					barrier.await();
				} catch (InterruptedException ex) {
					return;
				} catch (BrokenBarrierException ex) {
					return;
				}
			}
		}
	}

	public Solver(float[][] matrix) {
     data = matrix;
     N = matrix.length;
     barrier = new CyclicBarrier(N, 
                     new Runnable() {
                       public void run() { 
                         //mergeRows(...); 
                       }
                     });
     for (int i = 0; i < N; ++i) 
       new Thread(new Worker(i)).start();

     waitUntilDone();
   }
}

      在这个例子中,每一个 worker 线程处理矩阵的一行,在处理完全部的行以前,该线程将一直在屏障处等待。处理完全部的行以后,将执行所提供的 Runnable 屏障操做,并合并这些行。若是合并者肯定已经找到了一个解决方案,那么 done() 将返回 true,全部的 worker 线程都将终止。

      若是屏障操做在执行时不依赖于正挂起的线程,则线程组中的任何线程在得到释放时都能执行该操做。为方便此操做,每次调用 await() 都将返回能到达屏障处的线程的索引。而后,您能够选择哪一个线程应该执行屏障操做.

      对于失败的同步尝试,CyclicBarrier 使用了一种要么所有要么全不 (all-or-none) 的破坏模式:若是由于中断、失败或者超时等缘由,致使线程过早地离开了屏障点,那么在该屏障点等待的其余全部线程也将经过 BrokenBarrierException以反常的方式离开。

      内存一致性效果:线程中调用 await() 以前的操做 happen-before 那些是屏障操做的一部份的操做,后者依次 happen-before 紧跟在从另外一个线程中对应 await() 成功返回的操做。

相关文章
相关标签/搜索