并发包学习(二)-容器学习记录

可能有些同窗知道ArrayList,HashSet,,HashMap这些容器都是线程不安全的,若是多个线程并发的访问这些容器就会致使线程不安全问题,不少时候须要咱们手动对这些容器进行同步处理,形成咱们很大的不便,所以java为咱们提供了同步容器和并发容器来解决这个问题。java

1、同步容器

首先详细介绍前,须要强调下同步容器是线程安全的类,可是也可能形成线程不安全的问题,缘由在后面有解释。数组

同步容器的原理很简单,就是在原容器的基础上加了synchronize的锁,来保证同一时间只有一个线程来访问。安全

同步容器总的能够分为两类:并发

  • java提供好的线程的类
  1. ArrayList >>Vector,Stack
  2. HashMap>>HashTable
  • Collections.synchronizedXXX提供的静态工厂方法建立的类
  1. Collections.synchronizedCollection(Collection<T>t)
  2. Collections.synchronizedList(List<T>list)
  3. Collections.synchronizedMap(Map<K, V>map)
  4. Collections.synchronizedSet(Set<T> t)

Vector案例一(线程安全)性能

@Slf4j
@ThreadSafe
public class VectorExample1 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static List<Integer> list = new Vector<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());
    }

    private static void update(int i) {
        list.add(i);
    }
}

Vector案例二(线程不安全)ui

@NotThreadSafe
public class VectorExample2 {

    private static Vector<Integer> vector = new Vector<>();

    public static void main(String[] args) {

        while (true) {

            for (int i = 0; i < 10; i++) {
                vector.add(i);
            }
        
            Thread thread1 = new Thread() {
                public void run() {
                    for (int i = 0; i < vector.size(); i++) {
                        vector.remove(i);
                    }
                }
            };

            Thread thread2 = new Thread() {
                public void run() {
                    for (int i = 0; i < vector.size(); i++) {
                        vector.get(i);
                    }
                }
            };
            thread1.start();
            thread2.start();
        }
    }
}

我在上面的代码标题上已经提早说明这是个线程不安全的类了,为何同步容器的Vector也多是线程不安全的呢。你们能够实际运行下上面的类,应该会报数组越界的错误。spa

这里我解释下,Vector虽然能保证同一个时刻只有一个线程在访问它,以上面的代码为例,当咱们的线程2运行到get(i)的时候,线程1恰好把这个数据移除,这个时候就会出现问题。因此同步容器由于操做顺序的缘由,可能会产生线程不安全的问题。线程

Vector案例二code

public class VectorExample3 {

    // java.util.ConcurrentModificationException
    private static void test1(Vector<Integer> v1) { // foreach
        for(Integer i : v1) {
            if (i.equals(3)) {
                v1.remove(i);
            }
        }
    }

    // java.util.ConcurrentModificationException
    private static void test2(Vector<Integer> v1) { // iterator
        Iterator<Integer> iterator = v1.iterator();
        while (iterator.hasNext()) {
            Integer i = iterator.next();
            if (i.equals(3)) {
                v1.remove(i);
            }
        }
    }

    // success
    private static void test3(Vector<Integer> v1) { // for
        for (int i = 0; i < v1.size(); i++) {
            if (v1.get(i).equals(3)) {
                v1.remove(i);
            }
        }
    }

    public static void main(String[] args) {

        Vector<Integer> vector = new Vector<>();
        vector.add(1);
        vector.add(2);
        vector.add(3);
        test1(vector);
    }
}

结果:前两种test方法均会抛出溢常,第三种正常,你们在用foreach和iterator的时候不要对容器的数据进行移除操做,由于这两种方法会对容器的大小和预期的值进行校验。同理ArrayList等也会产生这样的问题的。这个东西对于我实在是印象深入,由于不知道这个问题,闹出来不少毛病。blog

Collections案例一

@Slf4j
@ThreadSafe
public class CollectionsExample1 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());
    }

    private static void update(int i) {
        list.add(i);
    }
}

Collections案例二

@Slf4j
@ThreadSafe
public class CollectionsExample2 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet());

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", set.size());
    }

    private static void update(int i) {
        set.add(i);
    }
}

Collections案例三

@Slf4j
@ThreadSafe
public class CollectionsExample3 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());
    }

    private static void update(int i) {
        map.put(i, i);
    }
}

由上面三个案例能够看出Collections.synchronizedXXX生成的三个同步容器类获得的值和预期的结果是相同的,因此是安全的。

总结:同步容器保证了同一时刻只有一个线程在访问,可是由于操做的缘由,仍是会产生线程不安全的问题,这个时候咱们可使用synchronize或者Lock来对相关代码块进行加锁操做,可是这种状况下又致使性能比较低下,又有什么好的解决办法呢。答案就在下面要介绍的并发容器了,实际项目中,同步容器已经不多使用,更多的仍是被并发容器所取代了。

 2、并发容器

ArrayList >>CopyOnWriteArrayList

CopyOnWriteArrayList 有几个缺点:
一、因为写操做的时候,须要拷贝数组,会消耗内存,若是原数组的内容比较多的状况下,可能致使young gc或者full gc
二、不能用于实时读的场景,像拷贝数组、新增元素都须要时间,因此调用一个set操做后,读取到数据可能仍是旧的,虽然CopyOnWriteArrayList 能作到最终一致性,可是仍是无法知足实时性要求;
CopyOnWriteArrayList 合适读多写少的场景,不过这类慎用
由于谁也无法保证CopyOnWriteArrayList 到底要放置多少数据,万一数据稍微有点多,每次add/set都要从新复制数组,这个代价实在过高昂了。在高性能的互联网应用中,这种操做分分钟引发故障。

CopyOnWriteArrayList透露的思想
如上面的分析CopyOnWriteArrayList表达的一些思想:
一、读写分离,读和写分开
二、最终一致性
三、使用另外开辟空间的思路,来解决并发冲突

@Slf4j
@ThreadSafe
public class CopyOnWriteArrayListExample {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static List<Integer> list = new CopyOnWriteArrayList<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());
    }

    private static void update(int i) {
        list.add(i);
    }
}

经过结果可知是线程安全的。

HashSet、TreeSet>>CopyOnWriteArraySet、ConcurrentSkipListSet

CopyOnWriteArraySet的底层的实现是CopyOnWriteArrayList,所以它的特色和CopyOnWriteArrayList相似

  • 它最适合于具备如下特征的应用程序:set 大小一般保持很小,只读操做远多于可变操做,须要在遍历期间防止线程间的冲突。
  • 它是线程安全的, 底层的实现是CopyOnWriteArrayList;
  • 由于一般须要复制整个基础数组,因此可变操做(add、set 和 remove 等等)的开销很大。
  • 迭代器不支持可变 remove 操做。
  • 使用迭代器进行遍历的速度很快,而且不会与其余线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。
@Slf4j
@ThreadSafe
public class CopyOnWriteArraySetExample {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static Set<Integer> set = new CopyOnWriteArraySet<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", set.size());
    }

    private static void update(int i) {
        set.add(i);
    }
}

ConcurrentSkipListSet是JDK6新增的类,ConcurrentSkipListSet基于map集合,须要注意在此类的批量操做的方法不保证原子性,可是保证底层每次调用的原子性。因此在批量操做时须要另外完成同步操做。

@Slf4j
@ThreadSafe
public class ConcurrentSkipListSetExample {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static Set<Integer> set = new ConcurrentSkipListSet<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", set.size());
    }

    private static void update(int i) {
        set.add(i);
    }
}

HashMap、TreeMap>>ConcurrentHashMap、ConcurrentSkipListMap

ConcurrentHashMap

@Slf4j
@ThreadSafe
public class ConcurrentHashMapExample {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static Map<Integer, Integer> map = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());
    }

    private static void update(int i) {
        map.put(i, i);
    }
}

ConcurrentSkipListMap

@Slf4j
@ThreadSafe
public class ConcurrentSkipListMapExample {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());
    }

    private static void update(int i) {
        map.put(i, i);
    }
}
相关文章
相关标签/搜索