Java 里如何实现线程间通讯?

总结:html

1.等待通知机制git

    两个线程经过对同一对象调用等待 wait() 和通知 notify() 方法来进行通信。github

2.join() 方法数据库

3.volatile 共享内存编程

4.CountDownLatch 并发工具数据结构

5.CyclicBarrier 并发工具多线程

6.线程响应中断:thread.interrupt(),thread.isInterrupted()并发

7.线程池 awaitTermination() 方法,isTerminated()方法异步

8.管道通讯 ?PipedWriter,PipedReaderide

9.Future

https://github.com/crossoverJie/Java-Interview/blob/master/MD/concurrent/thread-communication.md

 

一:notify()/notifyAll()/sleep()/wait()的区别 

notify():随机唤醒一个等待该对象同步锁的线程,进入就绪队列等待CPU的调度;这里的唤醒是由JVM肯定唤醒哪一个线程,并且不是按优先级决定。

notifyAll():唤醒全部的等待该对象同步锁的线程,进入就绪队列等待CPU调度;注意唤醒的是notify以前wait的线程,对于notify以后的wait线程是没有效果的。

wait():调用时须要先得到该Object的锁,调用后,会把当前的锁释放掉同时阻塞住;但能够经过调用该Object的notify()或者notifyAll()来从新得到锁。

sleep():在指定的时间内让正在执行的线程暂停执行,但不会释放锁。

区别主要体如今这几个方面?

  1.  咱们经过对这些方法分析,sleep()方法属于Thread类,而wait()/notify()/notifyAll()属于Object基础类,也就是说每一个对象都有wait()/notify()/notifyAll()的功能。

  2. sleep()不会释放锁,而wait()会释放锁。

  3. sleep()必须捕获异常,而wait()/notify()/notifyAll()不须要捕获异常。

  4. sleep()能够在任何地方使用,而wait()/notify()/notifyAll()只能在同步控制方法或者同步控制块里面使用。

二:如何实现多线程之间的通信和协做 

利用同步和互斥来解决多线程之间的通信和协做;能够说资源互斥、协调竞争是要解决的因,而同步是竞争协调的果。

  1. 经过synchronized/notify/notifyAll来实现线程之间的通讯。

  2. 利用了Java5中提供的Lock/Condition来实现线程之间的相互通讯。

  3. 使用信号量,如:CyclicBarrier/Semaphore/Countdownbatch。Phaser

怎么解决多线程计算的结果统计?

能够用join()以及Future/FutureTask或CompletableFuture来解决。join()的功能是使异步执行的线程变成同步执行;使用join()后,直到这个线程退出,程序才会往下执行。

 

1.Thread B等Thread A执行完再执行, 在Thread B中threadA.join() 方法

 

2.个线程按照指定方式有序交叉运行 能够利用 object.wait() 和 object.notify() 两个方法来实现。

 

3.四个线程 A B C D,其中 D 要等到 A B C 全执行完毕后才执行,并且 A B C 是同步运行的

利用 CountdownLatch 来实现这类通讯方式。其实简单点来讲,CountDownLatch 就是一个倒计数器。它的基本用法是:

  1. 建立一个计数器,设置初始值,CountdownLatch countDownLatch = new CountDownLatch(2);
  2. 在 等待线程 里调用 countDownLatch.await() 方法,进入等待状态,直到计数值变成 0;
  3. 在 其余线程 里,调用 countDownLatch.countDown() 方法,该方法会将计数值减少 1;
  4. 当 其余线程 的 countDown() 方法把计数值变成 0 时,等待线程 里的 countDownLatch.await()当即退出,继续执行下面的代码。

CountDownLatch 适用于一个线程去等待多个线程的状况。

 

4.三个运动员各自准备,等到三我的都准备好后,再一块儿跑

上面的 CountDownLatch 能够用来倒计数,但当计数完毕,只有一个线程的 await() 会获得响应,没法让多个线程同时触发。

为了实现线程间互相等待这种需求,咱们能够利用 CyclicBarrier 数据结构,它的基本用法是:

  1. 先建立一个公共 CyclicBarrier 对象,设置 同时等待 的线程数,CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
  2. 这些线程同时开始本身作准备,自身准备完毕后,须要等待别人准备完毕,这时调用 cyclicBarrier.await(); 便可开始等待别人;
  3. 当指定的 同时等待 的线程数都调用了 cyclicBarrier.await();时,意味着这些线程都准备完毕好,而后这些线程才 同时继续执行

 

5.子线程完成某件任务后,把获得的结果回传给主线程

在 Java 里,有一个类是配合 Callable 使用的:FutureTask,不过注意,它获取结果的 get 方法会阻塞主线程。

主线程调用 futureTask.get() 方法时阻塞主线程;而后 Callable 内部开始执行,并返回运算结果;此时 futureTask.get() 获得结果,主线程恢复运行。

这里咱们能够学到,经过 FutureTask 和 Callable 能够直接在主线程得到子线程的运算结果,只不过须要阻塞主线程。固然,若是不但愿阻塞主线程,能够考虑利用 ExecutorService,把 FutureTask 放到线程池去管理执行。

 

6.CompletableFuture还有更高级的用法

https://my.oschina.net/u/3705388/blog/1602862

 

 

CountDownLatch:倒数N个数,经过调用countDown()倒数。当到0时,countDownLatch.await();线程被唤醒。只阻塞countDownLatch.await();这一个线程,用于等待调用countDown()倒数线程执行到位后再执行。

CyclicBarrier:可循环屏障。设置N个数,调用cyclicBarrier.await();的线程也至关于对N减一并阻塞,当第N个线程调用cyclicBarrier.await()后,全部调用cyclicBarrier.await()阻塞的线程被唤醒继续执行。在建立CyclicBarrier的时候能够传入一个Runnable,用于全部线程到达屏障后,便当即执行的线程,即此线程会与N个调用cyclicBarrier.await()的线程同时执行。另外CyclicBarrier可重置(调用reset()),重复使用。

Semaphore:信号量并发工具类,其提供了aquire()和release()方法来进行并发控制。设置一个N,aquire调用时根据并发数判断是否阻塞,至关于发送令牌,一共有N个令牌,aquire获取令牌,拿到令牌执行,没拿到阻塞,执行完归还令牌。通常用于资源限流,限量的工做场景。

 

Phaser:它把多个线程协做执行的任务划分为多个阶段,编程时须要明确各个阶段的任务,每一个阶段均可以有任意个参与者,线程均可以随时注册并参与到某个阶段。

构造

Phaser建立后,初始阶段编号为0,构造函数中指定初始参与个数。

注册:Registration

Phaser支持经过register()和bulkRegister(int parties)方法来动态调整注册任务的数量。

Arrival

每一个Phaser实例都会维护一个phase number,初始值为0。每当全部注册的任务都到达Phaser时,phase number累加,并在超过Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于记录到达;其中arrive(),某个参与者完成任务后调用;arriveAndDeregister(),任务完成,取消本身的注册。arriveAndAwaitAdvance(),本身完成等待其余参与者完成,进入阻塞,直到Phaser成功进入下个阶段。

Phaser提供了比CountDownLatch、CyclicBarrier更增强大、灵活的功能,从某种程度上来讲,Phaser能够替换他们,例子见网址:

https://www.cnblogs.com/chenssy/p/4989515.html

 

 

在并发编程中常常会使用到一些并发工具类,来对线程的并发量、执行流程、资源依赖等进行控制。这里咱们主要探讨三个常用的并发工具类:CountDownLatch,CyclicBarrier和Semaphore。

一:CountDownLatch 

从CountDownLatch的字面意思就能够体现出其设计模型,countdown在英语里具备倒计时的(倒数)意思,Latch就是门闩的意思。CountDownLatch的构造函数接受一个int值做为计数器的初始值N,当程序调用countDown()的时候,N便会减1(体现出了倒数的意义),当N值减为0的时候,阻塞在await()的线程便会唤醒,继续执行。这里经过一个例子来讲明其应用场景。

假设咱们主线程须要建立5个工做线程来分别执行5个任务,主线程须要等待5个任务所有完成后才会进行后续操做,那么咱们就能够声明N=5的CountDownLatch,来进行控制。

代码以下:

public class CountDownLatchDemo {

    private static final CountDownLatch countDownLatch = new CountDownLatch(5);

    public static void main(String[] args) throws InterruptedException {

        //循环建立5个工做线程

        for( int ix = 0; ix != 5; ix++ ){

            new Thread(new Runnable() {

                public void run() {

                    try{

                        System.out.println( Thread.currentThread().getName() + " start" );

                        Thread.sleep(1000);

                        countDownLatch.countDown();

                        System.out.println( Thread.currentThread().getName() + " stop" );

                    } catch ( InterruptedException ex ){

                    }

                }

            }, "Task-Thread-" + ix ).start();

 

            Thread.sleep(500);

        }

        //主线程等待全部任务完成

        countDownLatch.await();

        System.out.println("All task has completed.");

    }

}

运行结果:

Task-Thread-0 start

Task-Thread-1 start

Task-Thread-0 stop

Task-Thread-2 start

Task-Thread-1 stop

Task-Thread-3 start

Task-Thread-2 stop

Task-Thread-4 start

Task-Thread-3 stop

Task-Thread-4 stop

All task has completed.

在主线程建立了5个工做线程后,就会阻塞在countDownLatch.await(),等待5个工做线程所有完成任务后返回。任务的执行顺序可能会不一样,可是任务完成的Log必定会在最后显示。CountDownLatch经过计数器值的控制,实现了容许一个或多个线程等待其余线程完成操做的并发控制。

二:CyclicBarrier 

CyclicBarrier就字面意思是可循环的屏障,其体现了两个特色,可循环和屏障。调用CyclicBarrier的await()方法即是在运行线程中插入了屏障,当线程运行到这个屏障时,便会阻塞在await()方法中,直到等待全部线程运行到屏障后,才会返回。CyclicBarrier的构造函数一样接受一个int类型的参数,表示屏障拦截线程的数目。另外一个特色循环即是体现处出了CyclicBarrier与CountDownLatch不一样之处了,CyclicBarrier能够经过reset()方法,将N值重置,循环使用,而CountDownLatch的计数器是不能重置的。此外,CyclicBarrier还提供了一个更高级的用法,容许咱们设置一个全部线程到达屏障后,便当即执行的Runnable类型的barrierAction(注意:barrierAction不会等待await()方法的返回才执行,是当即执行!)机会,这里咱们经过如下代码来测试一下CyclicBarrier的特性。

代码以下:

public class CyclicBarrierDemo {

    private final static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new MyBarrierAction());

    private final static AtomicInteger atcIx = new AtomicInteger(1);

    public static void main(String[] args) {

        for (int ix = 0; ix != 10; ix++){

            new Thread(new Runnable() {

                public void run() {

                    try{

                        System.out.println(Thread.currentThread().getName() + " start");

                        Thread.sleep(atcIx.getAndIncrement() * 1000 );

                        cyclicBarrier.await();

                        System.out.println( Thread.currentThread().getName() + " stop" );

                    } catch ( Exception ex){

                    }

                }

            }, "Thread-" + ix).start();

        }

    }

 

    private static class MyBarrierAction implements Runnable {

        @Override

        public void run() {

            System.out.println("MyBarrierAction is call.");

        }

    }

}

运行结果:

Thread-0 start

Thread-1 start

Thread-2 start

Thread-3 start

Thread-4 start

MyBarrierAction is call.

Thread-4 stop

Thread-0 stop

Thread-1 stop

Thread-2 stop

Thread-3 stop

根据运行结果,咱们能够看到一下几点:

  1. 首先在线程没有调用够N次cyclicBarrier.await()时,全部线程都会阻塞在cyclicBarrier.await()上,也就是说必须N个线程同时到达屏障,才会全部线程越过屏障继续执行。

  2. 验证了BarrierAction的执行时机是全部阻塞线程都到达屏障以后,而且BarrierAction执行后,全部线程才会从await()方法返回,继续执行。

三:Semaphore 

Semaphore信号量并发工具类,其提供了aquire()和release()方法来进行并发控制。Semaphore通常用于资源限流,限量的工做场景,例如数据库链接控制。假设数据库的最大负载在10个链接,而如今有100个客户端想进行数据查询,显然咱们不能让100个客户端同时链接上来,找出数据库服务的崩溃。那么咱们能够建立10张令牌,想要链接数据库的客户端,都必须先尝试获取令牌(Semaphore.aquire()),当客户端获取到令牌后即可以进行数据库链接,并在完成数据查询后归还令牌(Semaphore.release()),这样就能保证同时链接数据库的客户端不超过10个,由于只有10张令牌,这里给出该场景的模拟代码。

代码以下:

public class SemaphoreDemo {

    private static final Semaphore semaphoreToken = new Semaphore(10);

    public static void main(String[] args) {

        for (int ix = 0; ix != 100; ix++) {

            new Thread(new Runnable() {

                public void run() {

                    try {

                        semaphoreToken.acquire();

                        System.out.println("select * from xxx");

                        semaphoreToken.release();

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

            }).start();

        }

    }

}

也许有同窗会问,aquire()函数获取许可证的顺序和调用的前后顺序有关系吗,也就是说该例子中客户端是不是排队获取令牌的?答案不是,由于Semaphore默认是非公平的,固然其构造函数提供了设置为公平信号量的参数。

 

 

多线程之间须要等待协调,才能完成某种工做,问怎么设计这种协调方案?如:子线程循环10次,接着主线程循环100,接着又回到子线程循环10次,接着再回到主线程又循环100,如此循环50次。

public class Question {

    public static void main(String[] args) throws InterruptedException {

        final Object object = new Object();

        new Thread(new Runnable() {

 

            public void run() {

                for (int i = 0; i < 50; i++) {

                    synchronized (object) {

                        for (int j = 0; j < 10; j++) {

                            System.out.println("SubThread:" + (j + 1));

                        }

                        object.notify();

                        try {

                            object.wait();

                        } catch (InterruptedException e) {

                            e.printStackTrace();

                        }

                    }

                }

            }

        }).start();

 

        for (int i = 0; i < 50; i++) {

            synchronized (object) {

                //主线程让出锁,等待子线程唤醒

                object.wait();

                for (int j = 0; j < 100; j++) {

                    System.out.println("MainThread:" + (j + 1));

                }

                object.notify();

            }

        }

    }

}

 

 

https://mp.weixin.qq.com/s?__biz=MzIzMzgxOTQ5NA==&mid=2247483697&idx=1&sn=29715bb12acb9b123284c60009fd5e99&chksm=e8fe9d38df89142e5d7e58c6288b76cc9c62581b5bce5ff6220ad5f47e364ccf197d124f57cb&scene=21#wechat_redirect

相关文章
相关标签/搜索