#Java 并发编程(一)数据库
##同步容器 1.Vector,Hashtable。 实现线程安全的方式是:将它们的状态封装起来,并对每一个共有方法进行同步,使得每次只有一个线程能访问容器的状态。使用了Java监视器模式。编程
2.Vector代码分析 //根据下标获取数据,都是使用synchronized实现同步 public synchronized E get(int index) { if (index >= elementCount) throw new ArrayIndexOutOfBoundsException(index); return elementData(index); }设计模式
//添加数据 public synchronized boolean add(E e) { modCount++; ensureCapacityHelper(elementCount + 1); elementData[elementCount++] = e; return true; } //扩容 private void grow(int minCapacity) { // overflow-conscious code int oldCapacity = elementData.length; int newCapacity = oldCapacity + ((capacityIncrement > 0) ? capacityIncrement : oldCapacity); if (newCapacity - minCapacity < 0) newCapacity = minCapacity; if (newCapacity - MAX_ARRAY_SIZE > 0) newCapacity = hugeCapacity(minCapacity); elementData = Arrays.copyOf(elementData, newCapacity); }
3.Hashtable代码分析数组
//使用synchronized实现同步 public synchronized V get(Object key) { Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) { if ((e.hash == hash) && e.key.equals(key)) { return (V)e.value; } } return null; } //添加元素 public synchronized V put(K key, V value) { Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; @SuppressWarnings("unchecked") //key值存在于数组中 Entry<K,V> entry = (Entry<K,V>)tab[index]; for(; entry != null ; entry = entry.next) { if ((entry.hash == hash) && entry.key.equals(key)) { V old = entry.value; entry.value = value; return old; } } //key值不存在则进行添加 addEntry(hash, key, value, index); return null; } private void addEntry(int hash, K key, V value, int index) { modCount++; Entry<?,?> tab[] = table; //若是超过了阈值 if (count >= threshold) { rehash(); tab = table; hash = key.hashCode(); index = (hash & 0x7FFFFFFF) % tab.length; } //添加新元素 @SuppressWarnings("unchecked") Entry<K,V> e = (Entry<K,V>) tab[index]; tab[index] = new Entry<>(hash, key, value, e); count++; }
##并发容器 1.JDK5.0提供了多种并发容器来改进同步容器的性能。同步容器将全部对容器状态的访问都串行化,以实现他们的线程安全性。代价是严重下降并发性。并发容器是针对多个线程并发设计的。缓存
ConcurrentHashMap用于代替基于散列的同步Map。安全
CopyOnWriteArrayList用于在遍历操做为主要操做的状况下代替同步的List。并发
2.Java8 ConcurrentHashMap 代码分析框架
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { //key,value不能为空 if (key == null || value == null) throw new NullPointerException(); //使Key值分散更均匀 int hash = spread(key.hashCode()); //记录Node数量 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //延迟初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //此bucket为空,不用锁,使用cas添加 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
3.CopyOnWriteArrayList适合用在“读多,写少”的“并发”应用中,换句话说,它适合使用在读操做远远大于写操做的场景里,好比缓存。它不存在“扩容”的概念,每次写操做(add or remove)都要copy一个副本,在副本的基础上修改后改变array引用,因此称为“CopyOnWrite”,所以在写操做是加锁,而且对整个list的copy操做时至关耗时的,过多的写操做不推荐使用该存储结构。ide
Java8 CopyOnWriteArrayList 代码分析函数
//get 读操做 private E get(Object[] a, int index) { return (E) a[index]; } //add 写操做 public boolean add(E e) { //可重入锁 final ReentrantLock lock = this.lock; lock.lock(); try { //得到数组元素对象 Object[] elements = getArray(); int len = elements.length; //数组的copy Object[] newElements = Arrays.copyOf(elements, len + 1); //添加元素 newElements[len] = e; setArray(newElements); return true; } finally { //释放锁 lock.unlock(); } }
##阻塞队列和生产者-消费者模式
1.阻塞队列提供了可供阻塞的put和take方法,以及支持定时的offer和poll方法,若是队列已经满了,那么put 方法将一直阻塞直到有空间可用;若是队列为空,那么take 方法将会阻塞直到有元素可用。队列能够是有界的也能够是无界的,无界队列永远不会被充满,所以无界队列上的put方法也永远不会阻塞。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。BlockingQueue简化了生产者-消费者设计模式的实现过程,他支持任意数量的生产者和消费者。一张最多见的生产者-消费者设计模式就是线程池和工做队列的组合,在Executor任务执行框架中就体现了这种模式。
2.生产者-消费者模式代码示例
public class Client { public static class People{ BlockingQueue<Object> peBlockingQueue=new ArrayBlockingQueue<Object>(3); public void putPeople(Object object){ try { //若是此时队列已经满了将阻塞。 peBlockingQueue.put(object); } catch (Exception e) { e.printStackTrace(); } System.out.println("放入人员"); } public Object takePeople(){ Object object=null; try { //若是此时队列为空将阻塞。 object=peBlockingQueue.take(); } catch (Exception e) { e.printStackTrace(); } System.out.println("取出人员"); return object; } } static class putThread extends Thread{ private People people; private Object object=new Object(); public putThread(People people){ this.people=people; } @Override public void run() { people.putPeople(object); } } static class takeThread extends Thread{ private People people; public takeThread(People people){ this.people=people; } @Override public void run() { people.takePeople(); } } public static void main(String args[]){ People people=new People(); for (int i = 0; i < 5; i++) { new Thread(new putThread(people)).start(); } for (int i = 0; i < 5; i++) { new Thread(new takeThread(people)).start(); } } }
输出结果
放入人员
放入人员
放入人员
取出人员
放入人员
放入人员
取出人员
取出人员
取出人员
取出人员
3.Java 里的阻塞队列
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。默认大小为Integer.MAX_VALUE。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才可以从队列中获取到该元素。DelayQueue也是一个无界队列,所以往队列中插入数据的操做(生产者)永远不会被阻塞,而只有获取数据的操做(消费者)才会被阻塞。
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认状况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的全部生产者线程或消费者线程,当队列可用时,能够按照阻塞的前后顺序访问队列,即先阻塞的生产者线程,能够先往队列里插入元素,先阻塞的消费者线程,能够先从队列里获取元素。一般状况下为了保证公平性会下降吞吐量。建立公平的队列:
BlockingQueue<Object> peBlockingQueue=new ArrayBlockingQueue<Object>(3,true);
ArrayBlockingQueue代码分析:
//可重入锁 final ReentrantLock lock; //等待条件 private final Condition notEmpty; //等待条件 private final Condition notFull; //put 方法 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //若是计数等于数组长度 while (count == items.length) //非满阻塞 notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; //非空唤醒 notEmpty.signal(); } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; //获取可中断锁 lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在建立元素时能够指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
咱们能够将DelayQueue运用在如下应用场景:
缓存系统的设计。能够用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,好比TimerQueue就是使用DelayQueue实现的。
关闭空闲链接。
##阻塞方法与中断方法
##同步工具类
信号量
Semaphore 中管理着一组虚拟的许可,许可的初始数量可经过构造函数来指定,在执行操做时能够首先得到许可,并在使用以后释放许可,若是没有许可,那么acquire将阻塞直到有许可。
Semaphore 能够用于实现资源池,例如数据库链接池。咱们能够构造一个固定长度的资源池,当池为空,请求资源将会阻塞直到有资源。也可使用Semaphore将任何一种容器变成有界阻塞容器。
public class TestSemaphore { public static class SemaphoreDemo { private ReentrantLock lock = new ReentrantLock(); private Semaphore semaphore; private final ArrayList<Object> resourceList = new ArrayList<Object>(); public SemaphoreDemo(ArrayList<Object> list) { this.resourceList.addAll(list); semaphore = new Semaphore(3); } // 获取资源 public Object acquire() throws InterruptedException { semaphore.acquire(); lock.lock(); try { return resourceList.get(resourceList.size() - 1); } catch (Exception InterruptedException) { } finally { lock.unlock(); } return null; } // 释放资源 public void release(Object resource) { lock.lock(); try { resourceList.add(resource); } finally { lock.unlock(); } semaphore.release(); } } public static void main(String[] args) { ArrayList<Object> resourceList = new ArrayList(); resourceList.add("Resource1"); resourceList.add("Resource2"); final SemaphoreDemo semaphoreDemo = new SemaphoreDemo(resourceList); Runnable task = new Runnable() { public void run() { Object reObject = null; try { // 获取资源 reObject = semaphoreDemo.acquire(); System.out.println(Thread.currentThread().getName() + ":" + reObject); //休眠 Thread.sleep(1500); System.out.println(Thread.currentThread().getName() + "!" + reObject); } catch (Exception e) { e.printStackTrace(); } finally { semaphoreDemo.release(reObject); } } }; ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 9; i++) { executorService.submit(task); } executorService.shutdown(); }
}
代码输出
pool-1-thread-1:Resource2
pool-1-thread-3:Resource2
pool-1-thread-2:Resource2
pool-1-thread-1!Resource2
pool-1-thread-3!Resource2
pool-1-thread-2!Resource2
pool-1-thread-4:Resource2
pool-1-thread-5:Resource2
pool-1-thread-5!Resource2
pool-1-thread-4!Resource2
闭锁
闭锁是一种同步工具类,能够延迟线程的进度直到其到达终止状态。闭锁能够用来确保某些活动直到其余活动都完成之后才继续执行。
CountDownLatch是一种灵活的闭锁实现,可使一个或多个线程等待一组事件发生。CountDownLatch有一个正数计数器,countDown方法对计数器作减操做,await方法等待计数器达到0。全部await的线程都会阻塞直到计数器为0或者等待线程中断或者超时。
CountDownLatch源码
//初始化时给定计数器大小 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } //递减锁存器的计数,若是计数到达零,则释放全部等待的线程。 public void countDown() { sync.releaseShared(1); } //使当前线程阻塞直到计数器为零 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
CountDownLatch使用示例
public class TestCountDownLatch { public static CountDownLatch latch=null; public static void main(String args[]) throws InterruptedException{ try { latch= new CountDownLatch(5); for (int i = 0; i < 5; i++) { new TestThread().start(); } latch.await(); System.out.println("5个线程已经完成"); } catch (Exception e) { }finally { } } static class TestThread extends Thread{ @Override public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " sleep 1000ms."); latch.countDown(); }catch (InterruptedException e) { e.printStackTrace(); } } } }
输出结果
Thread-1 sleep 1000ms.
Thread-4 sleep 1000ms.
Thread-3 sleep 1000ms.
Thread-0 sleep 1000ms.
Thread-2 sleep 1000ms.
5个线程已经完成