基于散列链表+红黑树实现,相似于 HashMap,JDK 8 进行了优化,利用 volatile + CAS 实现无锁化操做,保证线程安全的同时,提升性能。默认容量16,默认加载因子0.75;
散列链表和红黑树的内部类定义以下:数组
// 基本结构 static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; } // 红黑树结构,链表长度大于8时转换 static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; }
和 HashMap 比较,多了内部类 TreeBin,它不存储键值,而是指向 TreeNode 列表和它们的根节点,而 ConcurrentHashMap 也是操做 TreeBin。此外,TreeBin 还维护了读写锁状态,这会使得在树重组以前,持有锁的写操做会等待未持锁的读操做完成。安全
// 指向TreeNode列表和它们的根节点, static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; static final int WRITER = 1; // 持有写锁时 static final int WAITER = 2; // 等待写锁时 static final int READER = 4; // 用来设置读锁的增量值 }
如何作到线程安全的呢?
1. sizeCtl:控制表的初始化和重建。负数表示表正在初始化或者重建:
-1表示在初始化;
-(1+N)表示有N个正在进行重建的线程;
2. 节点哈希值表示的状态
MOVED=-1 表示 forward 节点;
TREEBIN=-2 表示 treeBin 的根节点;
3. V put(K key, V value) 操做
若是表为空,则初始化表;
若是hash位置为空,则经过CAS设置值;
若是hash=-1,则帮组扩容;
若是节点既不为空,也不等于-1,那么锁住该节点,在链表或者红黑树上添加值;
4. void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) 扩容
初始化新表,容量是原表的2倍;
复制元素到新表,处理一个节点就把节点设置为forward;
当这个节点为空时,经过CAS来设置forward;
当节点值为-1时,表示forward已经处理过了;
当节点不为空且不为-1时,锁住节点进行处理;
其余线程看到节点为forward时,向后遍从来完成;
5. V get(Object key) 操做
若是hash值匹配,则直接获取;
若是hash值小于0,则从树上查找;
不然,遍历链表寻找;
6. mappingCount()(推荐,由于其返回值时long) 和 size(),都是调用 sumCount() 来计算
定义了内部类 CounterCell 来计数,计算总数时,就是把 CounterCell[] 数组的值累加起来便可;并发
// put 源码 Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); // 表长度为空时,初始化表 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // hash的位置为空时,经过CAS设置值 } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 若是节点是 forward 节点,帮助扩容 else { synchronized (f) { // 不为空,不是 forward 节点时,synchronized 锁住节点 if (tabAt(tab, i) == f) { // 锁住后再次判断节点有没有变化 if (fh >= 0) { ... // 表示要操做链表节点 } else if (f instanceof TreeBin) { ... // 表示操做的是TreeBin节点 } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 链表长度大于8,转成红黑树 } }
// 并发扩容的方法 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // 初始化新的表,容量是原表的2倍 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; nextTable = nextTab; // nextTable 是定义的临时表,仅在表重建时不为空 transferIndex = n; // 表重建时的下一个表的索引 } int nextn = nextTab.length; // 扩容后的表长度 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; // true 表示该节点已处理 boolean finishing = false; // 确保已经完成了 for (int i = 0, bound = 0;;) { if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { ... // 完成了 return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sizeCtl-1,表示多了一个线程来扩容 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 节点位置是空的,经过CAS设置值为forward else if ((fh = f.hash) == MOVED) advance = true; // 这个位置是forward节点,表示已经处理了 else { synchronized (f) { // 节点不为空,且不是forward节点,锁住该节点再处理 ... // 相似put的操做 } } } }
// get 源码 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; // 直接得到值 } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 在树上查找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; // 遍历链表查找 }
// 计数方法 private transient volatile CounterCell[] counterCells; // 数组,存储统计值 @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; // 统计值累加 } } return sum; }
基于跳表实现,按照 key 天然排序,key 不能为 null,相似 TreeMap。
利用 volatile+CAS 来保证线程安全。app
static final class Node<K,V> { final K key; volatile Object value; volatile Node<K,V> next; } boolean casValue(Object cmp, Object val) { return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val); }
使用 ConcurrentSkipListMap 实现。ide
基于数组实现,至关于支持并发的 ArrayList,刚建立时初始化为长度0的数组。
利用写时复制来保证线程安全。
写时复制:数组是 volatile 类型的,修改数组时,首先 ReentrantLock 加锁,而后复制一个副本数组,对副本进行修改完成后,把原来的数组指向这个新的数组完成赋值。读时不用加锁。工具
private transient volatile Object[] array; public boolean add(E e) { // 加锁进行写时复制 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; // 拷贝新数组,长度+1 Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; // set给volatile的array setArray(newElements); return true; } finally { lock.unlock(); } }
使用 CopyOnWriteArrayList 实现。去重的,可是按照插入顺序排序的。性能
基于链表实现的无界的线程安全的非阻塞队列,遵循 FIFO,利用 volatile+CAS 来保证线程安全。优化
private static class Node<E> { volatile E item; volatile Node<E> next; } // 初始化 head 和 tail private transient volatile Node<E> head; private transient volatile Node<E> tail; public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } // 利用 CAS 修改链表 private boolean casTail(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); }
基于双向链表实现的无界的线程安全的非阻塞队列,实现方式相似 ConcurrentLinkedQueue。this
// 双向链表 static final class Node<E> { volatile Node<E> prev; volatile E item; volatile Node<E> next; }
实现:经过 ReentrantLock 和 Condition 实现的等待通知模型来实现阻塞队列。线程
基于数组实现的阻塞队列,须要指定容量。
// poll 相似 public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); // 加锁 try { if (count == items.length) return false; // 超过长度,返回false,数据丢失 final Object[] items = this.items; items[putIndex] = x; // putIndex表示下一次加元素的索引 if (++putIndex == items.length) putIndex = 0; // 达到长度后,索引位归零 count++; // 计数+1 notEmpty.signal(); // 通知能够取值了 return true; } finally { lock.unlock(); // 解锁 } }
基于链表实现的阻塞队列,默认容量为 Integer.MAX_VALUE。
实现相似 ArrayBlockingQueue,计数用的原子类 AtomicInteger。
基于二叉小顶堆实现的阻塞队列,保证取出的元素是最小的,默认初始化容量11。
基于数组实现的延迟阻塞队列。使用时必须实现 Delayed。
以 AtomicInteger 为例,利用 volatile+CAS 来保证原子操做,直接看源码注释
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; private volatile int value; // 直接获取 volatile 变量 public final int get() { return value; } // 经过 Unsafe 的 CAS 原子操做 volatile 变量 public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } // 经过 Unsafe 的 CAS 原子操做 + 1 public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; }
功能:指定 N 个线程等待所有完成后,继续执行。
实现:内部类 Sync 实现了 AQS 同步器,初始化时设置 AQS 的同步状态来表示 countDown 的数量,await() 方法把当前线程加入到 AQS 等待队列,让当前线程阻塞住,执行 countDown() 方法会把同步状态减1,当减到0时,唤醒等待队列中的线程。
功能:相似 CountDownLatch,可是支持 reset() 重置状态,能指定到达数量时执行的动做。
实现:基于 ReentrantLock 和 Condition 实现,核心源码以下
private int dowait(boolean timed, long nanos) { final ReentrantLock lock = this.lock; lock.lock(); // 加锁,保护 count try { if (Thread.interrupted()) { breakBarrier(); // 使用 signalAll 唤醒全部线程 throw new InterruptedException(); } int index = --count; // 线程数量递减 if (index == 0) { // 若是线程数量到达 0 final Runnable command = barrierCommand; if (command != null) command.run(); // 执行 barrierAction return 0; } // 此时线程数量还没到 0 for (;;) { try { if (!timed) trip.await(); // 调用 Condition 的 await 进行等待 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); // 待超时的等待 } } } finally { lock.unlock(); // 释放锁 } }
ThreadPoolExecutor 参数说明:
1. 核心线程池
2. 最大线程池
3. 线程空闲时间
4. 线程空闲时间单位
5. 阻塞队列
6. 线程工厂:建立具备相同特性的一组线程。
7. 拒绝策略
CallerRunsPolicy:重试添加当前的任务,会自动重复调用 execute() 方法,直到成功。
AbortPolicy:对拒绝任务抛弃处理,而且抛出异常。
DiscardPolicy:对拒绝任务直接无声抛弃,没有异常信息。
DiscardOldestPolicy:对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,而后把拒绝任务加到队列。
线程池数量理论值 -> CPU 密集型:N+1;IO 密集型:2N+1
线程的提交方式:
1. execute():用于提交不须要返回值的任务
2. submit():用于提交须要返回值的任务,返回future对象。
线程池线程的执行流程:核心 -> 队列 -> 最大 -> 拒绝策略
1. 若是当前运行的线程少于核心线程池时,则建立新的线程执行任务;
2. 若是当前运行的线程大于等于核心线程池时,则把任务加入阻塞队列;
3. 若是阻塞队列已经满了,则建立新的线程执行任务;
4. 若是线程数超过了最大线程数,则调用拒绝策略;