Java多线程Semaphore和CountDownLatch

我的博客项目地址前端

但愿各位帮忙点个star,给我加个小星星✨java


最近忙着毕设,要作前端,因此看更多的是React的知识,小后端🐶仍是要继续学习总结,否则就要没(tou)时(lan)间继续写了。git

Semaphore

中文含义是信号量,它是synchronized的升级版。github

synchronized 关键字,表明这个方法加锁,至关于无论哪个线程(例如线程A),运行到这个方法时,都要检查有没有其它线程B(或者C、 D等)正在用这个方法(或者该类的其余同步方法),有的话要等正在使用synchronized方法的线程B(或者C 、D)运行完这个方法后再运行此线程A,没有的话,锁定调用者,而后直接运行。它包括两种用法:synchronized 方法和 synchronized 块。(度娘解释)后端

Semaphore主要的做用是控制线程的并发数,若是单纯的使用synchronized是不能实现的。并发

简单看

Semaphore.java
/** * A counting semaphore. Conceptually, a semaphore maintains a set of * permits. Each {@link #acquire} blocks if necessary until a permit is * available, and then takes it. Each {@link #release} adds a permit, * potentially releasing a blocking acquirer. * However, no actual permit objects are used; the {@code Semaphore} just * keeps a count of the number available and acts accordingly. * 简单的来说,信号量维持了一组许可证(permits),每次调用的时候,须要获取permit才能进行进行操做。 每一个线程调用semaphore.acquire()获取一个许可证后,就会减小许可证数量。 当没有许可证的时候,线程会阻塞,直到以前得到许可证的线程操做完释放才能进行。 复制代码

Semaphore结构

/** * Creates a {@code Semaphore} with the given number of * permits and nonfair fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /** * Creates a {@code Semaphore} with the given number of * permits and the given fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. * @param fair {@code true} if this semaphore will guarantee * first-in first-out granting of permits under contention, * else {@code false} */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
复制代码

两个构造方法: 参数permit是许可的意思,表明在同一时间内,最多容许多少个线程同时执行acquire()和release()之间的代码,Semaphore发放许可的操做是减法操做。socket

参数fair,表示内部使用的是FairSync(公平锁)或者NonfairSync(非公平锁),表示每次线程获取锁的机会是不是公平的,具体的能够看底层实现,继承自AbstractQueuedSynchronizer,经过state判断是不是加锁状态,state 为0,表示锁未被获取,不为0,表示已被获取。ide

如何用

第一种,同步的执行一个任务(与Synchronized类似)

//多个线程里,保持一份信号量
private Semaphore semaphore = new Semaphore(1);

//线程中调用
public void testMethod() {
        try {
            //获取锁
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() +
                    "begin Time" + System.currentTimeMillis());
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() +
                    "end Time" + System.currentTimeMillis());

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //释放锁
            semaphore.release();
        }
    }
复制代码

这种比较简单,就不深刻展现了,要看的是在多个线程下如何控制并发数量。工具

第二种,控制线程并发数量

首先是执行类学习

public class TestService {
    //许可证数量为2
    private Semaphore semaphore = new Semaphore(2);

    public void testMethod() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() +
                    "begin Time" + System.currentTimeMillis());
            //停顿5秒
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() +
                    "end Time" + System.currentTimeMillis());

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            semaphore.release();
        }
    }
}
复制代码

建立一个线程池,分配任务:

public static void main(String[] args) throws IOException {
        //建立线程池,本身新建一个ThreadFactory,定义线程名字
        ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new ThreadFactory() {

            @Override
            public Thread newThread(@NotNull Runnable r) {
                return new Thread(r, "当前线程哈希值是:" + r.hashCode());
            }
        });

        TestService testService = new TestService();
        //分派任务
        for (int i = 0; i < 10; i++){
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    testService.testMethod();
                }
            });
            poolExecutor.submit(thread);
        }
         //关闭线程池,等待池中的线程任务执行完毕
        poolExecutor.shutdown();
    }
复制代码

执行结果:

Connected to the target VM, address: '127.0.0.1:60937', transport: 'socket'
当前线程哈希值是:55909012begin Time1521281771135
当前线程哈希值是:922151033begin Time1521281771135
当前线程哈希值是:55909012end Time1521281776136
当前线程哈希值是:922151033end Time1521281776136
当前线程哈希值是:1915058446begin Time1521281776136
当前线程哈希值是:1387228415begin Time1521281776136
当前线程哈希值是:1915058446end Time1521281781140
当前线程哈希值是:1387228415end Time1521281781140
当前线程哈希值是:748658608begin Time1521281781140
当前线程哈希值是:167185492begin Time1521281781140
当前线程哈希值是:167185492end Time1521281786145
当前线程哈希值是:748658608end Time1521281786145
当前线程哈希值是:1937348256begin Time1521281786145
当前线程哈希值是:1358444045begin Time1521281786145
当前线程哈希值是:1937348256end Time1521281791148
当前线程哈希值是:1358444045end Time1521281791148
当前线程哈希值是:331844619begin Time1521281791148
当前线程哈希值是:64830413begin Time1521281791148
Disconnected from the target VM, address: '127.0.0.1:60937', transport: 'socket'
当前线程哈希值是:331844619end Time1521281796152
当前线程哈希值是:64830413end Time1521281796152

Process finished with exit code 0
复制代码

能够看到,在设定的线程睡眠5秒内,只有两个线程同时执行acquire()和release()之间的逻辑,经过Semaphore控制了线程的并发数量。

其它方法

  • acquire(int) : 一次获取多个许可
  • acquireUninterruptibly() : 使等待进入acquire()方法,不容许被终止
  • tryAcquire() : 尝试地得到一个许可,若是获取不到就返回false,一般与if判断使用,具备无阻塞的特色
  • tryAcquire(long timeout, TimeUnit unit) : 多少时间内获取不到许可就放弃

还有不少方法,诸如availablePermits()/drainPermits()/hasQueuedThreads()/getQueueLength()等,感兴趣的话请怒开IDE查看具体实现吧。

CountDownLatch

CountDownLatch也是一个工具类,可使线程同步的处理上更加灵活,CountDownLatch也是减法操做

简单介绍

/* A synchronization aid that allows one or more threads to wait until * a set of operations being performed in other threads completes. 一个同步辅助类,容许一个或多个线程等待,直到在其余线程中执行的一组操做完成。 使用效果是,给定一个计数,当使用这个CountDownLatch类的线程判断计数不为0的时候,线程处于wait状态,若是为0,就继续进行。 复制代码

举个🌰: 来了一辆小汽车,要等满5我的才开车,来了1个,不开;再来1个,仍是不开,最后5我的到齐了,开车开车。

类结构&构造方法

//这货也是继承AbstractQueuedSynchronizer
    private final Sync sync;

    /** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
复制代码

参数count表示要等待的数量

方法示范

执行类,等待5秒后执行countDown

public class TestService {

    private Semaphore semaphore = new Semaphore(1);

    private CountDownLatch countDownLatch;


    public TestService(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    public void testMethod() {
        try {
            semaphore.acquire();
            System.out.println("当前线程是: " + Thread.currentThread().getName() +
                    " 时间是: " + System.currentTimeMillis());
            //等待5秒
            Thread.sleep(1000);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            countDownLatch.countDown();
            semaphore.release();
        }
    }
}
复制代码

运行类

public static void main(String[] args) throws IOException, InterruptedException {

        ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new ThreadFactory() {

            @Override
            public Thread newThread(@NotNull Runnable r) {
                return new Thread(r, "当前线程哈希值是:" + r.hashCode());
            }
        });

        //设定十个限制
        CountDownLatch countDownLatch = new CountDownLatch(10);
        TestService testService = new TestService(countDownLatch);

        for (int i = 0; i < 10; i++){
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    testService.testMethod();
                }
            });
            thread.setName("" + i);
            poolExecutor.submit(thread);
        }
        //关闭线程池,等待池中的线程任务执行完毕
        poolExecutor.shutdown();
        System.out.println("poolExecutor分发任务结束: " + System.currentTimeMillis());
        countDownLatch.await();
        System.out.println("CountDown方法结束: " + System.currentTimeMillis());
    }
复制代码

执行日志:

当前线程是: 当前线程哈希值是:922151033 时间是: 1521287832750
poolExecutor分发任务结束: 1521287832752
当前线程是: 当前线程哈希值是:55909012 时间是: 1521287833755
当前线程是: 当前线程哈希值是:1387228415 时间是: 1521287834759
当前线程是: 当前线程哈希值是:748658608 时间是: 1521287835763
当前线程是: 当前线程哈希值是:167185492 时间是: 1521287836765
当前线程是: 当前线程哈希值是:1937348256 时间是: 1521287837769
当前线程是: 当前线程哈希值是:1358444045 时间是: 1521287838770
当前线程是: 当前线程哈希值是:331844619 时间是: 1521287839774
当前线程是: 当前线程哈希值是:64830413 时间是: 1521287840776
当前线程是: 当前线程哈希值是:653687670 时间是: 1521287841779
CountDown方法结束: 1521287842784

复制代码

能够看到,在线程池控制1个并发线程,poolExecutor提交任务以后打印日志,可是countDownLatch.await()方法以后的代码,由于count没有减到0,不能执行。

在TestService方法中,每隔一秒执行countDownLatch.countDown()方法,最后十个线程跑完,count减到0,countDownLatch.await()方法以后的代码才能够执行。

方法

  • await() : 等待
  • countDown() : 计数减一
  • await(long timeout, TimeUnit unit) : 在限定时间内进行等待,超过期间返回false
  • getCount() : 获取计数count

小结

Semaphore做为信号量,用来控制线程的并发数量,CountDownLatch用来控制线程执行任务的时机也挺不错的。它们两个的理解和使用都比较简单,好了,又填了一个坑,下次继续挖坑和填坑hhh

相关文章
相关标签/搜索