java并发编程 - 利用对象等待和通知机制实现一个等待超时的链接池

        众所周知,Java的Object对象提供的,wait()和notify()/notifyAll()等接口是并发编程的重要组成部分。它们对多线程之间的协做起了很是重要的做用,实际开发中也有不少场景能够采用。废话少说,今天咱们就用此机制来模拟实现一个jdbc支持等待超时模式的链接池。数据库


1、模拟实现一个数据库链接接口
编程

//类说明:空实现一个Connection接口(由于重点不在这里,因此如下接口中的方法只作简单处理)
public class SqlConnectImpl implements Connection{
   
   /*拿一个数据库链接*/
    public static final Connection fetchConnection(){
        return new SqlConnectImpl();
    }

   @Override
   public boolean isWrapperFor(Class<?> arg0) throws SQLException {
      // TODO Auto-generated method stub
      return false;
   }
   //由于重点不在这里,因此这里省略其它接口...
 }


2、实现数据库等待超时链接池的核心方法多线程


//类说明:链接池的实现
DBPool {
    //模拟:数据库链接池
    LinkedList<Connection> pool = LinkedList<Connection>()(initialSize) {
        if(initialSize > ) {
            for(int i = 0;i < initialSize; i++) {
                pool.addLast(SqlConnectImpl.fetchConnection());
            }
        }
    }

    //链接池:释放链接,通知其余线程
    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool){
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }

    //链接池:获取链接使用
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool){
            //未设置超时,直接获取
            if(mills <0){
                while (pool.isEmpty()){
                    pool.wait();
                }
                return pool.removeFirst();
            }
            //设置超时
            long future = System.currentTimeMillis()+mills;/*超时时刻*/
            long remaining = mills;
            while (pool.isEmpty() && remaining > 0){
                pool.wait(remaining);
                //唤醒一次:从新计算等待时长
                remaining = future - System.currentTimeMillis();
            }
            Connection connection = null;
            if(!pool.isEmpty()){
                connection = pool.removeFirst();
            }
            return connection;
        }
    }
}


3、多线程并发模式下对链接池的访问并发


//类说明:数据库链接池测试类
public class DBPoolTest {
    static DBPool pool  = new DBPool(10);
    // 控制器:控制main线程将会等待全部Woker结束后才能继续执行
    static CountDownLatch end;

    public static void main(String[] args) throws Exception {
       // 线程数量
        int threadCount = 50;
        end = new CountDownLatch(threadCount);
        int count = 20;//每一个线程的操做次数
        AtomicInteger got = new AtomicInteger();//计数器:统计能够拿到链接的线程
        AtomicInteger notGot = new AtomicInteger();//计数器:统计没有拿到链接的线程
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new Worker(count, got, notGot), 
                  "worker_"+i);
            thread.start();
        }
        end.await();// main线程在此处等待
        System.out.println("总共尝试了: " + (threadCount * count));
        System.out.println("拿到链接的次数:  " + got);
        System.out.println("没能链接的次数: " + notGot);
    }

    static class Worker implements Runnable {
        int           count;
        AtomicInteger got;
        AtomicInteger notGot;

        public Worker(int count, AtomicInteger got,
                               AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        public void run() {
            while (count > 0) {
                try {
                    // 从线程池中获取链接,若是1000ms内没法获取到,将会返回null
                    // 分别统计链接获取的数量got和未获取到的数量notGot
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                        System.out.println(Thread.currentThread().getName()
                              +"等待超时!");
                    }
                } catch (Exception ex) {
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}


4、测试结果报告app

image.png


5、结束语ide

总结:1) 对wait()、notify()、notifyAll()等方法使用时须用,synchronized关键包裹(对象、方法或块)都可, 不包裹运行中必报错;测试

          2) 线程在执行wait()方法在会自动释放持有的锁;fetch

          3) 线程在执行notify()或notifyAll()后,不会当即释放该线程持有的锁资源,只有在synchronized包裹的语句块或方法执行完毕后才会释放;this

          4) 采用notify()方法唤醒时只会随机唤醒一个线程,在多唤醒条件下不适用此方法。推荐使用notifyAll()唤醒全部与锁对象相关的全部线程;spa

相关文章
相关标签/搜索