【Java并发编程】2.经常使用线程的并发工具类


在这里插入图片描述

ForkJoin

1.Fork/Join流程:

ForkJoin是一种分治的思想,在1.7中引入JDK中。现实生活中的快排,队排,MapReduce都是思想的 实现,意思是在必要的状况下,将一个大任务,进行拆分(fork) 成若干个子任务(拆到不能再拆,这里就是指咱们制定的拆分的临界值),再将一个个小任务的结果进行join汇总。
在这里插入图片描述java

2. 工做窃取模式

从上述Fork/Join框架的描述能够看出,咱们须要一些线程来执行Fork出的任务,在实际中,若是每次都建立新的线程执行任务,对系统资源的开销会很大,因此Fork/Join框架利用了线程池来调度任务。web

另外,这里能够思考一个问题,既然由线程池调度,根据咱们以前学习普通/计划线程池的经验,必然存在两个要素:算法

工做线程
任务队列数据库

通常的线程池只有一个任务队列,可是对于Fork/Join框架来讲,因为Fork出的各个子任务实际上是平行关系,为了提升效率,减小线程竞争,应该将这些平行的任务放到中去,如上不一样的队列图中,大任务分解成三个子任务:子任务一、子任务2,那么就建立两个任务队列,而后再建立3个工做线程与队列一一对应。segmentfault

那么为何须要使用工做窃取算法呢?假如咱们须要作一个比较大的任务,咱们能够把这个任务分割为若干互不依赖的子任务,为了减小线程间的竞争,因而把这些子任务分别放到不一样的队列里,并为每一个队列建立一个单独的线程来执行队列里的任务,线程和队列一一对应,好比A线程负责处理A队列里的任务。可是有的线程会先把本身队列里的任务干完,而其余线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其余线程干活,因而它就去其余线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,因此为了减小窃取任务线程和被窃取任务线程之间的竞争,一般会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行在这里插入图片描述
知足这一需求的任务队列其实就是JUC框架中介绍过的双端阻塞队列 LinkedBlockingDeque数组

工做窃取算法的优势是充分利用线程进行并行计算,并减小了线程间的竞争,其缺点是在某些状况下仍是存在竞争,好比双端队列里只有一个任务时。而且消耗了更多的系统资源,好比建立多个线程和多个双端队列。而且在进行RR跟上下文切换也会耗时的,因此不必定是多线程就必定 比单线程速度快。弹性而定,看任务量。多线程

3. demo演示

ForkJoin有两种继承方式,RecursiveTask有返回值,RecursiveAction无返回值
任务需求:假设有个很是大的long[]数组,经过FJ框架求解数组全部元素的和。
任务类定义,由于须要返回结果,因此继承RecursiveTask,并覆写compute方法。任务的fork经过ForkJoinTask的fork方法执行,join方法方法用于等待任务执行后返回:框架

public class ForkJoinWork extends RecursiveTask<Long> { 
 
   
    private Long start;//起始值
    private Long end;//结束值
    public static final Long critical = 100000L;//临界值

    public ForkJoinWork(Long start, Long end) { 
 
   
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() { 
 
   
        // return null;
        //判断是不是拆分完毕
        Long lenth = end - start;   //起始值差值
        if (lenth <= critical) { 
 
   
            //若是拆分完毕就相加
            Long sum = 0L;
            for (Long i = start; i <= end; i++) { 
 
   
                sum += i;
            }
            return sum;
        } else { 
 
   
            //没有拆分完毕就开始拆分
            Long middle = (end + start) / 2;//计算的两个值的中间值
            ForkJoinWork right = new ForkJoinWork(start, middle);
            right.fork();//拆分,并压入线程队列
            ForkJoinWork left = new ForkJoinWork(middle + 1, end);
            left.fork();//拆分,并压入线程队列

            //合并
            return right.join() + left.join();
        }

    }
}

测试:less

public class ForkJoinWorkTest { 
 
   
    @Test
    public void test() { 
 
   
        //ForkJoin实现
        long l = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();//实现ForkJoin 就必须有ForkJoinPool的支持
        ForkJoinTask<Long> task = new ForkJoinWork(0L, 10000000000L);//参数为起始值与结束值
        Long invoke = forkJoinPool.invoke(task);
        long l1 = System.currentTimeMillis();
        System.out.println("invoke = " + invoke + " time: " + (l1 - l));
        //invoke = -5340232216128654848 time: 56418
        //ForkJoinWork forkJoinWork = new ForkJoinWork(0L, 10000000000L);
    }
    @Test
    public void test2() { 
 
   
        //普通线程实现
        Long x = 0L;
        Long y = 10000000000L;
        long l = System.currentTimeMillis();
        for (Long i = 0L; i <= y; i++) { 
 
   
            x += i;
        }
        long l1 = System.currentTimeMillis();
        System.out.println("invoke = " + x + " time: " + (l1 - l));
        //invoke = -5340232216128654848 time: 64069
    }

    @Test
    public void test3() { 
 
   
        //Java 8 并行流的实现
        long l = System.currentTimeMillis();
        long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, Long::sum);
        long l1 = System.currentTimeMillis();
        System.out.println("invoke = " + reduce + " time: " + (l1 - l));
        //invoke = -5340232216128654848 time: 2152
    }
}

结论:Java 8 就为咱们提供了一个并行流来实现ForkJoin实现的功能。能够看到并行流比本身实现ForkJoin还要快。dom

Java 8 中将并行流进行了优化,咱们能够很容易的对数据进行并行流的操做,Stream API能够声明性的经过parallel()与sequential()在并行流与串行流中随意切换!

核心组件

F/J框架的实现很是复杂,内部大量运用了位操做和无锁算法,撇开这些实现细节不谈,该框架主要涉及三大核心组件:ForkJoinPool(线程池)、ForkJoinTask(任务)、ForkJoinWorkerThread(工做线程),外加WorkQueue(任务队列):

  1. ForkJoinPool:ExecutorService的实现类,负责工做线程的管理、任务队列的维护,以及控制整个任务调度流程;
  2. ForkJoinTask:Future接口的实现类,fork是其核心方法,用于分解任务并异步执行;而join方法在任务结果计算完毕以后才会运行,用来合并或返回计算结果;
  3. ForkJoinWorkerThread:Thread的子类,做为线程池中的工做线程(Worker)执行任务;
  4. WorkQueue:任务队列,用于保存任务;
ForkJoinPool

ForkJoinPool做为Executors框架的一员,从外部看与其它线程池并无什么区别,仅仅是ExecutorService的一个实现类:

clipboard.png
ForkJoinPool的主要工做以下:

  1. 接受外部任务的提交(外部调用ForkJoinPool的invoke/execute/submit方法提交任务);
  2. 接受ForkJoinTask自身fork出的子任务的提交;
  3. 任务队列数组(WorkQueue[])的初始化和管理;
    工做线程(Worker)的建立/管理。

注意:ForkJoinPool提供了3类外部提交任务的方法:invokeexecutesubmit,它们的主要区别在于任务的执行方式上。

  1. 经过invoke方法提交的任务,调用线程直到任务执行完成才会返回,也就是说这是一个同步方法,且有返回结果;
  2. 经过execute方法提交的任务,调用线程会当即返回,也就是说这是一个异步方法,且没有返回结果;
  3. 经过submit方法提交的任务,调用线程会当即返回,也就是说这是一个=异步=方法,且有==返回结果(返回Future实现类,能够经过get获取结果)。

注意:ForkJoinPool支持两种模式:

同步模式(默认方式)
异步模式

这里的同步/异步并不是指F/J框架自己是采用同步模式仍是采用异步模式工做,而是指其中的工做线程的工做方式。在F/J框架中,每一个工做线程(Worker)都有一个属于本身的任务队列(WorkQueue),这是一个底层采用数组实现的双向队列。
同步是指:对于工做线程(Worker)自身队列中的任务,采用后进先出(LIFO)的方式执行;异步是指:对于工做线程(Worker)自身队列中的任务,采用先进先出(FIFO)的方式执行

ForkJoinTask

从Fork/Join框架的描述上来看,“任务”必需要知足必定的条件:

支持Fork,即任务自身的分解
支持Join,即任务结果的合并

所以,J.U.C提供了一个抽象类——ForkJoinTask,来做为该类Fork/Join任务的抽象定义

ForkJoinTask实现了Future接口,是一个异步任务,咱们在使用Fork/Join框架时,通常须要使用线程池来调度任务,线程池内部调度的其实都是ForkJoinTask任务(即便提交的是一个Runnable或Callable任务,也会被适配成ForkJoinTask)。
除了ForkJoinTask,Fork/Join框架还提供了两个它的抽象实现,咱们在自定义ForkJoin任务时,通常继承这两个类:

RecursiveAction:表示具备返回结果的ForkJoin任务
RecursiveTask:表示没有返回结果的ForkJoin任务

ForkJoinWorkerThread

Fork/Join框架中,每一个工做线程(Worker)都有一个本身的任务队列(WorkerQueue), 因此须要对通常的Thread作些特性化处理,J.U.C提供了ForkJoinWorkerThread类做为ForkJoinPool中的工做线程:

public class ForkJoinWorkerThread extends Thread { 
 
   
    
    final ForkJoinPool pool;                    // 该工做线程归属的线程池
    final ForkJoinPool.WorkQueue workQueue;     // 对应的任务队列
 
    protected ForkJoinWorkerThread(ForkJoinPool pool) { 
 
   
        super("aForkJoinWorkerThread");         // 指定工做线程名称
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }
  
    // ...
}

ForkJoinWorkerThread 在构造过程当中,会。同时,它会经过ForkJoinPool的registerWorker方保存所属线程池信息和与本身绑定的任务队列信息法将本身注册到线程池中。

WorkQueue

任务队列(WorkQueue)是ForkJoinPool与其它线程池区别最大的地方,在ForkJoinPool内部,维护着一个WorkQueue[]数组,它会在外部首次提交任务)时进行初始化:

volatile WorkQueue[] workQueues; // main registry

当经过线程池的外部方法(submit、invoke、execute)提交任务时,若是WorkQueue[]没有初始化,则会进行初始化;而后根据数组大小和线程随机数(ThreadLocalRandom.probe)等信息,计算出任务队列所在的数组索引(这个索引必定是偶数),若是索引处没有任务队列,则初始化一个,再将任务入队。也就是说,经过外部方法提交的任务必定是在偶数队列,没有绑定工做线程。
WorkQueue做为ForkJoinPool的内部类,表示一个双端队列双端队列,既能够做为栈使用(LIFO),也能够做为队列使用(FIFO)。ForkJoinPool的“工做窃取”正是利用了这个特色,当工做线程从本身的队列中获取任务时,默认老是以栈操做(LIFO)的方式从栈顶取任务;当工做线程尝试窃取其它任务队列中的任务时,则是FIFO的方式。

线程池中的每一个工做线程(ForkJoinWorkerThread)都有一个本身的任务队列(WorkQueue),工做线程优先处理自身队列中的任务(LIFO或FIFO顺序,由线程池构造时的参数 mode 决定),自身队列为空时,以FIFO的顺序随机窃取其它队列中的任务。

F/J框架的核心来自于它的工做窃取调度策略,能够总结为如下几点:

  1. 每一个Worker线程利用它本身的任务队列维护可执行任务;
  2. 任务队列是一种双端队列,支持LIFO的push和pop操做,也支持FIFO的take操做;
  3. 任务fork的子任务,只会push到它所在线程(调用fork方法的线程)的队列;
  4. 工做线程既可使用LIFO经过pop处理本身队列中的任务,也能够FIFO经过poll处理本身队列中的任务,具体取决于构造线程池时的asyncMode参数;
  5. 当工做线程本身队列中没有待处理任务时,它尝试去随机读取(窃取)其它任务队列的base端的任务;
  6. 当线程进入join操做,它也会去处理其它工做线程的队列中的任务(本身的已经处理完了),直到目标任务完成(经过isDone方法);
  7. 当一个工做线程没有任务了,而且尝试从其它队列窃取也失败了,它让出资源(经过使用yields, sleeps或者其它优先级调整)而且随后会再次激活,直到全部工做线程都空闲了——此时,它们都阻塞在等待另外一个顶层线程的调用。

CountDownLatch

CountDownLatch是一个很是实用的多线程控制工具类,能够简单联想到下课倒计时一块儿开饭,百米赛跑一块儿跑。经常使用的就下面几个方法:

CountDownLatch(int count) //实例化一个倒计数器,count指定计数个数
countDown() // 计数减一
await() //等待,当计数减到0时,全部线程并行执行

CountDownLatch在咱们工做的多个场景被使用,算是用的很频繁的了,好比咱们的API接口响应时间被要求在200ms之内,可是若是一个接口内部依赖多个三方/外部服务,那串行调用接口的RT必然好久,因此我的用的最多的是接口RT优化场景,内部服务并行调用

对于倒计数器,一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,每每还要进行各项设备、仪器的检测。只有等到全部的检查完毕后,引擎才能点火。那么在检测环节固然是多个检测项能够的。同时进行代码:

/** * @Description: 倒计时器示例:火箭发射 */
public class CountDownLatchDemo implements Runnable{ 
 
   

    static final CountDownLatch latch = new CountDownLatch(10);
    static final CountDownLatchDemo demo = new CountDownLatchDemo();

    @Override
    public void run() { 
 
   
        // 模拟检查任务
        try { 
 
   
            Thread.sleep(new Random().nextInt(10) * 1000);
            System.out.println("检查完毕");
        } catch (InterruptedException e) { 
 
   
            e.printStackTrace();
        } finally { 
 
   
            //计数减一
            //放在finally避免任务执行过程出现异常,致使countDown()不能被执行
            latch.countDown();
        }
    }
    public static void main(String[] args) throws InterruptedException { 
 
   
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int i=0; i<10; i++){ 
 
   
            exec.submit(demo);
        }
        // 等待检查
        latch.await(); // 外部主线程main 方法来等待下面运行!!!
        // 发射火箭
        System.out.println("Fire!");
        // 关闭线程池
        exec.shutdown();
    }
}

上述代码中咱们先生成了一个CountDownLatch实例。计数数量为10,这表示须要有10个线程来完成任务,等待在CountDownLatch上的线程才能继续执行。latch.countDown(); 方法做用是通知CountDownLatch有一个线程已经准备完毕,倒计数器能够减一了。atch.await()方法要求主线程等待全部10个检查任务所有准备好才一块儿并行执行。
latch.countDown()的调用不必定非要开启线程执行,即便你在主线程中下面这样写效果也是同样。

for (int i = 0; i < 10; i++) { 
 
   
     countDownLatch.countDown();
 }

CyclicBarrier

这个类的中文意思是循环栅栏。大概的意思就是一个可循环利用的屏障。
它的做用就是会让全部线程都等待完成后才会继续下一步行动。
举个例子,就像生活中咱们会约朋友们到某个餐厅一块儿吃饭,有些朋友可能会早到,有些朋友可能会晚到,可是这个餐厅规定必须等到全部人到齐以后才会让咱们进去。这里的朋友们就是各个线程,餐厅就是 CyclicBarrier。
构造方法

public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)

parties 是参与线程的个数
第二个构造方法有一个 Runnable 参数,这个参数的意思是到达线程最后一个要作的任务

重要方法:

public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

线程调用 await() 表示本身已经到达栅栏
BrokenBarrierException 表示栅栏已经被破坏,破坏的缘由多是其中一个线程 await() 时被中断或者超时

demo:一个线程组的线程须要等待全部线程完成任务后再继续执行下一次任务

public class CyclicBarrierTest { 
 
   
    public static void main(String[] args) { 
 
   
        //定义一个计数器,当计数器的值累加到30,输出"放行"
        CyclicBarrier cyclicBarrier = new CyclicBarrier(30,()->{ 
 
   
            System.out.println("放行");
        });
        for (int i = 1; i <= 90; i++) { 
 
   
            final int temp = i;
            new Thread(()->{ 
 
   
                System.out.println("-->"+temp);
                try { 
 
   
                    cyclicBarrier.await();
                } catch (InterruptedException e) { 
 
   
                    e.printStackTrace();
                } catch (BrokenBarrierException e) { 
 
   
                    e.printStackTrace();
                }
            }).start();

        }
    }
}

上面的结果会出现3次放行哦。

CyclicBarrier 与 CountDownLatch 区别

CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
CountDownLatch 参与的线程的职责是不同的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是同样的。
CountDownLatch 作减法计算,count=0,唤醒阻塞线程,CyclicBarrier 作加法计算,count=屏障值(parties),唤醒阻塞线程。
最重要:CountDownLatch的放行由第三者控制,CyclicBarrier是由一组线程自己来控制的, CountDownLatch放行条件>=线程数。CyclicBarrier放行条件=线程数。

Semaphore

用途:控制同时访问某个特定资源的线程数据,用来流量控制。
一个超市只能容纳5我的购物,其他人排队。

public class SemaphoreTest { 
 
   
    public static void main(String[] args) { 
 
   
        //同时只能进5我的
        Semaphore semaphore = new Semaphore(5);
        for (int i = 0; i < 15; i++) { 
 
   
            new Thread(() -> { 
 
   
                try { 
 
   
                    //得到许可
                    semaphore.acquire(); // 已经进店人+1
                    System.out.println(Thread.currentThread().getName() + "进店购物");
                    TimeUnit.SECONDS.sleep(5);
                    System.out.println(Thread.currentThread().getName() + "出店");
                } catch (InterruptedException e) { 
 
   
                    e.printStackTrace();
                } finally { 
 
   
                    //释放许可
                    semaphore.release(); //已经进店人 -1 
                }
            }, String.valueOf(i)).start();
        }
    }
}

实现数据库链接池
数据库链接实现:

public class SqlConnectImpl implements Connection{ 
 
   
	
	/*拿一个数据库链接*/
    public static final Connection fetchConnection(){ 
 
   
        return new SqlConnectImpl();
    }
}

链接池的实现:

public class DBPoolSemaphore { 
 
   

    private final static int POOL_SIZE = 10;
    private final Semaphore useful, useless;//useful表示可用的数据库链接,useless表示已用的数据库链接

    public DBPoolSemaphore() { 
 
   
        this.useful = new Semaphore(POOL_SIZE);
        this.useless = new Semaphore(0);
    }

    //存放数据库链接的容器
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    //初始化池
    static { 
 
   
        for (int i = 0; i < POOL_SIZE; i++) { 
 
   
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
    }

    /*归还链接*/
    public void returnConnect(Connection connection) throws InterruptedException { 
 
   
        if (connection != null) { 
 
   
            System.out.println("当前有" + useful.getQueueLength() + "个线程等待数据库链接!!"
                    + "可用链接数:" + useful.availablePermits());
            useless.acquire();// 可用链接 +1
            synchronized (pool) { 
 
   
                pool.addLast(connection);
            }
            useful.release(); // 已用链接 -1
        }
    }

    /*从池子拿链接*/
    public Connection takeConnect() throws InterruptedException { 
 
   
        useful.acquire(); // 可用链接-1
        Connection conn;
        synchronized (pool) { 
 
   
            conn = pool.removeFirst();
        }
        useless.release(); // 以用链接+1
        return conn;
    }
}

测试代码:

public class AppTest { 
 
   

    private static DBPoolSemaphore dbPool = new DBPoolSemaphore();

    //业务线程
    private static class BusiThread extends Thread { 
 
   
        @Override
        public void run() { 
 
   
            Random r = new Random();//让每一个线程持有链接的时间不同
            long start = System.currentTimeMillis();
            try { 
 
   
                Connection connect = dbPool.takeConnect();
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + "_获取数据库链接共耗时【" + (System.currentTimeMillis() - start) + "】ms.");
                SleepTools.ms(100 + r.nextInt(100));//模拟业务操做,线程持有链接查询数据
                System.out.println("查询数据完成,归还链接!");
                dbPool.returnConnect(connect);
            } catch (InterruptedException e) { 
 
   
            }
        }
    }

    public static void main(String[] args) { 
 
   
        for (int i = 0; i < 50; i++) { 
 
   
            Thread thread = new BusiThread();
            thread.start();
        }
    }
}

Exchange

两个线程间的数据交换,局限性比较大。Exchange是 阻塞形式的,两个线程要都到达执行Exchange函数才会交换。

public class UseExchange { 
 
   
    private static final Exchanger<Set<String>> exchange
            = new Exchanger<Set<String>>();

    public static void main(String[] args) { 
 
   

        //第一个线程
        new Thread(new Runnable() { 
 
   
            @Override
            public void run() { 
 
   
                Set<String> setA = new HashSet<String>();//存放数据的容器
                try { 
 
   
                    setA.add("liu");
                    setA.add("Liu");
                    setA.add("LIU");
                    setA = exchange.exchange(setA);//交换set
                    /*处理交换后的数据*/
                } catch (InterruptedException e) { 
 
   
                }
            }
        }).start();

        //第二个线程
        new Thread(new Runnable() { 
 
   
            @Override
            public void run() { 
 
   
                Set<String> setB = new HashSet<String>();//存放数据的容器
                try { 
 
   
                    setB.add("jin");
                    setB.add("Jie");
                    setB.add("JIN");
                    setB = exchange.exchange(setB);//交换set
                    /*处理交换后的数据*/
                } catch (InterruptedException e) { 
 
   
                }
            }
        }).start();
    }
}

Callable,Future,FutureTask

这三个组合使用,通常咱们能够将耗时任务用子线程去执行,同时执行咱们本身的主线程任务。主线程执行任务完毕后再调Future.get()来得到子线程任务。
在这里插入图片描述
说明:

Callable有返回值可抛出异常,其中返回值有Future得到。
Future 得到返回值。
FutureTask实现Future跟Runnable。1.7前用AQS实现的,1.8之后再也不是。

Future主要函数功能:

  1. isDone,结束,正常仍是异常结束,或者本身取消,都返回true;
  2. isCancelled 任务完成前被取消,返回true;
  3. cancel(boolean):
  • 任务还没开始,返回false
  • 任务已经启动,cancel(true)
  • 中断正在运行的任务,中断成功,返回true
  • cancel(false),不会去中断已经运行的任务
  • 任务已经结束,返回false

demo:

public class UseFuture { 
 
   

    /*实现Callable接口,容许有返回值*/
    private static class UseCallable implements Callable<Integer> { 
 
   
        private int sum;
        @Override
        public Integer call() throws Exception { 
 
   
            System.out.println("Callable子线程开始计算");
            Thread.sleep(2000);
            for (int i = 0; i < 5000; i++) { 
 
   
                sum = sum + i;
            }
            System.out.println("Callable子线程计算完成,结果=" + sum);
            return sum;
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
 
   
        UseCallable useCallable = new UseCallable();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(useCallable);
        new Thread(futureTask).start();
        Random r = new Random();
        SleepTools.second(1);
        if (r.nextBoolean()) { 
 
     // 方法调用返回下一个伪均匀分布的boolean值
            System.out.println("Get UseCallable result = " + futureTask.get());
        } else { 
 
   
            System.out.println("中断计算");
            futureTask.cancel(true);
        }
    }
}

在这里插入图片描述

参考

F/J

本文同步分享在 博客“SoWhat1412”(CSDN)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索