对于线程和线程池还有线程安全的理解

进程和线程

进程和线程都是一个时间段的描述,是CPU工做时间段的描述,不过是颗粒大小不一样。 java

来自知乎的图片
他们主要区别是:进程不共享内存,线程能够共享内存。 引用知乎地址

线程:

  • CPU中的Thread:
    CPU中的线程,咱们也叫它们Thread,和OS中的线程的名字同样。他们和cpu相关,常说的4核心8线程就是指cpu线程。CPU的Thread就那么固定几个,是稀缺资源。
  • 操做系统中的Thread: 操做系统中的进程能够不少,进程中的线程就更多了。软件操做系统调度的基本单位是OS的Thread。咱们开发中所指的就是这个线程。

Thread和Runnable

Java中线程的建立有两种方式: 1.经过继承Thread类,重写Thread的run()方法,将线程运行的逻辑放在其中。c++

2.经过实现Runnable接口,实例化Thread类。数组

咱们一般使用第二种,由于能够复用Runnable,更容易实现资源共享,能多个线程同时处理一个资源。缓存

// 1
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("this is a Runnable");
    }
}
// 2
public class MyThread extends Thread {
    @Override
    public void run() {
        super.run();
        System.out.println("this is thread");
    }
}

// 具体使用
public class Main {
    public static void main(String[] args) {
        // 第一种
        Thread thread1 = new Thread(new MyRunnable());
        thread1.start();
        // 第二种
        MyThread thread2 = new MyThread();
        thread2.start();
    }
}
复制代码

而实际Android开发工做中,以上两种都不用,咱们一般使用Android提供的Handler和java.util包里的Executor。安全

Executor

Executor 是一个接口,execute执行Runnable。bash

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
复制代码

看下使用:多线程

val executor: Executor = Executors.newCachedThreadPool()
        executor.execute { }
复制代码

点进去newCachedThreadPool,发现返回的是一个ExecutorService。ExecutorService就是Executor的实现了。架构

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
复制代码

ExecutorService

ExecutorService有两个方法:并发

void shutdown();是指再也不添加任务,执行完已有任务后结束。 List<Runnable> shutdownNow();是当即调用线程的interrupt()结束全部的线程。app

ThreadPoolExecutor

上面看到Executors里面new的是ThreadPoolExecutor,咱们看下ThreadPoolExecutor的构造方法:

//五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

//六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

//六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

//七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
复制代码
  • corePoolSize: 该线程池中核心线程数最大值

核心线程:在建立完线程池以后,核心线程先不建立,在接到任务以后建立核心线程。而且会一直存在于线程池中(即便这个线程啥都不干),有任务要执行时,若是核心线程没有被占用,会优先用核心线程执行任务。数量通常状况下设置为CPU核数的二倍便可。

  • maximumPoolSize: 该线程池中线程总数最大值

线程总数=核心线程数+非核心线程数。

非核心线程:简单理解,即核心线程都被占用,但还有任务要作,就建立非核心线程。

  • keepAliveTime: 非核心线程闲置超时时长

这个参数能够理解为,任务少,但池中线程多,非核心线程不能白养着,超过这个时间不工做的就会被干掉,可是核心线程会保留。

  • TimeUnit: keepAliveTime的单位

TimeUnit是一个枚举类型,其包括:

NANOSECONDS:1微毫秒 = 1微秒 / 1000
MICROSECONDS:1微秒 = 1毫秒 / 1000
MILLISECONDS:1毫秒 = 1秒 /1000
SECONDS:秒
MINUTES:分
HOURS:小时
DAYS:天
复制代码
  • BlockingQueue workQueue: 线程池中的任务队列

默认状况下,任务进来以后先分配给核心线程执行,核心线程若是都被占用,并不会马上开启非核心线程执行任务,而是将任务插入任务队列等待执行,核心线程会从任务队列取任务来执行,任务队列能够设置最大值,一旦插入的任务足够多,达到最大值,才会建立非核心线程执行任务。

常见的workQueue有四种:

  1. SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,若是全部线程都在工做怎么办?那就新建一个线程来处理这个任务!因此为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize通常指定成Integer.MAX_VALUE,即无限大。

  2. LinkedBlockingQueue:这个队列接收到任务的时候,若是当前已经建立的核心线程数小于线程池的核心线程数上限,则新建线程(核心线程)处理任务;若是当前已经建立的核心线程数等于核心线程数上限,则进入队列等待。因为这个队列没有最大值限制,即全部超过核心线程数的任务都将被添加到队列中,这也就致使了maximumPoolSize的设定失效,由于总线程数永远不会超过corePoolSize

  3. ArrayBlockingQueue:能够限定队列的长度,接收到任务的时候,若是没有达到corePoolSize的值,则新建线程(核心线程)执行任务,若是达到了,则入队等候,若是队列已满,则新建线程(非核心线程)执行任务,又若是总线程数到了maximumPoolSize,而且队列也满了,则发生错误,或是执行实现定义好的饱和策略。

  4. DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。

  • ThreadFactory threadFactory -> 建立线程的工厂

能够用线程工厂给每一个建立出来的线程设置名字。通常状况下无须设置该参数。

  • RejectedExecutionHandler handler -> 饱和拒绝策略

这是当任务队列和线程池都满了时所采起的应对策略,默认是AbordPolicy。

AbordPolicy:表示没法处理新任务,并抛出 RejectedExecutionException 异常。此外还有3种策略,它们分别以下。

CallerRunsPolicy:用调用者所在的线程来处理任务。此策略提供简单的反馈控制机制,可以减缓新任务的提交速度。

DiscardPolicy:不能执行的任务,并将该任务删除。

DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。

四种线程池

Executors类为咱们提供的四种简单建立线程池的方法:

private val fix = Executors.newFixedThreadPool(4)
private val cache = Executors.newCachedThreadPool()
private val single = Executors.newSingleThreadExecutor()
private val scheduled = Executors.newScheduledThreadPool(4)
复制代码

其实就是调用不一样的ThreadPoolExecutor的构造方法。下面一个一个分析:

  1. FixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    复制代码

    FixedThreadPool的corePoolSize和maximumPoolSize都设置为参数nThreads,也就是只有固定数量的核心线程,不存在非核心线程。keepAliveTime为0L表示多余的线程马上终止,由于不会产生多余的线程,因此这个参数是无效的,也就是说线程不会被回收一直保存在线程池。FixedThreadPool的任务队列采用的是LinkedBlockingQueue。通常咱们设置为cpu核心数+1。

    private val fix = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1)

    FixThreadPool其实就像一堆人排队上公厕同样,能够无数多人排队,可是厕所位置就那么多,并且没人上时,厕所闲置着也不会搬走。

  2. SingleThreadPool

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    复制代码

    咱们能够看到总线程数和核心线程数都是1,因此就只有一个核心线程。该线程池才用链表阻塞队列LinkedBlockingQueue,先进先出原则,因此保证了任务的按顺序逐一进行。

    SingleThreadPool能够理解为公厕里只有一个坑位,先来先上。

  3. CachedThreadPool

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    复制代码

    CachedThreadPool的corePoolSize是0,maximumPoolSize是Int的最大值,也就是说CachedThreadPool没有核心线程,所有都是非核心线程,而且没有上限。keepAliveTime是60秒,就是说空闲线程等待新任务60秒,超时则销毁。此处用到的队列是阻塞队列SynchronousQueue,这个队列没有缓冲区,因此其中最多只能存在一个元素,有新的任务则阻塞等待。

    适用于频繁IO的操做,由于他们的任务量小,可是任务基数很是庞大,使用核心线程处理的话,数量建立方面就很成问题。

CachedThreadPool有点像去冲浪,由于海洋无限大,随时去都有位置冲浪,一我的冲完60秒内能够免费给下一我的玩。超过60秒冲浪板就被商家回收。

  1. ScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }
复制代码

能够看出corePoolSize是传进来的固定值,maximumPoolSize无限大,由于采用的队列DelayedWorkQueue是无解的,因此maximumPoolSize参数无效。若是运行的线程达到了corePoolSize时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务会放在队列的前面。在跟此前介绍的线程池不一样的是,当执行完任务后,会将ScheduledFutureTask中的time变量改成下次要执行的时间并放回到DelayedWorkQueue中。

ScheduledThreadPool主要用于执行定时任务以及有固定周期的重复任务。

Callable

Callable是java1.5添加进来的一个加强版本。相似于Runnable,却又有差别:

  1. Runnable是自从java1.1就有了,而Callable是1.5以后才加上去的。
  2. Callable规定的方法是call(),Runnable规定的方法是run()。
  3. Callable的任务执行后可返回值,而Runnable的任务是不能返回值(是void)。
  4. call方法能够抛出异常,run方法不能够。
  5. 运行Callable任务能够拿到一个Future对象,表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。经过Future对象能够了解任务执行状况,可取消任务的执行,还可获取执行结果。
  6. 加入线程池运行,Runnable使用ExecutorService的execute方法,Callable使用submit方法。

下面看下使用:

val executor: ExecutorService = Executors.newSingleThreadExecutor()
    val future: Future<String> = executor.submit(MyCallable())
    try {
        val string: String = future.get()
    } catch (e: ExecutionException) {

    }
    executor.shutdown()
复制代码
class MyCallable() : Callable<String> {
        override fun call(): String {
            return "done"
        }
    }
复制代码

线程安全

JMM

由于硬件架构,会致使一些问题,特别在多线程的时候更为突出:

  • 缓存一致性问题:在多处理器系统中,每一个处理器都有本身的高速缓存,而它们又共享同一主内存(MainMemory)。基于高速缓存的存储交互很好地解决了处理器与内存的速度矛盾,可是也引入了新的问题:缓存一致性(CacheCoherence)。当多个处理器的运算任务都涉及同一块主内存区域时,将可能致使各自的缓存数据不一致的状况,若是真的发生这种状况,那同步回到主内存时以谁的缓存数据为准呢?为了解决一致性的问题,须要各个处理器访问缓存时都遵循一些协议,在读写时要根据协议来进行操做,这类协议有MSI、MESI(IllinoisProtocol)、MOSI、Synapse、Firefly及DragonProtocol,等等。

  • 指令重排序问题:为了使得处理器内部的运算单元能尽可能被充分利用,处理器可能会对输入代码进行乱序执行(Out-Of-Order Execution)优化,处理器会在计算以后将乱序执行的结果重组,保证该结果与顺序执行的结果是一致的,但并不保证程序中各个语句计算的前后顺序与输入代码中的顺序一致。所以,若是存在一个计算任务依赖另外一个计算任务的中间结果,那么其顺序性并不能靠代码的前后顺序来保证。与处理器的乱序执行优化相似,Java虚拟机的即时编译器中也有相似的指令重排序(Instruction Reorder)优化。

线程间通讯必需要通过主内存。

以下,若是线程A与线程B之间要通讯的话,必需要经历下面2个步骤:

1)线程A把本地内存A中更新过的共享变量刷新到主内存中去。

2)线程B到主内存中去读取线程A以前已更新过的共享变量。

当对象和变量被存放在计算机中各类不一样的内存区域中时,就可能会出现一些具体的问题。Java内存模型创建所围绕的问题:在多线程并发过程当中,如何处理多线程读同步问题与可见性(多线程缓存与指令重排序)、多线程写同步问题与原子性。

Java内存模型(即Java Memory Model,简称JMM)自己是一种抽象的概念,并不真实存在,它描述的是一组规则或规范,经过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。

  1. 多线程读同步与可见性 线程对共享变量修改的可见性。当一个线程修改了共享变量的值,其余线程可以马上得知这个修改。

  2. 原子性 指一个操做是按原子的方式执行的。要么该操做不被执行;要么以原子方式执行,即执行过程当中不会被其它线程中断。

  3. 有序性 有序性是指对于单线程的执行代码,咱们老是认为代码的执行是按顺序依次执行的,这样的理解并无毛病,毕竟对于单线程而言确实如此,但对于多线程环境,则可能出现乱序现象,由于程序编译成机器码指令后可能会出现指令重排现象,重排后的指令与原指令的顺序未必一致,要明白的是,在Java程序中,假若在本线程内,全部操做都视为有序行为,若是是多线程环境下,一个线程中观察另一个线程,全部操做都是无序的,前半句指的是单线程内保证串行语义执行的一致性,后半句则指指令重排现象和工做内存与主内存同步延迟现象。

volatile

volatile关键字有以下两个做用

  1. 保证被volatile修饰的共享变量对全部线程总数可见的,也就是当一个线程修改了一个被volatile修饰共享变量的值,新值总数能够被其余线程当即得知。
  2. 禁止指令重排序优化。
//线程1
boolean stop = false;
while(!stop){
    doSomething();
}
 
//线程2
stop = true;
复制代码

若是线程2改变了stop的值,线程1必定会中止吗?不必定。当线程2更改了stop变量的值以后,可是还没来得及写入主存当中,线程2转去作其余事情了,那么线程1因为不知道线程2对stop变量的更改,所以还会一直循环下去。

可是用volatile修饰以后就变得不同了:

//线程1
volatile boolean stop = false;
while(!stop){
    doSomething();
}
 
//线程2
stop = true;
复制代码

第一:使用volatile关键字会强制将修改的值当即写入主存;

第二:使用volatile关键字的话,当线程2进行修改时,会致使线程1的工做内存中缓存变量stop的缓存行无效(反映到硬件层的话,就是CPU的L1或者L2缓存中对应的缓存行无效);

第三:因为线程1的工做内存中缓存变量stop的缓存行无效,因此线程1再次读取变量stop的值时会去主存读取。

那么在线程2修改stop值时(固然这里包括2个操做,修改线程2工做内存中的值,而后将修改后的值写入内存),会使得线程1的工做内存中缓存变量stop的缓存行无效,而后线程1读取时,发现本身的缓存行无效,它会等待缓存行对应的主存地址被更新以后,而后去对应的主存读取最新的值。

那么线程1读取到的就是最新的正确的值

这也就是内存模型JMM的内存可见性。

private volatile int inc = 0;

    void count() {
        inc++;
    }

    void add() {
        new Thread() {
            @Override
            public void run() {
                for (int j = 0; j < 100_00_00; j++) {
                    count();
                }
                System.out.println(inc);
            }
        }.start();
        new Thread() {
            @Override
            public void run() {
                for (int j = 0; j < 100_00_00; j++) {
                    count();
                }
                System.out.println(inc);
            }
        }.start();

    }

复制代码

看这段代码,2个线程分别加一百万次。结果会打印出两百万次吗?不会的。可能有的人就会有疑问,不对啊,上面是对变量inc进行自增操做,因为volatile保证了可见性,那么在每一个线程中对inc自增完以后,在其余线程中都能看到修改后的值啊,因此有两个线程分别进行了一百万次操做,那么最终inc的值应该是两百万啊。

  这里面就有一个误区了,volatile关键字能保证可见性没有错,可是上面的程序错在没能保证原子性。可见性只能保证每次读取的是最新的值,可是volatile没办法保证对变量的操做的原子性。

inc++; 实际上是两个步骤,先加加,而后再赋值。不是原子性操做,因此volatile不能保证线程安全。

synchronized

synchronized是Java中的关键字,是利用锁的机制来实现同步的。Synchronized的做用主要有三个:

  1. 原子性:确保线程互斥的访问同步代码;
  2. 可见性:保证共享变量的修改可以及时可见,实际上是经过Java内存模型中的 “对一个变量unlock操做以前,必需要同步到主内存中;若是对一个变量进行lock操做,则将会清空工做内存中此变量的值,在执行引擎使用此变量前,须要从新从主内存中load操做或assign操做初始化变量值” 来保证的;
  3. 有序性:有效解决重排序问题,即 “一个unlock操做先行发生(happen-before)于后面对同一个锁的lock操做”;

synchronized 能够修饰方法和代码块,进入synchronized修饰的方法或者代码块的线程,就会获取monitor对象,monitor也就是Java里的对象锁。

下面看下经典的卖票案例:

class Ticket implements Runnable {
    /* 五百张票 */
    private int tickets = 500;

    @Override
    public void run() {

        while (true) {
            //同步锁
            synchronized (this) {
                if (tickets > 0) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d张票!\n", Thread.currentThread().getName(), tickets--);
                } else {
                    System.out.printf("%s窗口已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }
}
复制代码
public static void main(String[] args) {
        Ticket ticket = new Ticket();
        Thread thread1= new Thread(ticket);
        Thread thread2 = new Thread(ticket);
        Thread thread3 = new Thread(ticket);
        thread1.start();
        thread2.start();
        thread3.start();
    }
复制代码

3个线程卖500张票。利用synchronized实现线程安全,下面修改下实现:

class Ticket  {
    /* 五百张票 */
    private int tickets = 500;

    public void sellTckets() {
        while (true) {
            //同步锁
            synchronized (this) {
                if (tickets > 0) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d张票!\n", Thread.currentThread().getName(), tickets--);
                } else {
                    System.out.printf("%s窗口已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }
}
复制代码
public static void main(String[] args) {
        final Ticket ticket = new Ticket();
        Thread thread1= new Thread(){
            @Override
            public void run() {
                ticket.sellTckets();
            }
        };
        Thread thread2 = new Thread(){
            @Override
            public void run() {
                ticket.sellTckets();
            }
        };
        Thread thread3 = new Thread(){
            @Override
            public void run() {
                ticket.sellTckets();
            }
        };
        thread1.start();
        thread2.start();
        thread3.start();
    }
复制代码

同样的线程安全,多线程卖票,可是如今我不只要卖票,还要订餐,卖票和订餐是两个互不干涉的操做,可是由于 synchronized (this)拿到的是同一个对象锁,因此若是线程1在卖票,那么线程2就不能拿到对象锁去订餐:

class Ticket  {
    /* 二百张票 */
    private int tickets = 200;
    /* 二百份盒饭 */
    private int foods = 200;

    public void sell​​Tckets() {
        while (true) {
            //同步锁
            synchronized (this) {
                if (tickets > 0) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d张票!\n", Thread.currentThread().getName(), tickets--);
                } else {
                    System.out.printf("%s窗口车票已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }

    public void sellFoods() {
        while (true) {
            //同步锁
            synchronized (this) {
                if (foods > 0) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d份盒饭!\n", Thread.currentThread().getName(), foods--);
                } else {
                    System.out.printf("%s窗口盒饭已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }

复制代码

那么怎么能多线程订票的同时,别的线程也能够订餐呢?用不一样的对象便可:

class Ticket {
    private int tickets = 200;
 
    private int foods = 200;
    Object object1 = new Object();
    Object object2 = new Object();

    public void sellTickets() {
        while (true) {
            //同步锁
            synchronized (object1) {
                if (tickets > 0) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d张票!\n", Thread.currentThread().getName(), tickets--);
                } else {
                    System.out.printf("%s窗口车票已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }

    public void sellFoods() {
        while (true) {
            //同步锁
            synchronized (object2) {
                if (foods > 0) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d份盒饭!\n", Thread.currentThread().getName(), foods--);
                } else {
                    System.out.printf("%s窗口盒饭已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }
}
复制代码

这就像你家里2个卧室,门锁是同样的锁因此都用同一把钥匙。老王拿着钥匙进入主卧反锁了门睡觉,你想去次卧睡,可是钥匙被老王拿进主卧了。你去不了次卧。只能等他出来把钥匙给你。怎么能你俩都去睡觉呢?那就配两把钥匙。老王拿着主卧的钥匙去了主卧,你拿着次卧的钥匙去次卧睡。

相关文章
相关标签/搜索