在咱们使用数据库的过程当中,咱们每每使用数据库链接池而不是直接使用数据库链接进行操做,这是由于每个数据库链接的建立和销毁的代价是昂贵的,而池化技术则预先建立了资源,这些资源是可复用的,这样就保证了在多用户状况下只能使用指定数目的资源,避免了一个用户建立一个链接资源,形成程序运行开销过大。关于Java并发编程的总结和思考html
这里只实现一个简易的链接池,更多复杂的需求可根据该链接池进行改进,该链接池主要参数以下:java
程序流程图以下:
数据库
泛型接口ConnectionPool.java编程
public interface ConnectionPool<T> { /** * 初始化池资源 * @param maxActive 池中最大活动链接数 * @param maxWait 最大等待时间 */ void init(Integer maxActive, Long maxWait); /** * 从池中获取资源 * @return 链接资源 */ T getResource() throws Exception; /** * 释放链接 * @param connection 正在使用的链接 */ void release(T connection) throws Exception; /** * 释放链接池资源 */ void close(); }
以zookeeper为例,实现zookeeper链接池,ZookeeperConnectionPool.javasegmentfault
public class ZookeeperConnectionPool implements ConnectionPool<ZooKeeper> { //最大活动链接数 private Integer maxActive; //最大等待时间 private Long maxWait; //空闲队列 private LinkedBlockingQueue<ZooKeeper> idle = new LinkedBlockingQueue<>(); //繁忙队列 private LinkedBlockingQueue<ZooKeeper> busy = new LinkedBlockingQueue<>(); //链接池活动链接数 private AtomicInteger activeSize = new AtomicInteger(0); //链接池关闭标记 private AtomicBoolean isClosed = new AtomicBoolean(false); //总共获取的链接记数 private AtomicInteger createCount = new AtomicInteger(0); //等待zookeeper客户端建立完成的计数器 private static ThreadLocal<CountDownLatch> latchThreadLocal = ThreadLocal.withInitial(() -> new CountDownLatch(1)); public ZookeeperConnectionPool(Integer maxActive, Long maxWait) { this.init(maxActive, maxWait); } @Override public void init(Integer maxActive, Long maxWait) { this.maxActive = maxActive; this.maxWait = maxWait; } @Override public ZooKeeper getResource() throws Exception { ZooKeeper zooKeeper; Long nowTime = System.currentTimeMillis(); final CountDownLatch countDownLatch = latchThreadLocal.get(); //空闲队列idle是否有链接 if ((zooKeeper = idle.poll()) == null) { //判断池中链接数是否小于maxActive if (activeSize.get() < maxActive) { //先增长池中链接数后判断是否小于等于maxActive if (activeSize.incrementAndGet() <= maxActive) { //建立zookeeper链接 zooKeeper = new ZooKeeper("localhost", 5000, (watch) -> { if (watch.getState() == Watcher.Event.KeeperState.SyncConnected) { countDownLatch.countDown(); } }); countDownLatch.await(); System.out.println("Thread:" + Thread.currentThread().getId() + "获取链接:" + createCount.incrementAndGet() + "条"); busy.offer(zooKeeper); return zooKeeper; } else { //如增长后发现大于maxActive则减去增长的 activeSize.decrementAndGet(); } } //若活动线程已满则等待busy队列释放链接 try { System.out.println("Thread:" + Thread.currentThread().getId() + "等待获取空闲资源"); Long waitTime = maxWait - (System.currentTimeMillis() - nowTime); zooKeeper = idle.poll(waitTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new Exception("等待异常"); } //判断是否超时 if (zooKeeper != null) { System.out.println("Thread:" + Thread.currentThread().getId() + "获取链接:" + createCount.incrementAndGet() + "条"); busy.offer(zooKeeper); return zooKeeper; } else { System.out.println("Thread:" + Thread.currentThread().getId() + "获取链接超时,请重试!"); throw new Exception("Thread:" + Thread.currentThread().getId() + "获取链接超时,请重试!"); } } //空闲队列有链接,直接返回 busy.offer(zooKeeper); return zooKeeper; } @Override public void release(ZooKeeper connection) throws Exception { if (connection == null) { System.out.println("connection 为空"); return; } if (busy.remove(connection)){ idle.offer(connection); } else { activeSize.decrementAndGet(); throw new Exception("释放失败"); } } @Override public void close() { if (isClosed.compareAndSet(false, true)) { idle.forEach((zooKeeper) -> { try { zooKeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } }); busy.forEach((zooKeeper) -> { try { zooKeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
这里建立20个线程并发测试链接池,Test.java并发
public class Test { public static void main(String[] args) throws Exception { int threadCount = 20; Integer maxActive = 10; Long maxWait = 10000L; ZookeeperConnectionPool pool = new ZookeeperConnectionPool(maxActive, maxWait); CountDownLatch countDownLatch = new CountDownLatch(20); for (int i = 0; i < threadCount; i++) { new Thread(() -> { countDownLatch.countDown(); try { countDownLatch.await(); ZooKeeper zooKeeper = pool.getResource(); Thread.sleep(2000); pool.release(zooKeeper); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } }).start(); } while (true){ } } }
来源:https://segmentfault.com/a/1190000017926611ide