可能有些同窗知道ArrayList,HashSet,,HashMap这些容器都是线程不安全的,若是多个线程并发的访问这些容器就会致使线程不安全问题,不少时候须要咱们手动对这些容器进行同步处理,形成咱们很大的不便,所以java为咱们提供了同步容器和并发容器来解决这个问题。java
首先详细介绍前,须要强调下同步容器是线程安全的类,可是也可能形成线程不安全的问题,缘由在后面有解释。数组
同步容器的原理很简单,就是在原容器的基础上加了synchronize的锁,来保证同一时间只有一个线程来访问。安全
同步容器总的能够分为两类:并发
Vector案例一(线程安全)性能
@Slf4j @ThreadSafe public class VectorExample1 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static List<Integer> list = new Vector<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", list.size()); } private static void update(int i) { list.add(i); } }
Vector案例二(线程不安全)ui
@NotThreadSafe public class VectorExample2 { private static Vector<Integer> vector = new Vector<>(); public static void main(String[] args) { while (true) { for (int i = 0; i < 10; i++) { vector.add(i); } Thread thread1 = new Thread() { public void run() { for (int i = 0; i < vector.size(); i++) { vector.remove(i); } } }; Thread thread2 = new Thread() { public void run() { for (int i = 0; i < vector.size(); i++) { vector.get(i); } } }; thread1.start(); thread2.start(); } } }
我在上面的代码标题上已经提早说明这是个线程不安全的类了,为何同步容器的Vector也多是线程不安全的呢。你们能够实际运行下上面的类,应该会报数组越界的错误。spa
这里我解释下,Vector虽然能保证同一个时刻只有一个线程在访问它,以上面的代码为例,当咱们的线程2运行到get(i)的时候,线程1恰好把这个数据移除,这个时候就会出现问题。因此同步容器由于操做顺序的缘由,可能会产生线程不安全的问题。线程
Vector案例二code
public class VectorExample3 { // java.util.ConcurrentModificationException private static void test1(Vector<Integer> v1) { // foreach for(Integer i : v1) { if (i.equals(3)) { v1.remove(i); } } } // java.util.ConcurrentModificationException private static void test2(Vector<Integer> v1) { // iterator Iterator<Integer> iterator = v1.iterator(); while (iterator.hasNext()) { Integer i = iterator.next(); if (i.equals(3)) { v1.remove(i); } } } // success private static void test3(Vector<Integer> v1) { // for for (int i = 0; i < v1.size(); i++) { if (v1.get(i).equals(3)) { v1.remove(i); } } } public static void main(String[] args) { Vector<Integer> vector = new Vector<>(); vector.add(1); vector.add(2); vector.add(3); test1(vector); } }
结果:前两种test方法均会抛出溢常,第三种正常,你们在用foreach和iterator的时候不要对容器的数据进行移除操做,由于这两种方法会对容器的大小和预期的值进行校验。同理ArrayList等也会产生这样的问题的。这个东西对于我实在是印象深入,由于不知道这个问题,闹出来不少毛病。blog
Collections案例一
@Slf4j @ThreadSafe public class CollectionsExample1 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList()); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", list.size()); } private static void update(int i) { list.add(i); } }
Collections案例二
@Slf4j @ThreadSafe public class CollectionsExample2 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet()); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void update(int i) { set.add(i); } }
Collections案例三
@Slf4j @ThreadSafe public class CollectionsExample3 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>()); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } private static void update(int i) { map.put(i, i); } }
由上面三个案例能够看出Collections.synchronizedXXX生成的三个同步容器类获得的值和预期的结果是相同的,因此是安全的。
总结:同步容器保证了同一时刻只有一个线程在访问,可是由于操做的缘由,仍是会产生线程不安全的问题,这个时候咱们可使用synchronize或者Lock来对相关代码块进行加锁操做,可是这种状况下又致使性能比较低下,又有什么好的解决办法呢。答案就在下面要介绍的并发容器了,实际项目中,同步容器已经不多使用,更多的仍是被并发容器所取代了。
ArrayList >>CopyOnWriteArrayList
CopyOnWriteArrayList 有几个缺点:
一、因为写操做的时候,须要拷贝数组,会消耗内存,若是原数组的内容比较多的状况下,可能致使young gc或者full gc
二、不能用于实时读的场景,像拷贝数组、新增元素都须要时间,因此调用一个set操做后,读取到数据可能仍是旧的,虽然CopyOnWriteArrayList 能作到最终一致性,可是仍是无法知足实时性要求;
CopyOnWriteArrayList 合适读多写少的场景,不过这类慎用
由于谁也无法保证CopyOnWriteArrayList 到底要放置多少数据,万一数据稍微有点多,每次add/set都要从新复制数组,这个代价实在过高昂了。在高性能的互联网应用中,这种操做分分钟引发故障。
CopyOnWriteArrayList透露的思想
如上面的分析CopyOnWriteArrayList表达的一些思想:
一、读写分离,读和写分开
二、最终一致性
三、使用另外开辟空间的思路,来解决并发冲突
@Slf4j @ThreadSafe public class CopyOnWriteArrayListExample { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static List<Integer> list = new CopyOnWriteArrayList<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", list.size()); } private static void update(int i) { list.add(i); } }
经过结果可知是线程安全的。
HashSet、TreeSet>>CopyOnWriteArraySet、ConcurrentSkipListSet
CopyOnWriteArraySet的底层的实现是CopyOnWriteArrayList,所以它的特色和CopyOnWriteArrayList相似
@Slf4j @ThreadSafe public class CopyOnWriteArraySetExample { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Set<Integer> set = new CopyOnWriteArraySet<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void update(int i) { set.add(i); } }
ConcurrentSkipListSet是JDK6新增的类,ConcurrentSkipListSet基于map集合,须要注意在此类的批量操做的方法不保证原子性,可是保证底层每次调用的原子性。因此在批量操做时须要另外完成同步操做。
@Slf4j @ThreadSafe public class ConcurrentSkipListSetExample { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Set<Integer> set = new ConcurrentSkipListSet<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void update(int i) { set.add(i); } }
HashMap、TreeMap>>ConcurrentHashMap、ConcurrentSkipListMap
ConcurrentHashMap
@Slf4j @ThreadSafe public class ConcurrentHashMapExample { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Map<Integer, Integer> map = new ConcurrentHashMap<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } private static void update(int i) { map.put(i, i); } }
ConcurrentSkipListMap
@Slf4j @ThreadSafe public class ConcurrentSkipListMapExample { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } private static void update(int i) { map.put(i, i); } }