学习笔记六:线程间的协做与通讯之并发工具类

继续 上一篇 《 学习笔记五:线程间的协做与通讯java

在jdk的并发包里提供了几个很是有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类提供了在线程间交换数据的一种手段,以下主要介绍工具类的使用。数据库

一、等待多线程完成的CountDownLatch

CountDownLatch容许一个或多个线程等待其余线程完成操做。多线程

CountDownLatch类其实是使用计数器的方式去控制的,当咱们初始化CountDownLatch的时候传入了一个int变量这个时候在类的内部初始化一个int的变量,每当咱们调用countDownt()方法的时候就使得这个变量的值减1,而对于await()方法则去判断这个int的变量的值是否为0,是则表示全部的操做都已经完成,不然继续等待。并发

构造器中的计数值(count)实际上就是闭锁须要等待的线程数量。计数器的值必须大于等于0,只是等于0的时候,计数器就是零,调用await方法时不会阻塞当前线程;这个值只能被设置一次,并且CountDownLatch没有提供任何机制去从新设置这个计数值;一个线程调用countDown方法happen-before,另一个线程调用await方法。app

与CountDownLatch的第一次交互是主线程等待其余线程。主线程必须在启动其余线程后当即调用CountDownLatch.await()方法。这样主线程的操做就会在这个方法上阻塞,直到其余线程完成各自的任务。ide

示例:boss等待全部员工来开会,当全部人员都到齐以后,boss宣布开始会议!!!函数

package com.black.example.mutilThread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Created by 10250H on 2018/7/26.
 */
public class CountDownLatchDemo {
    //声明countDownLatch 变量,初始化线程数量(内部计数器)
    static CountDownLatch countDownLatch = new CountDownLatch(2);

    public static void main(String[] args) {
        new Thread(new MyRunner(countDownLatch, "小李", 2000)).start();
        new Thread(new MyRunner(countDownLatch, "小张", 4000)).start();
        new Thread(new MyRunner(countDownLatch, "小王", 5000)).start();

        try {
            System.out.println("等待员工到来开会。。。。。。。");
            //注意这里是await。主线程将会一直等待在这里,当全部线程都执行 countDownLatch.countDown();以后当前线程才会继续执行
            countDownLatch.await();
            startMeeting("老板");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void startMeeting(String name) {
        System.out.println(name + "说:人齐了。会议开始!!");
    }

    static class MyRunner implements Runnable {
        CountDownLatch countDownLatch;
        String name;
        int time;

        public MyRunner(CountDownLatch countDownLatch, String name, int time) {
            this.countDownLatch = countDownLatch;
            this.name = name;
            this.time = time;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " 开始出发去公司。");
                TimeUnit.SECONDS.sleep(1);
                System.out.println(name + " 终于到会议室!!!");
                countDownLatch.countDown();
                System.out.println(name + " 准备好了!!");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

}

运行结果:注意,至因而谁先到会议室,每次运行结果都会不同。由于主线程和子线程的调用时由CPU决定的工具

若是某我的缺席会议,咱们不能让主线程一直等待,因此可使用另一个带指定时间的await方法-await(long time,TimeUtil unit)的那个带指定时间后,就好再也不阻塞当前线程。jion也有相似的方法。学习

二、同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier),它要作的事情是,让一组线程到达一个屏障(也能够叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续运行。ui

CyclicBarrier默认构造方法时CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每一个线程调用await方法告诉CyclicBarrier我已经到达屏障了,而后当前线程被阻塞。   示例代码以下:

package com.black.example.mutilThread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * Created by 10250H on 2018/7/26.
 */
public class CyclicBarrierDemo1 {
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new MyThread(1)).start();
        new Thread(new MyThread(2)).start();
    }

    static class MyThread implements Runnable{
        private int counter;
        public MyThread(int counter){
            this.counter = counter;
        }
        @Override
        public void run() {
            try {
                System.out.println("当前值输出:"+counter);
                TimeUnit.SECONDS.sleep(1);
                cyclicBarrier.await();
                System.out.println("counter="+counter+",线程="+Thread.currentThread().getName()+",继续执行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }
}

运行结果:由于主线程和子线程的调用时由CPU决定的,两个线程都有可能先执行,因此会产生不一样的结果

若是把new CyclicBarrier(2) 修改成new CyclicBarrier(3),则主线程和子线程会永远等待,不会继续执行,由于第三个尚未到达屏障,因此以前到达屏障的两个线程都不会继续执行。

CyclicBarrier还提供了高级构造函数CyclicBarrier(int parties,Runnable barrier-Action),用于在现场到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,示例代码以下:

package com.black.example.mutilThread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo2 {
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,new MyThread(3));

    public static void main(String[] args){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("通用计数当前值:1");
                    cyclicBarrier.await();
                    System.out.println("默认执行通用线程2");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("通用计数当前值:2");
                    cyclicBarrier.await();
                    System.out.println("默认执行通用线程2");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    static class MyThread implements Runnable{
        private int counter;
        public MyThread(int counter){
            this.counter = counter;
        }
        @Override
        public void run() {
            try {
                System.out.println("优先执行:"+counter);
                TimeUnit.SECONDS.sleep(1);
                System.out.println("counter="+counter+",线程="+Thread.currentThread().getName()+",继续执行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

运行结果:当两个线程都到达屏障后,优先执行对象MyThread 的任务。

CyclicBarrier的应用场景

能够用于多线程计算数据,最后合并计算结果的场景。例如:用一个Excel保存用户的全部银行流水,每一个Sheet保存一个帐户近一年的每笔银行流水,如今须要统计用户的日均银行流水,先用多线程处理每一个sheet里的银行流水,都执行完以后,获得每一个sheet的日均银行流水。最后再用barrierAction总结出整个Excel的日均银行流水,示例代码以下:

package com.black.example.mutilThread;

import java.util.Map;
import java.util.concurrent.*;

public class BankWaterService implements Runnable {
    //建立4个屏障,处理完以后执行当前类的run方法
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(4,this);
    //假设只有4个sheet,因此启动4个线程
    private Executor executor = Executors.newFixedThreadPool(4);
    //保存每一个sheet计算出的银行流水结果
    private ConcurrentHashMap<String,Integer> sheetCountMap = new ConcurrentHashMap<String,Integer>();

    private void count(){
        for (int i=0;i<4;i++){
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    //1:计算当前sheet的银行流水数据,计算代码省略....伪代码以下:
                    sheetCountMap.put(Thread.currentThread().getName(),1);
                    //计算完成,插入屏障
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    @Override
    public void run() {
        int result=0;
        //汇总每一个sheet的计算结果
        for (Map.Entry<String,Integer> entry:sheetCountMap.entrySet()){
            result+=entry.getValue();
        }
        //输出结果
        sheetCountMap.put("result",result);
        System.out.println("最终结果:"+result);
    }

    public static void main(String[] args) {
        BankWaterService bankWaterService = new BankWaterService();
        bankWaterService.count();
    }
}

运行结果:最终结果:4

 三、CyclicBarrier与CountDownLatch的区别

CountDownLatch计数器只能使用一次,而CyclicBarrier计数器可使用reset()方法重置。

因此CyclicBarrier能够处理更为复杂的业务场景,如:计算发生错误,能够重置计数器,并让线程从新执行一次。

CyclicBarrier其余用法及源代码以下,如:

    isBroken() :阻塞的线程是否被中断,返回值boolean类型

    getNumberWaiting() :获取Cyclic-Barrier阻塞的线程数量

 四、控制并发线程数的Semaphore

Semaphore(信号量)用来控制同时访问特定资源的线程数量,经过协调各个线程,以保证合理的使用公共资源。

应用场景

Semaphore能够用于流量控制,特别是公用资源有限的应用场景,好比数据库连接。

假若有个需求,要读几万个文件的数据,由于是IO密集型任务,咱们能够启动几十个线程并发读取,可是若是读到内存后,还须要存储到数据库中,而数据库的链接数只有10个,这时咱们必须控制只有10个线程同时获取数据库连接保存数据,不然会报错没法获取数据库连接。这时可使用Semaphore来作流量控制,代码示例以下:

package com.black.example.mutilThread;

import java.util.concurrent.*;

public class SemaphoreDemo {
    private static final int THREAD_COUNT=30;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore semaphore = new Semaphore(10);//10个并发

    public static void main(String[] args) {
        for (int i=0;i<THREAD_COUNT;i++){
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName()+"-----请求资源");
                        //请求获取资源,若是有空闲资源则会当即获取,进入临界区,不然将会等待,一直等待到获取到临界区资源
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+"----获取资源,保存数据!");
                        TimeUnit.SECONDS.sleep(1);
                        semaphore.release();//释放资源
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

}

运行结果:

pool-1-thread-4-----请求资源
pool-1-thread-2-----请求资源
pool-1-thread-4----获取资源,保存数据!
pool-1-thread-1-----请求资源
pool-1-thread-5-----请求资源
pool-1-thread-3-----请求资源
pool-1-thread-5----获取资源,保存数据!
pool-1-thread-8-----请求资源
pool-1-thread-1----获取资源,保存数据!
pool-1-thread-9-----请求资源
pool-1-thread-2----获取资源,保存数据!
pool-1-thread-7-----请求资源
pool-1-thread-9----获取资源,保存数据!
pool-1-thread-10-----请求资源
pool-1-thread-11-----请求资源
pool-1-thread-8----获取资源,保存数据!
pool-1-thread-6-----请求资源
pool-1-thread-3----获取资源,保存数据!
pool-1-thread-13-----请求资源
pool-1-thread-11----获取资源,保存数据!
pool-1-thread-10----获取资源,保存数据!
pool-1-thread-12-----请求资源
pool-1-thread-7----获取资源,保存数据!
pool-1-thread-15-----请求资源
pool-1-thread-14-----请求资源
pool-1-thread-16-----请求资源
pool-1-thread-17-----请求资源
pool-1-thread-18-----请求资源
pool-1-thread-19-----请求资源
pool-1-thread-21-----请求资源
pool-1-thread-20-----请求资源
pool-1-thread-22-----请求资源
pool-1-thread-23-----请求资源
pool-1-thread-24-----请求资源
pool-1-thread-25-----请求资源
pool-1-thread-27-----请求资源
pool-1-thread-26-----请求资源
pool-1-thread-28-----请求资源
pool-1-thread-29-----请求资源
pool-1-thread-30-----请求资源
pool-1-thread-6----获取资源,保存数据!
pool-1-thread-13----获取资源,保存数据!
pool-1-thread-12----获取资源,保存数据!
pool-1-thread-15----获取资源,保存数据!
pool-1-thread-16----获取资源,保存数据!
pool-1-thread-17----获取资源,保存数据!
pool-1-thread-14----获取资源,保存数据!
pool-1-thread-21----获取资源,保存数据!
pool-1-thread-19----获取资源,保存数据!
pool-1-thread-18----获取资源,保存数据!
pool-1-thread-20----获取资源,保存数据!
pool-1-thread-22----获取资源,保存数据!
pool-1-thread-23----获取资源,保存数据!
pool-1-thread-26----获取资源,保存数据!
pool-1-thread-27----获取资源,保存数据!
pool-1-thread-25----获取资源,保存数据!
pool-1-thread-24----获取资源,保存数据!
pool-1-thread-30----获取资源,保存数据!
pool-1-thread-29----获取资源,保存数据!
pool-1-thread-28----获取资源,保存数据!

Semaphore简单用法以下:

//构造方法,可用的许可证数量,默认使用非公平锁的方式建立
public Semaphore(int permits){...}
//尝试获取许可证
public boolean tryAcquire(){...}
//尝试获取许可证,在指定时间内若获取不到则返回
public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException{...}
//返回信号量中当前可用的许可证数
public int availablePermits(){...}
//返回正在等待获取许可证的线程数(估计值)
public final int getQueueLength() {...}
//查询是否有任何线程等待获取许可证
public final boolean hasQueuedThreads(){...}
//减小reduction个许可证,这个方法在使用的子类中颇有用跟踪不可用资源的信号量
protected void reducePermits(int reduction) {...}
//返回全部等待获取许可证的线程集合
protected Collection<Thread> getQueuedThreads(){...}

 

上一篇:学习笔记五:线程间的协做与通讯 

下一篇:待续....

相关文章
相关标签/搜索