parallelStream是什么,它是一个集合的并发处理流.其做用是把一个集合中的数据分片,进行一个多线程的处理,增快运行速度.java
好比说这样一段代码安全
private Set<SysRole> sysRoles; private Set<String> permission; @Override public Collection<? extends GrantedAuthority> getAuthorities() { Collection<GrantedAuthority> collection = Collections.synchronizedSet(new HashSet<>()); if (!CollectionUtils.isEmpty(sysRoles)) { sysRoles.parallelStream().forEach(role -> { if (role.getCode().startsWith("ROLE_")) { collection.add(new SimpleGrantedAuthority(role.getCode())); }else { collection.add(new SimpleGrantedAuthority("ROLE_" + role.getCode())); } }); } return collection; }
它就是以不一样的线程来给collection添加SimpleGrantedAuthority的,请注意collection的线程安全性.多线程
固然咱们能够用下面这个例子来证实parallelStream的确是多线程处理并发
public class App { public static void main(String[] args) throws Exception { System.out.println("Hello World!"); // 构造一个10000个元素的集合 List<Integer> list = new ArrayList<>(); for (int i = 0; i < 10000; i++) { list.add(i); } // 统计并行执行list的线程 Set<Thread> threadSet = new CopyOnWriteArraySet<>(); // 并行执行 list.parallelStream().forEach(integer -> { Thread thread = Thread.currentThread(); // System.out.println(thread); // 统计并行执行list的线程 threadSet.add(thread); }); System.out.println("threadSet一共有" + threadSet.size() + "个线程"); System.out.println("系统一个有"+Runtime.getRuntime().availableProcessors()+"个cpu"); List<Integer> list1 = new ArrayList<>(); List<Integer> list2 = new ArrayList<>(); for (int i = 0; i < 100000; i++) { list1.add(i); list2.add(i); } Set<Thread> threadSetTwo = new CopyOnWriteArraySet<>(); CountDownLatch countDownLatch = new CountDownLatch(2); Thread threadA = new Thread(() -> { list1.parallelStream().forEach(integer -> { Thread thread = Thread.currentThread(); // System.out.println("list1" + thread); threadSetTwo.add(thread); }); countDownLatch.countDown(); }); Thread threadB = new Thread(() -> { list2.parallelStream().forEach(integer -> { Thread thread = Thread.currentThread(); // System.out.println("list2" + thread); threadSetTwo.add(thread); }); countDownLatch.countDown(); }); threadA.start(); threadB.start(); countDownLatch.await(); System.out.print("threadSetTwo一共有" + threadSetTwo.size() + "个线程"); System.out.println("---------------------------"); System.out.println(threadSet); System.out.println(threadSetTwo); System.out.println("---------------------------"); threadSetTwo.addAll(threadSet); System.out.println(threadSetTwo); System.out.println("threadSetTwo一共有" + threadSetTwo.size() + "个线程"); System.out.println("系统一个有"+Runtime.getRuntime().availableProcessors()+"个cpu"); } }
运行结果以下ide
Hello World!
threadSet一共有3个线程
系统一个有4个cpu
threadSetTwo一共有5个线程---------------------------
[Thread[main,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-1,5,main]]
[Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,]]
---------------------------
[Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,], Thread[main,5,main]]
threadSetTwo一共有6个线程
系统一个有4个cpu线程
咱们能够看到threadSet一共有3个线程,证实get
Set<Thread> threadSet = new CopyOnWriteArraySet<>(); // 并行执行 list.parallelStream().forEach(integer -> { Thread thread = Thread.currentThread(); // System.out.println(thread); // 统计并行执行list的线程 threadSet.add(thread); }); System.out.println("threadSet一共有" + threadSet.size() + "个线程");
是3个线程处理的,另外CopyOnWriteArraySet是线程安全的.后面是由显示线程调用,主线程等待的方式.it
调节parallelStream的并发线程数能够用参数-Djava.util.concurrent.ForkJoinPool.common.parallelism=N (N为线程数量)io