高并发编程学习(2)——线程通讯详解

为得到良好的阅读体验,请访问原文: 传送门java

前序文章git

- 高并发编程学习(1)——并发基础 - www.wmyskxz.com/2019/11/26/…github

1、经典的生产者消费者案例

上一篇文章咱们提到一个应用能够建立多个线程去执行不一样的任务,若是这些任务之间有着某种关系,那么线程之间必须可以通讯来协调完成工做。面试

生产者消费者问题(英语:Producer-consumer problem)就是典型的多线程同步案例,它也被称为有限缓冲问题(英语:Bounded-buffer problem)。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要做用是生成必定量的数据放到缓冲区中,而后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。(摘自维基百科:生产者消费者问题)编程

  • 注意: 生产者-消费者模式中的内存缓存区的主要功能是数据在多线程间的共享,此外,经过该缓冲区,能够缓解生产者和消费者的性能差;

准备基础代码:无通讯的生产者消费者

咱们来本身编写一个例子:一个生产者,一个消费者,而且让他们让他们使用同一个共享资源,而且咱们指望的是生产者生产一条放到共享资源中,消费者就会对应地消费一条。api

咱们先来模拟一个简单的共享资源对象:缓存

public class ShareResource {

    private String name;
    private String gender;

    /**
     * 模拟生产者向共享资源对象中存储数据
     *
     * @param name
     * @param gender
     */
    public void push(String name, String gender) {
        this.name = name;
        this.gender = gender;
    }

    /**
     * 模拟消费者从共享资源中取出数据
     */
    public void popup() {
        System.out.println(this.name + "-" + this.gender);
    }
}复制代码

而后来编写咱们的生产者,使用循环来交替地向共享资源中添加不一样的数据:安全

public class Producer implements Runnable {

    private ShareResource shareResource;

    public Producer(ShareResource shareResource) {
        this.shareResource = shareResource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            if (i % 2 == 0) {
                shareResource.push("凤姐", "女");
            } else {
                shareResource.push("张三", "男");
            }
        }
    }
}复制代码

接着让咱们的消费者不停地消费生产者产生的数据:微信

public class Consumer implements Runnable {

    private ShareResource shareResource;

    public Consumer(ShareResource shareResource) {
        this.shareResource = shareResource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            shareResource.popup();
        }
    }
}复制代码

而后咱们写一段测试代码,来看看效果:session

public static void main(String[] args) {
    // 建立生产者和消费者的共享资源对象
    ShareResource shareResource = new ShareResource();
    // 启动生产者线程
    new Thread(new Producer(shareResource)).start();
    // 启动消费者线程
    new Thread(new Consumer(shareResource)).start();
}复制代码

咱们运行发现出现了诡异的现象,全部的生产者都彷佛消费到了同一条数据:

张三-男
张三-男
....如下全是张三-男....复制代码

为何会出现这样的状况呢?照理说,个人生产者在交替地向共享资源中生产数据,消费者也应该交替消费才对呀..咱们大胆猜想一下,会不会是由于消费者是直接循环了 30 次打印共享资源中的数据,而此时生产者尚未来得及更新共享资源中的数据,消费者就已经连续打印了 30 次了,因此咱们让消费者消费的时候以及生产者生产的时候都小睡个 10 ms 来缓解消费太快 or 生产太快带来的影响,也让现象更明显一些:

/**
 * 模拟生产者向共享资源对象中存储数据
 *
 * @param name
 * @param gender
 */
public void push(String name, String gender) {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    this.name = name;
    this.gender = gender;
}

/**
 * 模拟消费者从共享资源中取出数据
 */
public void popup() {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    System.out.println(this.name + "-" + this.gender);
}复制代码

再次运行代码,发现了出现了如下的几种状况:

  • 重复消费:消费者连续地出现两次相同的消费状况(张三-男/ 张三-男);
  • 性别紊乱:消费者消费到了脏数据(张三-女/ 凤姐-男);

分析出现问题的缘由

  • 重复消费:咱们先来看看重复消费的问题,当生产者生产出一条数据的时候,消费者正确地消费了一条,可是当消费者再来共享资源中消费的时候,生产者尚未准备好新的一条数据,因此消费者就又消费到老数据了,这其中的根本缘由是生产者和消费者的速率不一致
  • 性别紊乱:再来分析第二种状况。不一样于上面的状况,消费者在消费第二条数据时,生产者也正在生产新的数据,可是尴尬的是,生产者只生产了一半儿(也就是该执行完 this.name = name),也就是尚未来得及给 gender 赋值就被消费者给取走消费了.. 形成这样状况的根本缘由是没有保证生产者生产数据的原子性

解决出现的问题

加锁解决性别紊乱

咱们先来解决性别紊乱,也就是原子性的问题吧,上一篇文章里咱们也提到了,对于这样的原子性操做,解决方法也很简单:加锁。稍微改造一下就行了:

/**
 * 模拟生产者向共享资源对象中存储数据
 *
 * @param name
 * @param gender
 */
synchronized public void push(String name, String gender) {
    this.name = name;
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    this.gender = gender;
}

/**
 * 模拟消费者从共享资源中取出数据
 */
synchronized public void popup() {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    System.out.println(this.name + "-" + this.gender);
}复制代码

  • 咱们在方法前面都加上了 synchronized 关键字,来保证每一次读取和修改都只能是一个线程,这是由于当 synchronized 修饰在普通同步方法上时,它会自动锁住当前实例对象,也就是说这样改造以后读/ 写操做同时只能进行其一;
  • 我把 push 方法小睡的代码改在了赋值 namegender 的中间,以强化验证原子性操做是否成功,由于若是不是原子性的话,就极可能出现赋值 name 还没赋值给 gender 就被取走的状况,小睡一下子是为了增强这种状况的出现几率(能够试着把 synchronized 去掉看看效果);

运行代码后发现,并无出现性别紊乱的现象了,可是重复消费仍然存在。

等待唤醒机制解决重复消费

咱们指望的是 张三-男凤姐-女 交替出现,而不是有重复消费的状况,因此咱们的生产者和消费者之间须要一点沟通,最容易想到的解决方法是,咱们新增长一个标志位,而后在消费者中使用 while 循环判断,不知足条件则不消费,条件知足则退出 while 循环,从而完成消费者的工做。

while (value != desire) {
    Thread.sleep(10);
}
doSomething();复制代码

这样作的目的就是为了防止「过快的无效尝试」,这种方法看似可以实现所需的功能,可是却存在以下的问题:

  • 1)难以确保及时性。在睡眠时,基本不消耗处理器的资源,可是若是睡得太久,就不能及时发现条件已经变化,也就是及时性难以保证;
  • 2)难以下降开销。若是下降睡眠的时间,好比休眠 1 毫秒,这样消费者可以更加迅速地发现条件变化,可是却可能消耗更多的处理资源,形成了无故的浪费。

以上两个问题吗,看似矛盾难以调和,可是 Java 经过内置的等待/ 通知机制可以很好地解决这个矛盾并实现所需的功能。

等待/ 通知机制,是指一个线程 A 调用了对象 O 的 wait() 方法进入等待状态,而另外一个线程 B 调用了对象 O 的 notifyAll() 方法,线程 A 收到通知后从对象 O 的 wait() 方法返回,进而执行后续操做。上述两个线程都是经过对象 O 来完成交互的,而对象上的 waitnotify/ notifyAll 的关系就如同开关信号同样,用来完成等待方和通知方之间的交互工做。

这里有一个比较奇怪的点是,为何看起来像是线程之间操做的 waitnotify/ notifyAll 方法会是 Object 类中的方法,而不是 Thread 类中的方法呢?

- 简单来讲:由于 synchronized 中的这把锁能够是任意对象,由于要知足任意对象都可以调用,因此属于 Object 类;

- 专业点说:由于这些方法在操做同步线程时,都必需要标识它们操做线程的锁,只有同一个锁上的被等待线程,能够被同一个锁上的 notify 唤醒,不能够对不一样锁中的线程进行唤醒。也就是说,等待和唤醒必须是同一个锁。而锁能够是任意对象,因此能够被任意对象调用的方法是定义在 Object 类中。

好,简单介绍完等待/ 通知机制,咱们开始改造吧:

public class ShareResource {

    private String name;
    private String gender;
    // 新增长一个标志位,表示共享资源是否为空,默认为 true
    private boolean isEmpty = true;

    /**
     * 模拟生产者向共享资源对象中存储数据
     *
     * @param name
     * @param gender
     */
    synchronized public void push(String name, String gender) {
        try {
            while (!isEmpty) {
                // 当前共享资源不为空的时,则等待消费者来消费
                // 使用同步锁对象来调用,表示当前线程释放同步锁,进入等待池,只能被其余线程所唤醒
                this.wait();
            }
            // 开始生产
            this.name = name;
            Thread.sleep(10);
            this.gender = gender;
            // 生产结束
            isEmpty = false;
            // 生产结束唤醒一个消费者来消费
            this.notify();
        } catch (Exception ignored) {
        }
    }

    /**
     * 模拟消费者从共享资源中取出数据
     */
    synchronized public void popup() {
        try {
            while (isEmpty) {
                // 为空则等着生产者进行生产
                // 使用同步锁对象来调用,表示当前线程释放同步锁,进入等待池,只能被其余线程所唤醒
                this.wait();
            }
            // 消费开始
            Thread.sleep(10);
            System.out.println(this.name + "-" + this.gender);
            // 消费结束
            isEmpty = true;
            // 消费结束唤醒一个生产者去生产
            this.notify();
        } catch (InterruptedException ignored) {
        }
    }
}复制代码

  • 咱们指望生产者生产一条,而后就去通知消费者消费一条,那么在生产和消费以前,都须要考虑当前是否须要生产 or 消费,因此咱们新增了一个标志位来判断,若是不知足则等待;
  • 被通知后仍然要检查条件,条件知足,则执行咱们相应的生产 or 消费的逻辑,而后改变条件(这里是 isEmpty),而且通知全部等待在对象上的线程;
  • 注意:上面的代码中通知使用的 notify() 方法,这是由于例子中写死了只有一个消费者和生产者,在实际状况中建议仍是使用 notifyAll() 方法,这样多个消费和生产者逻辑也可以保证(能够本身试一下);

小结

经过初始版本一步步地分析问题和解决问题,咱们就差很少写出了咱们经典生产者消费者的经典代码,但一般消费和生产的逻辑是写在各自的消费者和生产者代码里的,这里我为了方便阅读,把他们都抽离到了共享资源上,咱们能够简单地再来回顾一下这个消费生产和等待通知的整个过程:

以上就是关于生产者生产一条数据,消费者消费一次的过程了,涉及的一些具体细节咱们下面来讲。

2、线程间的通讯方式

等待唤醒机制的替代:Lock 和 Condition

咱们从上面的中看到了 wait()notify() 方法,只能被同步监听锁对象来调用,不然就会报出 IllegalMonitorZStateException 的异常,那么如今问题来了,咱们在上一篇提到的 Lock 机制根本就没有同步锁了,也就是没有自动获取锁和自动释放锁的概念,由于没有同步锁,也就意味着 Lock 机制不能调用 waitnotify 方法,咱们怎么办呢?

好在 Java 5 中提供了 Lock 机制的同时也提供了用于 Lock 机制控制通讯的 Condition 接口,若是你们理解了上面说到的 Object.wait()Object.notify() 方法的话,那么就能很容易地理解 Condition 对象了。

它和 wait()notify() 方法的做用是大体相同的,只不事后者是配合 synchronized 关键字使用的,而 Condition 是与重入锁相关联的。经过 Lock 接口(重入锁就实现了这一接口)的 newCondition() 方法能够生成一个与当前重入锁绑定的 Condition 实例。利用 Condition 对象,咱们就可让线程在合适的时间等待,或者在某一个特定的时刻获得通知,继续执行。

咱们拿上面的生产者消费者来举例,修改为 Lock 和 Condition 代码以下:

public class ShareResource {

    private String name;
    private String gender;
    // 新增长一个标志位,表示共享资源是否为空,默认为 true
    private boolean isEmpty = true;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    /**
     * 模拟生产者向共享资源对象中存储数据
     *
     * @param name
     * @param gender
     */
    public void push(String name, String gender) {
        lock.lock();
        try {
            while (!isEmpty) {
                // 当前共享资源不为空的时,则等待消费者来消费
                condition.await();
            }
            // 开始生产
            this.name = name;
            Thread.sleep(10);
            this.gender = gender;
            // 生产结束
            isEmpty = false;
            // 生产结束唤醒消费者来消费
            condition.signalAll();
        } catch (Exception ignored) {
        } finally {
            lock.unlock();
        }
    }

    /**
     * 模拟消费者从共享资源中取出数据
     */
    public void popup() {
        lock.lock();
        try {
            while (isEmpty) {
                // 为空则等着生产者进行生产
                condition.await();
            }
            // 消费开始
            Thread.sleep(10);
            System.out.println(this.name + "-" + this.gender);
            // 消费结束
            isEmpty = true;
            // 消费结束唤醒生产者去生产
            condition.signalAll();
        } catch (InterruptedException ignored) {
        } finally {
            lock.unlock();
        }
    }
}复制代码

在 JDK 内部,重入锁和 Condition 对象被普遍地使用,以 ArrayBlockingQueue 为例,它的 put() 方法实现以下:

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

// 构造函数,初始化锁以及对应的 Condition 对象
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 等待队列有足够的空间
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 通知须要 take() 的线程,队列已有数据
    notEmpty.signal();
}复制代码

同理,对应的 take() 方法实现以下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 若是队列为空,则消费者队列要等待一个非空的信号
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}复制代码

容许多个线程同时访问:信号量(Semaphore)

如下内容摘录 or 改编自 《实战 Java 高并发程序设计》 3.1.3 节的内容

信号量为多线程协做提供了更为强大的控制方法。广义上说,信号量是对锁的扩展,不管是内部锁 synchronized 仍是重入锁 ReentrantLock,一次都只容许一个线程访问一个资源,而信号量却能够指定多个线程,同时访问某一个资源。信号量主要提供了如下构造函数:

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)        // 第二个参数能够指定是否公平复制代码

在构造信号量对象时,必需要指定信号量的准入数,即同时能申请多少个许可。当每一个线程每次只申请一个许可时,这就至关于指定了同时有多少个线程能够访问某一个资源。信号量的主要逻辑以下:

public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()复制代码

  • acquire() 方法尝试得到一个准入的许可。若没法得到,则线程会等待,直到有线程释放一个许可或者当前线程被中断。
  • acquireUninterruptibly() 方法和 acquire() 方法相似,可是不响应中断。
  • tryAcquire() 尝试得到一个许可,若是成功则返回 true,失败则返回 false,它不会进行等待,当即返回。
  • release() 用于在线程访问资源结束后,释放一个许可,以使其余等待许可的线程能够进行资源访问。

在 JDK 的官方 Javadoc 中,就有一个有关信号量使用的简单实例,有兴趣的读者能够自行去翻阅一下,这里给出一个更傻瓜化的例子:

public class SemapDemo implements Runnable {

    final Semaphore semaphore = new Semaphore(5);

    @Override
    public void run() {
        try {
            semaphore.acquire();
            // 模拟耗时操做
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":done!");
            semaphore.release();
        } catch (InterruptedException ignore) {
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            executorService.submit(demo);
        }
    }
}复制代码

执行程序,就会发现系统以 5 个线程为单位,依次输出带有线程 ID 的提示文本。

在实现上,Semaphore 借助了线程同步框架 AQS(AbstractQueuedSynchornizer),一样借助了 AQS 来实现的是 Java 中可重入锁的实现。AQS 的强大之处在于,你仅仅须要继承它,而后使用它提供的 api 就能够实现任意复杂的线程同步方案,AQS 为咱们作了大部分的同步工做,因此这里不细说,以后再来详细探究一下...

我等着你:Thread.join()

若是一个线程 A 执行了 thread.join() 方法,其含义是:当前线程 A 等待 thread 线程终止以后才从 thread.join() 返回。线程 Thread 除了提供 join() 方法以外,还提供了 join(long millis)join(long millis, int nanos) 两个具有超时特性的方法。这两个超时方法表示,若是线程 Thread 在给定的超时时间里没有终止,那么将会从该超时方法中返回。

在下面的代码中,咱们建立了 10 个线程,编号 0 ~ 9,每一个线程调用前一个线程的 join() 方法,也就是线程 0 结束了,线程 1 才能从 join() 方法中返回,而线程 0 须要等待 main 线程结束。

public class Join {

    public static void main(String[] args) throws InterruptedException {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
            // 每一个线程拥有前一个线程的引用,须要等待前一个线程终止,才能从等待中返回
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate. ");
    }

    static class Domino implements Runnable {

        private Thread thread;

        public Domino(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException ignore) {
            }
            System.out.println(Thread.currentThread().getName() + " terminate. ");
        }
    }
}复制代码

运行程序,能够看到下列输出:

main terminate. 
0 terminate. 
1 terminate. 
2 terminate. 
3 terminate. 
4 terminate. 
5 terminate. 
6 terminate. 
7 terminate. 
8 terminate. 
9 terminate. 复制代码

说明每一个线程终止的前提都是前驱线程的终止,每一个线程等待前驱线程结束后,才从 join() 方法中返回,这里涉及了等待/ 通知机制,在 JDK 的源码中,咱们能够看到 join() 的方法以下:

public final synchronized void join(long millis)
    throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        // 条件不知足则继续等待
        while (isAlive()) {
            wait(0);
        }
        // 条件符合则返回
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}复制代码

当线程终止时,会调用线程自身的 notifyAll() 方法,会通知全部等待在该线程对象上的线程。能够看到 join() 方法的逻辑结构跟咱们上面写的生产者消费者相似,即加锁、循环和处理逻辑三个步骤。

3、线程之间的数据交互

保证可见性:volatile 关键字

咱们先从一个有趣的例子入手:

private static boolean isOver = false;

public static void main(String[] args) throws InterruptedException {
    Thread thread = new Thread(() -> {
        while (!isOver) {
        }
        System.out.println("线程已感知到 isOver 置为 true,线程正常返回!");
    });
    thread.start();
    Thread.sleep(500);
    isOver = true;
    System.out.println("isOver 已置为 true");
}复制代码

咱们开启了一个主线程和一个子线程,咱们指望子线程可以感知到 isOver 变量的变化以结束掉死循环正常返回,可是运行程序却发现并非像咱们指望的那样发生,子线程一直处在了死循环的状态!

为何会这样呢?

Java 内存模型

关于这一点,咱们有几点须要说明,首先须要搞懂 Java 的内存模型:

Java 虚拟机规范中试图定义一种 Java 内存模型(Java Memory Model, JMM)来屏蔽掉各层硬件和操做系统的内存访问差别,以实现让 Java 程序在各类平台下都能达到一致的内存访问效果。

Java 内存模型规定了全部的变量都存储在主内存(Main Memory)中。每条线程还有本身的工做内存(Working Memory),线程的工做内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的全部操做(读取、赋值等)都必须在主内存中进行,而不能直接读写主内存中的变量。不一样的线程之间也没法直接访问对方工做内存中的变量,线程间的变量值的传递均须要经过主内存来完成,线程、主内存、工做内存三者的关系如上图。

那么不一样的线程之间是如何通讯的呢?

共享内存的并发模型里,线程之间共享程序的公共状态,线程之间经过写-读内存中的公共状态来隐式进行通讯,典型的共享内存通讯方式就是经过共享对象进行通讯。

例如上图线程 A 与 线程 B 之间若是要通讯的话,那么就必须经历下面两个步骤:

  1. 首先,线程 A 把本地内存 A 更新过的共享变量刷新到主内存中去
  2. 而后,线程 B 到主内存中去读取线程 A 以前更新过的共享变量

在消息传递的并发模型里,线程之间没有公共状态,线程之间必须经过明确的发送消息来显式进行通讯,在 Java 中典型的消息传递方式就是 wait()notify()

说回刚才出现的问题,就很容易理解了:每一个线程都有独占的内存区域,如操做栈、本地变量表等。线程本地保存了引用变量在堆内存中的副本,线程对变量的全部操做都在本地内存区域中进行,执行结束后再同步到堆内存中去。也就是说,咱们在主线程中修改的 isOver 的值并无被子线程读取到(没有被刷入主内存),也就形成了子线程对于 isOver 变量不可见。

解决方法也很简单,只须要在 isOver 变量前加入 volatile 关键字就能够了,这是由于加入了 volatile 修饰的变量容许直接与主内存交互,进行读写操做,保证可见性。

指令重排/ happen-before 原则

再从另外一个有趣的例子中入手,这是在高并发场景下会存在的问题:

class LazyInitDemo {
    private static TransationService service = null;
    
    public static TransationService getTransationService(){
        if (service == null) {
            synchronized (this) {
                if (service == null) {
                    service = new TransationService();
                }
            }
        }
    }
}复制代码

这是一个典型的双重检查锁定思想,这段代码也是一个典型的双重检查锁定(Double-checked Locking)问题。在高并发的状况下,该对象引用在没有同步的状况下进行读写操做,致使用户可能会获取未构造完成的对象

这是由于指令优化的结果。计算机不会根据代码顺序循序渐进地执行相关指令,咱们来举一个借书的例子:假如你要去还书而且想要借一个《高并发编程学习》系列丛书,而你的室友刚好也要还书,而且还想让你帮忙借一本《Java 从入门到放弃》。

这件事乍一看有两件事:你的事和你室友的事。先办完你的事,再开始处理你室友的事情是属于单线程的死板行为,此时你会潜意识地进行「优化」,例如你能够把你要还的书和你室友须要还的书一块儿还了,再一块儿把想要借的书借出来,这其实就至关于合并数据进行存取的操做过程了。

咱们知道一条指令的执行是能够分红不少步骤的,简单地说,能够分为:

  • 取值 IF
  • 译码和去寄存器操做数 ID
  • 执行或者有效地址计算 EX
  • 存储器访问 MEM
  • 写回 WB

因为每个步骤可能使用不一样的硬件完成,所以,聪明的工程师就发明了流水线技术来执行指令,以下图所示:

能够看到,当第 2 条指令执行时,第 1 条执行其实并无执行完,确切地说第一条指令尚未开始执行,只是刚刚完成了取值操做而已。这样的好处很是明显,假如这里每个步骤都须要花费 1 毫秒,那么指令 2 等待指令 1 彻底执行后再执行,则须要等待 5 毫秒,而使用流水线指令,指令 2 只须要等待 1 毫秒就能够执行了。如此大的性能提高,固然让人眼红。

回到最初的问题,咱们分析一下:对于 Java 编译器来讲,初始化 TransactionService 实例和将对象地址写到 service 字段并不是原子操做,且这两个阶段的执行顺序是未定义的。加入某个线程执行 new TransactionService() 时,构造方法还未被调用,编译器仅仅为该对象分配了内存空间并设为默认值,此时若另外一个线程调用 getTransactionService() 方法,因为 service != null,可是此时 service 对象尚未被赋予真正的有效值,从而没法取到正确的 service 单例对象。

对于此问题,一种较为简单的解决方案就是用 volatile 关键字修饰目标属性(适用于 JDK5 及以上版本),这样 service 就限制了编译器对它的相关读写操做,对它的读写操做进行指令重排,肯定对象实例化以后才返回引用。

另外指令重排也有本身的规则,并不是全部的指令均可以随意改变执行位置,下面列举一下基本的原则:

  • 程序次序规则:一个线程内,按照代码顺序,书写在前面的操做先行发生于书写在后面的操做;
  • 锁定规则:一个 unLock 操做先行发生于后面对同一个锁的 lock 操做;
  • volatile 变量规则:对一个变量的写操做先行发生于后面对这个变量的读操做;
  • 传递规则:若是操做 A 先行发生于操做 B,而操做 B 又先行发生于操做 C,则能够得出操做 A 先行发生于操做 C;
  • 线程启动规则:Thread 对象的 start() 方法先行发生于此线程的每一个一个动做;
  • 线程中断规则:对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生;
  • 线程终结规则:线程中全部的操做都先行发生于线程的终止检测,咱们能够经过 Thread.join() 方法结束、Thread.isAlive() 的返回值手段检测到线程已经终止执行;
  • 对象终结规则:一个对象的初始化完成先行发生于他的 finalize() 方法的开始;

volatile 不保证原子性

volatile 解决的是多线程共享变量的可见性问题,相似于 synchronized,但不具有 synchronized 的互斥性。因此对 volatile 变量的操做并不是都具备原子性,例如咱们用下面的例子来讲明:

public class VolatileNotAtomic {

    private static volatile long count = 0L;
    private static final int NUMBER = 10000;

    public static void main(String[] args) {
        Thread subtractThread = new SubstractThread();
        subtractThread.start();

        for (int i = 0; i < NUMBER; i++) {
            count++;
        }

        // 等待减法线程结束
        while (subtractThread.isAlive()) {
        }

        System.out.println("count 最后的值为: " + count);
    }

    private static class SubstractThread extends Thread {

        @Override
        public void run() {
            for (int i = 0; i < NUMBER; i++) {
                count--;
            }
        }
    }
}复制代码

屡次执行后,发现结果基本都不为 0。只有在 count++count-- 两处都进行加锁时,才能正确的返回 0,了解 Java 的童鞋都应该知道这 count++count-- 都不是一个原子操做,这里就不做说明了。

volatile 的使用优化

在了解一点吧,注明的并发编程大师 Doug lea 在 JDK 7 的并发包里新增一个队列集合类 LinkedTransferQueue,它在使用 volatile 变量时,用一种追加字节的方式来优化对列出队和入队的性能,具体的能够看一下下列的连接,这里就不具体说明了。

保证原子性:synchronized

Java 中任何一个对象都有一个惟一与之关联的锁,这样的锁做为该对象的一系列标志位存储在对象信息的头部。Java 对象头里的 Mark Word 里默认的存放的对象的 Hashcode/ 分代年龄和锁标记位。32 为JVM Mark Word 默认存储结构以下:

Java SE 1.6中,锁一共有 4 种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态,这几个状态会随着竞争状况逐渐升级。锁能够升级但不能降级,意味着偏向锁升级成轻量级锁后不能降级成偏向锁。这种锁升级却不能降级的策略,目的是为了提升得到锁和释放锁的效率。

偏向锁

HotSpot 的做者通过研究发现,大多数状况下,锁不只不存在多线程竞争,并且老是由同一线程屡次得到,为了让线程得到锁的代价更低而引入了偏向锁。

  • 偏向锁的获取:当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程 ID,之后该线程在进入和退出同步块时不须要进行 CAS 操做来加锁和解锁,只需简单地测试一下对象头的 Mark Word 里是否存储着指向当前线程的偏向锁。若是测试成功,表示线程已经得到了锁。若是测试失败,则须要再测试一下 Mark Word 中偏向锁的标识是否设置成 1(表示当前是偏向锁),若是没有设置,则使用CAS竞争锁;若是设置了,则尝试使用CAS将对象头的偏向锁指向当前线程。
  • 偏向锁的撤销:偏向锁使用了一种等到竞争出现才释放锁的机制,因此当其余线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁。

下图线程 1 展现了偏向锁获取的过程,线程 2 展现了偏向锁撤销的过程。

轻量级锁和自旋锁

若是偏向锁失败,虚拟机并不会当即挂起线程。它还会使用一种称为轻量级锁的优化手段。

线程在执行同步块以前,JVM 会先在当前线程的栈桢中建立用于存储锁记录的空间,并将对象头中的 Mark Word 复制到锁记录中,官方称为 Displaced Mark Word。而后线程尝试使用 CAS 将对象头中的 Mark Word 替换为指向锁记录的指针。若是成功,当前线程得到锁,若是失败,表示其余线程竞争锁,当前线程便尝试使用自旋(本身执行几个空循环再进行尝试)来获取锁。

轻量级解锁时,会使用原子的 CAS 操做将 Displaced Mark Word 替换回到对象头,若是成功,则表示没有竞争发生。若是失败,表示当前锁存在竞争,锁就会膨胀成重量级锁。下图是两个线程同时争夺锁,致使锁膨胀的流程图。

几种锁的比较

下图就简单归纳了一下几种锁的比较:

每人一支笔:ThreadLocal

除了控制资源的访问外,咱们还能够经过增长资源来保证全部对象的线程安全。好比,让 100 我的填写我的信息表,若是只有一支笔,那么你们就得挨个写,对于管理人员来讲,必须保证你们不会去哄抢这仅存的一支笔,不然,谁也填不完。从另一个角度出发,咱们能够干脆就准备 100 支笔,那么全部人均可以各自为营,很快就能完成表格的填写工做。

若是说锁是使用第一种思路,那么 ThreadLocal 就是使用第二种思路了。

当使用 ThreadLocal 维护变量时,其为每一个使用该变量的线程提供独立的变量副本,因此每个线程均可以独立的改变本身的副本,而不会影响其余线程对应的副本。

ThreadLocal 内部实现机制

  1. 每一个线程内部都会维护一个相似 HashMap 的对象,称为 ThreadLocalMap,里边会包含若干了 Entry(K-V 键值对),相应的线程被称为这些 Entry 的属主线程;
  2. Entry 的 Key 是一个 ThreadLocal 实例,Value 是一个线程特有对象。Entry 的做用便是:为其属主线程创建起一个 ThreadLocal 实例与一个线程特有对象之间的对应关系;
  3. Entry 对 Key 的引用是弱引用;Entry 对 Value 的引用是强引用。

ThreadLodal 的反作用

为了让线程安全地共享某个变量,JDK 开出了 ThreadLocal 这副药方,但「是药三分毒」,ThreadLocal 也有必定的反作用。主要问题是「产生脏数据」和「内存泄漏」。这两个问题一般是在线程池中使用 ThreadLocal 引起的,由于线程池有 「线程复用」「内存常驻」 两个特色。

脏数据

线程复用会产生脏数据。因为线程池会重用 Thread 对象,那么与 Thread 绑定的类的静态属性 ThreadLocal 变量也会被重用。若是在实现的线程 run() 方法中不显式地 remove() 清理与线程相关的 ThreadLocal 信息,那么假若下一个线程不调用 set() 设置初始值,就可能 get() 到重用的线程信息,包括 ThreadLocal 所关联的线程对象的 value 值。

为了方便理解,用一段简要代码来模拟,以下所示:

public class DirtyDataInThreadLocal {

    public static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        // 使用固定大小为 1 的线程池,说明上一个的线程属性会被下一个线程属性复用
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 2; i++) {
            Mythread mythread = new Mythread();
            pool.execute(mythread);
        }
    }

    private static class Mythread extends Thread {

        private static boolean flag = true;

        @Override
        public void run() {
            if (flag) {
                // 第 1 个线程 set 后,并无进行 remove
                // 而第二个线程因为某种缘由没有进行 set 操做
                threadLocal.set(this.getName() + ", session info.");
                flag = false;
            }
            System.out.println(this.getName() + " 线程是 " + threadLocal.get());
        }
    }
}复制代码

执行结果:

Thread-0 线程是 Thread-0, session info.
Thread-1 线程是 Thread-0, session info.复制代码

内存泄漏

在源码注释中提示使用 static 关键字来修饰 ThreadLocal。在此场景下,寄但愿于 ThreadLocal 对象失去引用后,触发弱引用机制来回收 Entry 的 Value 就变得不现实了。在上面的例子中,若是不进行 remove() 操做,那么这个线程执行完成后,经过 ThreadLocal 对象持有的 String 对象是不会被释放的。

以上两个问题的解决办法很简单,就是在每次使用完 ThreadLocal 时,必需要及时调用 remove() 方法清理。

参考资料

  1. 《Java 零基础入门教程》 - study.163.com/course/cour…
  2. 《Java 并发编程的艺术》
  3. 《码出高效 Java 开发手册》 - 杨冠宝(孤尽) 高海慧(鸣莎)著
  4. Java面试知识点解析(二)——高并发编程篇 - www.wmyskxz.com/2018/05/10/…
  5. 让你完全理解Synchronized - www.jianshu.com/p/d53bf830f…
  6. 《Offer来了 - Java面试核心知识点精讲》 - 王磊 编著
  7. 《实战Java高并发程序设计》 - 葛一鸣 郭超 编著

---

按照惯例黏一个尾巴:

欢迎转载,转载请注明出处!

独立域名博客:wmyskxz.com

简书 ID:@我没有三颗心脏

github:wmyskxz

欢迎关注公众微信号:wmyskxz

分享本身的学习 & 学习资料 & 生活

想要交流的朋友也能够加 qq 群:3382693

相关文章
相关标签/搜索