浅谈parallelStream

parallelStream是什么,它是一个集合的并发处理流.其做用是把一个集合中的数据分片,进行一个多线程的处理,增快运行速度.java

好比说这样一段代码安全

private Set<SysRole> sysRoles;
private Set<String> permission;

@Override
public Collection<? extends GrantedAuthority> getAuthorities() {
    Collection<GrantedAuthority> collection = Collections.synchronizedSet(new HashSet<>());
    if (!CollectionUtils.isEmpty(sysRoles)) {
        sysRoles.parallelStream().forEach(role -> {
            if (role.getCode().startsWith("ROLE_")) {
                collection.add(new SimpleGrantedAuthority(role.getCode()));
            }else {
                collection.add(new SimpleGrantedAuthority("ROLE_" + role.getCode()));
            }
        });
    }
    return collection;
}

它就是以不一样的线程来给collection添加SimpleGrantedAuthority的,请注意collection的线程安全性.多线程

固然咱们能够用下面这个例子来证实parallelStream的确是多线程处理并发

public class App {
    public static void main(String[] args) throws Exception {
        System.out.println("Hello World!");
        // 构造一个10000个元素的集合
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            list.add(i);
        }
        // 统计并行执行list的线程
        Set<Thread> threadSet = new CopyOnWriteArraySet<>();
        // 并行执行
        list.parallelStream().forEach(integer -> {
            Thread thread = Thread.currentThread();
            // System.out.println(thread);
            // 统计并行执行list的线程
            threadSet.add(thread);
        });
        System.out.println("threadSet一共有" + threadSet.size() + "个线程");
        System.out.println("系统一个有"+Runtime.getRuntime().availableProcessors()+"个cpu");
        List<Integer> list1 = new ArrayList<>();
        List<Integer> list2 = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            list1.add(i);
            list2.add(i);
        }
        Set<Thread> threadSetTwo = new CopyOnWriteArraySet<>();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Thread threadA = new Thread(() -> {
            list1.parallelStream().forEach(integer -> {
                Thread thread = Thread.currentThread();
                // System.out.println("list1" + thread);
                threadSetTwo.add(thread);
            });
            countDownLatch.countDown();
        });
        Thread threadB = new Thread(() -> {
            list2.parallelStream().forEach(integer -> {
                Thread thread = Thread.currentThread();
                // System.out.println("list2" + thread);
                threadSetTwo.add(thread);
            });
            countDownLatch.countDown();
        });

        threadA.start();
        threadB.start();
        countDownLatch.await();
        System.out.print("threadSetTwo一共有" + threadSetTwo.size() + "个线程");

        System.out.println("---------------------------");
        System.out.println(threadSet);
        System.out.println(threadSetTwo);
        System.out.println("---------------------------");
        threadSetTwo.addAll(threadSet);
        System.out.println(threadSetTwo);
        System.out.println("threadSetTwo一共有" + threadSetTwo.size() + "个线程");
        System.out.println("系统一个有"+Runtime.getRuntime().availableProcessors()+"个cpu");
    }
}

运行结果以下ide

Hello World!
threadSet一共有3个线程
系统一个有4个cpu
threadSetTwo一共有5个线程---------------------------
[Thread[main,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-1,5,main]]
[Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,]]
---------------------------
[Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,], Thread[main,5,main]]
threadSetTwo一共有6个线程
系统一个有4个cpu线程

咱们能够看到threadSet一共有3个线程,证实get

Set<Thread> threadSet = new CopyOnWriteArraySet<>();
// 并行执行
list.parallelStream().forEach(integer -> {
    Thread thread = Thread.currentThread();
    // System.out.println(thread);
    // 统计并行执行list的线程
    threadSet.add(thread);
});
System.out.println("threadSet一共有" + threadSet.size() + "个线程");

是3个线程处理的,另外CopyOnWriteArraySet是线程安全的.后面是由显示线程调用,主线程等待的方式.it

调节parallelStream的并发线程数能够用参数-Djava.util.concurrent.ForkJoinPool.common.parallelism=N (N为线程数量)io

相关文章
相关标签/搜索