Semaphore信号量实现简易版数据库链接池

Semaphore 用于控制同时访问某个特定资源的线程数量,主要用在流量控制java

下面咱们使用 Semaphore 来实现一个简易版数据库链接池数据库

实现思路:less

  1. DBPoolSemaphore 中使用 LinkedList 充当链接池容器
  2. DBPoolSemaphore 中 实现一个取数据库链接和释放数据库链接的方法:takeConnect 和 returnConnect
  3. takeConnect:在mills时间内还拿不到数据库链接,返回一个null.returnConnect:释放链接
  4. DBPoolSemaphore 中定义两个信号量:Semaphore useful,useless.useful表示可用的数据库链接,useless表示已用的数据库链接,takeConnect和returnConnect都要操做这两个信号量
  5. 测试类启动50个线程去取数据库链接并进行数据库操做,最后释放链接.每一个线程取20次
public class DBPoolSemaphore {
	
	private final static int POOL_SIZE = 10;
	/** * useful表示可用的数据库链接,useless表示已用的数据库链接 */
	private final Semaphore useful,useless;
	
	public DBPoolSemaphore() {
		this.useful = new Semaphore(POOL_SIZE);
		this.useless = new Semaphore(0);
	}

	/** * 存放数据库链接的容器 */
	private static final LinkedList<Connection> pool = new LinkedList<>();
	//初始化池
	static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
	}

	/** * 归还链接 * @param connection * @throws InterruptedException */
	public void returnConnect(Connection connection) throws InterruptedException {
		if(connection!=null) {
			System.out.println("当前有" + useful.getQueueLength() + "个线程等待数据库链接!!"
					+ "可用链接数:" + useful.availablePermits());
			//returnConnect 必须为拿到链接的线程调用,acquire方法是阻塞方法
			useless.acquire();
			synchronized (pool) {
				pool.addLast(connection);
			}
			useful.release();
		}
	}

	/** * 从池子拿链接 * @return * @throws InterruptedException */
	public Connection takeConnect() throws InterruptedException {
		//一秒超时
		boolean acquire = useful.tryAcquire(1000, TimeUnit.MILLISECONDS);
		Connection conn = null;
		if (acquire) {
			synchronized (pool) {
				conn = pool.removeFirst();
			}
			//已用的加1
			useless.release();
		}
		return conn;
	}
	
}
复制代码
public class SqlConnectImpl implements Connection{
	
    public static final Connection fetchConnection(){
        return new SqlConnectImpl();
    }

	@Override
	public void commit() throws SQLException {
		SleepTools.ms(70);
	}
	@Override
	public Statement createStatement() throws SQLException {
		SleepTools.ms(1);
		return null;
	}
}
复制代码
public class SemaphoreTest {

	private static DBPoolSemaphore dbPool = new DBPoolSemaphore();
	

	private static class BusiThread extends Thread{
		@Override
		public void run() {
			//让每一个线程持有链接的时间不同
			Random r = new Random();
			long start = System.currentTimeMillis();
			Connection connect = null;
			try {
				connect = dbPool.takeConnect();
				System.out.println("Thread_" + Thread.currentThread().getId()
						+ "_获取数据库链接共耗时【" + (System.currentTimeMillis() - start) + "】ms.");
				//模拟业务操做,线程持有链接查询数据
				SleepTools.ms(100+r.nextInt(100));

			} catch (InterruptedException ignored) {
			}finally {
				System.out.println("查询数据完成,归还链接!");
				if (connect != null) {
					try {
						dbPool.returnConnect(connect);
					} catch (InterruptedException ignored) {
					}
				}
			}
		}
	}
	
	public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            Thread thread = new BusiThread();
            thread.start();
        }
	}
	
}
复制代码
相关文章
相关标签/搜索