在 Java 中,同步容器主要包括 2 类:java
同步容器的同步原理就是在方法上用 synchronized
修饰。那么,这些方法每次只容许一个线程调用执行。git
因为被 synchronized
修饰的方法,每次只容许一个线程执行,其余试图访问这个方法的线程只能等待。显然,这种方式比没有使用 synchronized
的容器性能要差。github
同步容器真的必定安全吗?算法
答案是:未必。同步容器未必真的安全。在作复合操做时,仍然须要加锁来保护。数组
常见复合操做以下:安全
不安全的示例数据结构
public class Test { static Vector<Integer> vector = new Vector<Integer>(); public static void main(String[] args) throws InterruptedException { while(true) { for(int i=0;i<10;i++) vector.add(i); Thread thread1 = new Thread(){ public void run() { for(int i=0;i<vector.size();i++) vector.remove(i); }; }; Thread thread2 = new Thread(){ public void run() { for(int i=0;i<vector.size();i++) vector.get(i); }; }; thread1.start(); thread2.start(); while(Thread.activeCount()>10) { } } } }
执行时可能会出现数组越界错误。并发
Vector 是线程安全的,为何还会报这个错?很简单,对于 Vector,虽然能保证每个时刻只能有一个线程访问它,可是不排除这种可能:分布式
当某个线程在某个时刻执行这句时:高并发
for(int i=0;i<vector.size();i++) vector.get(i);
倘若此时 vector 的 size 方法返回的是 10,i 的值为 9
而后另一个线程执行了这句:
for(int i=0;i<vector.size();i++) vector.remove(i);
将下标为 9 的元素删除了。
那么经过 get 方法访问下标为 9 的元素确定就会出问题了。
安全示例
所以为了保证线程安全,必须在方法调用端作额外的同步措施,以下面所示:
public class Test { static Vector<Integer> vector = new Vector<Integer>(); public static void main(String[] args) throws InterruptedException { while(true) { for(int i=0;i<10;i++) vector.add(i); Thread thread1 = new Thread(){ public void run() { synchronized (Test.class) { //进行额外的同步 for(int i=0;i<vector.size();i++) vector.remove(i); } }; }; Thread thread2 = new Thread(){ public void run() { synchronized (Test.class) { for(int i=0;i<vector.size();i++) vector.get(i); } }; }; thread1.start(); thread2.start(); while(Thread.activeCount()>10) { } } } }
ConcurrentModificationException 异常
在对 Vector 等容器并发地进行迭代修改时,会报 ConcurrentModificationException 异常,关于这个异常将会在后续文章中讲述。
可是在并发容器中不会出现这个问题。
JDK 的 java.util.concurrent
包(即 juc)中提供了几个很是有用的并发容器。
JDK7
ConcurrentHashMap 类在 jdk1.7 中的设计,其基本结构如图所示:
每个 segment 都是一个 HashEntry<K,V>[] table, table 中的每个元素本质上都是一个 HashEntry 的单向队列。好比 table[3]为首节点,table[3]->next 为节点 1,以后为节点 2,依次类推。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { // 将整个hashmap分红几个小的map,每一个segment都是一个锁;与hashtable相比,这么设计的目的是对于put, remove等操做,能够减小并发冲突,对 // 不属于同一个片断的节点能够并发操做,大大提升了性能 final Segment<K,V>[] segments; // 本质上Segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了ReentrantLock, 能够做为互拆锁使用 static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; transient int count; } // 基本节点,存储Key, Value值 static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; } }
JDK8
transient volatile HashEntry<K,V>[] table
保存数据,采用 table 数组元素做为锁,从而实现了对每一行数据进行加锁,进一步减小并发冲突的几率。final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 若是table为空,初始化;不然,根据hash值计算获得数组索引i,若是tab[i]为空,直接新建节点Node便可。注:tab[i]实质为链表或者红黑树的首节点。 if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 若是tab[i]不为空而且hash值为MOVED,说明该链表正在进行transfer操做,返回扩容完成后的table。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 针对首个节点进行加锁操做,而不是segment,进一步减小线程冲突 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 若是在链表中找到值为key的节点e,直接设置e.val = value便可。 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 若是没有找到值为key的节点,直接新建Node并加入链表便可。 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 若是首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操做。 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 若是节点数>=8,那么转换链表结构为红黑树结构。 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 计数增长1,有可能触发transfer操做(扩容)。 addCount(1L, binCount); return null; }
public class ConcurrentHashMapDemo { public static void main(String[] args) throws InterruptedException { // HashMap 在并发迭代访问时会抛出 ConcurrentModificationException 异常 // Map<Integer, Character> map = new HashMap<>(); Map<Integer, Character> map = new ConcurrentHashMap<>(); Thread wthread = new Thread(() -> { System.out.println("写操做线程开始执行"); for (int i = 0; i < 26; i++) { map.put(i, (char) ('a' + i)); } }); Thread rthread = new Thread(() -> { System.out.println("读操做线程开始执行"); for (Integer key : map.keySet()) { System.out.println(key + " - " + map.get(key)); } }); wthread.start(); rthread.start(); Thread.sleep(1000); } }
重要属性
/** The lock protecting all mutators */ final transient ReentrantLock lock = new ReentrantLock(); /** The array, accessed only via getArray/setArray. */ private transient volatile Object[] array;
重要方法
public boolean add(E e) { //ReentrantLock加锁,保证线程安全 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; //拷贝原容器,长度为原容器长度加一 Object[] newElements = Arrays.copyOf(elements, len + 1); //在新副本上执行添加操做 newElements[len] = e; //将原容器引用指向新副本 setArray(newElements); return true; } finally { //解锁 lock.unlock(); } }
public E remove(int index) { //加锁 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; E oldValue = get(elements, index); int numMoved = len - index - 1; if (numMoved == 0) //若是要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用 setArray(Arrays.copyOf(elements, len - 1)); else { //不然,将除要删除元素以外的其余元素拷贝到新副本中,并切换引用 Object[] newElements = new Object[len - 1]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index + 1, newElements, index, numMoved); setArray(newElements); } return oldValue; } finally { //解锁 lock.unlock(); } }
public E get(int index) { return get(getArray(), index); } private E get(Object[] a, int index) { return (E) a[index]; }
public class CopyOnWriteArrayListDemo { static class ReadTask implements Runnable { List<String> list; ReadTask(List<String> list) { this.list = list; } public void run() { for (String str : list) { System.out.println(str); } } } static class WriteTask implements Runnable { List<String> list; int index; WriteTask(List<String> list, int index) { this.list = list; this.index = index; } public void run() { list.remove(index); list.add(index, "write_" + index); } } public void run() { final int NUM = 10; // ArrayList 在并发迭代访问时会抛出 ConcurrentModificationException 异常 // List<String> list = new ArrayList<>(); CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); for (int i = 0; i < NUM; i++) { list.add("main_" + i); } ExecutorService executorService = Executors.newFixedThreadPool(NUM); for (int i = 0; i < NUM; i++) { executorService.execute(new ReadTask(list)); executorService.execute(new WriteTask(list, i)); } executorService.shutdown(); } public static void main(String[] args) { new CopyOnWriteArrayListDemo().run(); } }
免费Java资料须要本身领取,涵盖了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高并发分布式等教程。
传送门:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q