JUC就是java.util.concurrent

什么是JUC?java

  JUC就是java.util.concurrent包,这个包俗称JUC,里面都是解决并发问题的一些东西sql

  该包的位置位于java下面的rt.jar包下面数据库

4大经常使用并发工具类:安全

  CountDownLatch并发

  CyclicBarrierapp

  Semaphore框架

  ExChangerless

 

CountDownLatch:dom

  CountDownLatch,俗称闭锁,做用是相似增强版的Join,是让一组线程等待其余的线程完成工做之后才执行ide

  就好比在启动框架服务的时候,咱们主线程须要在环境线程初始化完成以后才能启动,这时候咱们就能够实现使用CountDownLatch来完成

复制代码
/**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
复制代码

  在源码中能够看到,建立CountDownLatch时,须要传入一个int类型的参数,将决定在执行次扣减以后,等待的线程被唤醒

   经过这个类图就能够知道其实CountDownLatch并无多少东西

  方法介绍:

    CountDownLatch:初始化方法

    await:等待方法,同时带参数的是超时重载方法

    countDown:每执行一次,计数器减一,就是初始化传入的数字,也表明着一个线程完成了任务

    getCount:获取当前值

    toString:这个就不用说了

  里面的Sync是一个内部类,外面的方法其实都是操做这个内部类的,这个内部类继承了AQS,实现的标准方法,AQS将在后面的章节写

 

主线程中建立CountDownLatch(3),而后主线程await阻塞,而后线程A,B,C各自完成了任务,调用了countDown,以后,每一个线程调用一次计数器就会减一,初始是3,而后A线程调用后变成2,B线程调用后变成1,C线程调用后,变成0,这时就会唤醒正在await的主线程,而后主线程继续执行

说一千道一万,不如代码写几行,上代码:

休眠工具类,以后的代码都会用到

复制代码
package org.dance.tools;

import java.util.concurrent.TimeUnit;

/**
 * 类说明:线程休眠辅助工具类
 */
public class SleepTools {

    /**
     * 按秒休眠
     * @param seconds 秒数
     */
    public static final void second(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
        }
    }

    /**
     * 按毫秒数休眠
     * @param seconds 毫秒数
     */
    public static final void ms(int seconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(seconds);
        } catch (InterruptedException e) {
        }
    }
}
复制代码
复制代码
package org.dance.day2.util;

import org.dance.tools.SleepTools;

import java.util.concurrent.CountDownLatch;

/**
 * CountDownLatch的使用,有五个线程,6个扣除点
 * 扣除完成后主线程和业务线程,才能执行工做
 *  扣除点通常都是大于等于须要初始化的线程的
 * @author ZYGisComputer
 */
public class UseCountDownLatch {

    /**
     * 设置为6个扣除点
     */
    static CountDownLatch countDownLatch = new CountDownLatch(6);

    /**
     * 初始化线程
     */
    private static class InitThread implements Runnable {

        @Override
        public void run() {

            System.out.println("thread_" + Thread.currentThread().getId() + " ready init work .....");

            // 执行扣减 扣减不表明结束
            countDownLatch.countDown();

            for (int i = 0; i < 2; i++) {
                System.out.println("thread_" + Thread.currentThread().getId() + ".....continue do its work");
            }

        }
    }

    /**
     * 业务线程
     */
    private static class BusiThread implements Runnable {

        @Override
        public void run() {

            // 业务线程须要在等初始化完毕后才能执行
            try {
                countDownLatch.await();
                for (int i = 0; i < 3; i++) {
                    System.out.println("BusiThread " + Thread.currentThread().getId() + " do business-----");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {

        // 建立单独的初始化线程
        new Thread(){
            @Override
            public void run() {
                SleepTools.ms(1);
                System.out.println("thread_" + Thread.currentThread().getId() + " ready init work step 1st.....");
                // 扣减一次
                countDownLatch.countDown();
                System.out.println("begin stop 2nd.....");
                SleepTools.ms(1);
                System.out.println("thread_" + Thread.currentThread().getId() + " ready init work step 2nd.....");
                // 扣减一次
                countDownLatch.countDown();

            }
        }.start();
        // 启动业务线程
        new Thread(new BusiThread()).start();
        // 启动初始化线程
        for (int i = 0; i <= 3; i++) {
            new Thread(new InitThread()).start();
        }
        // 主线程进入等待
        try {
            countDownLatch.await();
            System.out.println("Main do ites work.....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}
复制代码

返回结果:

复制代码
thread_13 ready init work .....
thread_13.....continue do its work
thread_13.....continue do its work
thread_14 ready init work .....
thread_14.....continue do its work
thread_14.....continue do its work
thread_15 ready init work .....
thread_15.....continue do its work
thread_11 ready init work step 1st.....
begin stop 2nd.....
thread_16 ready init work .....
thread_16.....continue do its work
thread_16.....continue do its work
thread_15.....continue do its work
thread_11 ready init work step 2nd.....
Main do ites work.....
BusiThread 12 do business-----
BusiThread 12 do business-----
BusiThread 12 do business-----
复制代码

经过返回结果就能够很直接的看到业务线程是在初始化线程彻底跑完以后,才开始执行的

 

CyclicBarrier:

  CyclicBarrier,俗称栅栏锁,做用是让一组线程到达某个屏障,被阻塞,一直到组内的最后一个线程到达,而后屏障开放,接着,全部的线程继续运行

  这个感受和CountDownLatch有点类似,可是实际上是不同的,所谓的差异,将在下面详解

  CyclicBarrier的构造参数有两个

复制代码
/**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
复制代码
复制代码
/**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
复制代码

很明显能感受出来,上面的构造参数调用了下面的构造参数,是一个构造方法重载

首先这个第一个参数也树Int类型的,传入的是执行线程的个数,这个数量和CountDownLatch不同,这个数量是须要和线程数量吻合的,CountDownLatch则不同,CountDownLatch能够大于等于,而CyclicBarrier只能等于,而后是第二个参数,第二个参数是barrierAction,这个参数是当屏障开放后,执行的任务线程,若是当屏障开放后须要执行什么任务,能够写在这个线程中

 

 

主线程建立CyclicBarrier(3,barrierAction),而后由线程开始执行,线程A,B执行完成后都调用了await,而后他们都在一个屏障前阻塞者,须要等待线程C也,执行完成,调用await以后,而后三个线程都达到屏障后,屏障开放,而后线程继续执行,而且barrierAction在屏障开放的一瞬间也开始执行

上代码:

复制代码
package org.dance.day2.util;

import org.dance.tools.SleepTools;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * CyclicBarrier的使用
 *
 * @author ZYGisComputer
 */
public class UseCyclicBarrier {

    /**
     * 存放子线程工做结果的安全容器
     */
    private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,new CollectThread());

    /**
     * 结果打印线程
     * 用来演示CyclicBarrier的第二个参数,barrierAction
     */
    private static class CollectThread implements Runnable {

        @Override
        public void run() {

            StringBuffer result = new StringBuffer();

            for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
                result.append("[" + workResult.getValue() + "]");
            }

            System.out.println("the result = " + result);
            System.out.println("do other business.....");

        }
    }

    /**
     * 工做子线程
     * 用于CyclicBarrier的一组线程
     */
    private static class SubThread implements Runnable {

        @Override
        public void run() {

            // 获取当前线程的ID
            long id = Thread.currentThread().getId();

            // 放入统计容器中
            resultMap.put(String.valueOf(id), id);

            Random random = new Random();

            try {
                if (random.nextBoolean()) {
                    Thread.sleep(1000 + id);
                    System.out.println("Thread_"+id+"..... do something");
                }
                System.out.println(id+" is await");
                cyclicBarrier.await();
                Thread.sleep(1000+id);
                System.out.println("Thread_"+id+".....do its business");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) {

        for (int i = 0; i <= 4; i++) {
            Thread thread = new Thread(new SubThread());
            thread.start();
        }

    }

}
复制代码

返回结果:

复制代码
11 is await
14 is await
15 is await
Thread_12..... do something
12 is await
Thread_13..... do something
13 is await
the result = [11][12][13][14][15]
do other business.....
Thread_11.....do its business
Thread_12.....do its business
Thread_13.....do its business
Thread_14.....do its business
Thread_15.....do its business
复制代码

经过返回结果能够看出前面的11 14 15三个线程没有进入if语句块,在执行到await的时候进入了等待,而另外12 13两个线程进入到了if语句块当中,多休眠了1秒多,而后当5个线程同时到达await的时候,屏障开放,执行了barrierAction线程,而后线程组继续执行

解释一下CountDownLatch和CyclicBarrier的却别吧!

首先就是CountDownLatch的构造参数传入的数量通常都是大于等于线程,数量的,由于他是有第三方控制的,能够扣减屡次,而后就是CyclicBarrier的构造参数第一个参数传入的数量必定是等于线程的个数的,由于他是由一组线程自身控制的

区别

      CountDownLatch  CyclicBarrier

控制         第三方控制       自身控制

传入数量  大于等于线程数量       等于线程数量

 

Semaphore:

  Semaphore,俗称信号量,做用于控制同时访问某个特定资源的线程数量,用在流量控制

  一说特定资源控制,那么第一时间就想到了数据库链接..

  以前用等待超时模式写了一个数据库链接池,打算用这个Semaphone也写一个

复制代码
/**
     * Creates a {@code Semaphore} with the given number of
     * permits and nonfair fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
复制代码

在源码中能够看到在构建Semaphore信号量的时候,须要传入许可证的数量,这个数量就是资源的最大容许的访问的线程数

接下里用信号量实现一个数据库链接池

链接对象

复制代码
package org.dance.day2.util.pool;

import org.dance.tools.SleepTools;

import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;

/**
 * 数据库链接
 * @author ZYGisComputer
 */
public class SqlConnection implements Connection {

    /**
     * 获取数据库链接
     * @return
     */
    public static final Connection fetchConnection(){
        return new SqlConnection();
    }

    @Override
    public void commit() throws SQLException {
        SleepTools.ms(70);
    }

    @Override
    public Statement createStatement() throws SQLException {
        SleepTools.ms(1);
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        return null;
    }

    @Override
    public String nativeSQL(String sql) throws SQLException {
        return null;
    }

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {

    }

    @Override
    public boolean getAutoCommit() throws SQLException {
        return false;
    }

    @Override
    public void rollback() throws SQLException {

    }

    @Override
    public void close() throws SQLException {

    }

    @Override
    public boolean isClosed() throws SQLException {
        return false;
    }

    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return null;
    }

    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {

    }

    @Override
    public boolean isReadOnly() throws SQLException {
        return false;
    }

    @Override
    public void setCatalog(String catalog) throws SQLException {

    }

    @Override
    public String getCatalog() throws SQLException {
        return null;
    }

    @Override
    public void setTransactionIsolation(int level) throws SQLException {

    }

    @Override
    public int getTransactionIsolation() throws SQLException {
        return 0;
    }

    @Override
    public SQLWarning getWarnings() throws SQLException {
        return null;
    }

    @Override
    public void clearWarnings() throws SQLException {

    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }

    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return null;
    }

    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {

    }

    @Override
    public void setHoldability(int holdability) throws SQLException {

    }

    @Override
    public int getHoldability() throws SQLException {
        return 0;
    }

    @Override
    public Savepoint setSavepoint() throws SQLException {
        return null;
    }

    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        return null;
    }

    @Override
    public void rollback(Savepoint savepoint) throws SQLException {

    }

    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {

    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        return null;
    }

    @Override
    public Clob createClob() throws SQLException {
        return null;
    }

    @Override
    public Blob createBlob() throws SQLException {
        return null;
    }

    @Override
    public NClob createNClob() throws SQLException {
        return null;
    }

    @Override
    public SQLXML createSQLXML() throws SQLException {
        return null;
    }

    @Override
    public boolean isValid(int timeout) throws SQLException {
        return false;
    }

    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {

    }

    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {

    }

    @Override
    public String getClientInfo(String name) throws SQLException {
        return null;
    }

    @Override
    public Properties getClientInfo() throws SQLException {
        return null;
    }

    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        return null;
    }

    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        return null;
    }

    @Override
    public void setSchema(String schema) throws SQLException {

    }

    @Override
    public String getSchema() throws SQLException {
        return null;
    }

    @Override
    public void abort(Executor executor) throws SQLException {

    }

    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {

    }

    @Override
    public int getNetworkTimeout() throws SQLException {
        return 0;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return null;
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return false;
    }
}
复制代码

链接池对象

复制代码
package org.dance.day2.util.pool;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;

/**
 * 使用信号量控制数据库的连接和释放
 *
 * @author ZYGisComputer
 */
public class DBPoolSemaphore {

    /**
     * 池容量
     */
    private final static int POOL_SIZE = 10;

    /**
     * useful 表明可用链接
     * useless 表明已用链接
     *  为何要使用两个Semaphore呢?是由于,在链接池中不仅有链接自己是资源,空位也是资源,也须要记录
     */
    private final Semaphore useful, useless;

    /**
     * 链接池
     */
    private final static LinkedList<Connection> POOL = new LinkedList<>();

    /**
     * 使用静态块初始化池
     */
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            POOL.addLast(SqlConnection.fetchConnection());
        }
    }

    public DBPoolSemaphore() {
        // 初始可用的许可证等于池容量
        useful = new Semaphore(POOL_SIZE);
        // 初始不可用的许可证容量为0
        useless = new Semaphore(0);
    }

    /**
     * 获取数据库链接
     *
     * @return 链接对象
     */
    public Connection takeConnection() throws InterruptedException {
        // 可用许可证减一
        useful.acquire();
        Connection connection;
        synchronized (POOL) {
            connection = POOL.removeFirst();
        }
        // 不可用许可证数量加一
        useless.release();
        return connection;
    }

    /**
     * 释放连接
     *
     * @param connection 链接对象
     */
    public void returnConnection(Connection connection) throws InterruptedException {
        if(null!=connection){
            // 打印日志
            System.out.println("当前有"+useful.getQueueLength()+"个线程等待获取链接,,"
                    +"可用链接有"+useful.availablePermits()+"个");
            // 不可用许可证减一
            useless.acquire();
            synchronized (POOL){
                POOL.addLast(connection);
            }
            // 可用许可证加一
            useful.release();
        }
    }

}
复制代码

测试类:

复制代码
package org.dance.day2.util.pool;

import org.dance.tools.SleepTools;

import java.sql.Connection;
import java.util.Random;

/**
 * 测试Semaphore
 * @author ZYGisComputer
 */
public class UseSemaphore {

    /**
     * 链接池
     */
    public static final DBPoolSemaphore pool = new DBPoolSemaphore();

    private static class BusiThread extends Thread{
        @Override
        public void run() {
            // 随机数工具类 为了让每一个线程持有链接的时间不同
            Random random = new Random();
            long start = System.currentTimeMillis();
            try {
                Connection connection = pool.takeConnection();
                System.out.println("Thread_"+Thread.currentThread().getId()+
                        "_获取数据库链接耗时["+(System.currentTimeMillis()-start)+"]ms.");
                // 模拟使用链接查询数据
                SleepTools.ms(100+random.nextInt(100));
                System.out.println("查询数据完成归还链接");
                pool.returnConnection(connection);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            BusiThread busiThread = new BusiThread();
            busiThread.start();
        }
    }

}
复制代码

测试返回结果:

复制代码
Thread_11_获取数据库链接耗时[0]ms.
Thread_12_获取数据库链接耗时[0]ms.
Thread_13_获取数据库链接耗时[0]ms.
Thread_14_获取数据库链接耗时[0]ms.
Thread_15_获取数据库链接耗时[0]ms.
Thread_16_获取数据库链接耗时[0]ms.
Thread_17_获取数据库链接耗时[0]ms.
Thread_18_获取数据库链接耗时[0]ms.
Thread_19_获取数据库链接耗时[0]ms.
Thread_20_获取数据库链接耗时[0]ms.
查询数据完成归还链接
当前有40个线程等待获取链接,,可用链接有0个
Thread_21_获取数据库链接耗时[112]ms.
查询数据完成归还链接
...................
查询数据完成归还链接 当前有2个线程等待获取链接,,可用链接有0个 Thread_59_获取数据库链接耗时[637]ms. 查询数据完成归还链接 当前有1个线程等待获取链接,,可用链接有0个 Thread_60_获取数据库链接耗时[660]ms. 查询数据完成归还链接 当前有0个线程等待获取链接,,可用链接有0个 查询数据完成归还链接
................... 当前有0个线程等待获取链接,,可用链接有8个 查询数据完成归还链接 当前有0个线程等待获取链接,,可用链接有9个
复制代码

经过执行结果能够很明确的看到,一上来就有10个线程获取到了链接,,而后后面的40个线程进入阻塞,而后只有释放连接以后,等待的线程就会有一个拿到,而后越后面的线程等待的时间就越长,而后一直到全部的线程执行完毕

最后打印的可用链接有九个不是由于少了一个是由于在释放以前打印的,不是错误

从结果中能够看到,咱们对链接池中的资源的到了控制,这就是信号量的流量控制

 

Exchanger:

  Exchanger,俗称交换器,用于在线程之间交换数据,可是比较受限,由于只能两个线程之间交换数据

/**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        participant = new Participant();
    }

这个构造函数没有什么好说的,也没有入参,只有在建立的时候指定一下须要交换的数据的泛型便可,下面看代码

复制代码
package org.dance.day2.util;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;

/**
 * 线程之间交换数据
 * @author ZYGisComputer
 */
public class UseExchange {

    private static final Exchanger<Set<String>> exchanger = new Exchanger<>();

    public static void main(String[] args) {

        new Thread(){
            @Override
            public void run() {
                Set<String> aSet = new HashSet<>();
                aSet.add("A");
                aSet.add("B");
                aSet.add("C");
                try {
                    Set<String> exchange = exchanger.exchange(aSet);
                    for (String s : exchange) {
                        System.out.println("aSet"+s);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();

        new Thread(){
            @Override
            public void run() {
                Set<String> bSet = new HashSet<>();
                bSet.add("1");
                bSet.add("2");
                bSet.add("3");
                try {
                    Set<String> exchange = exchanger.exchange(bSet);
                    for (String s : exchange) {
                        System.out.println("bSet"+s);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();

    }

}
复制代码

执行结果:

bSetA
bSetB
bSetC
aSet1
aSet2
aSet3

经过执行结果能够清晰的看到,两个线程中的数据发生了交换,这就是Exchanger的线程数据交换了

以上就是JUC的4大经常使用并发工具类了

相关文章
相关标签/搜索