【追光者系列】HikariCP源码分析之ConcurrentBag&J.U.C SynchronousQueue、CopyOnWriteArrayList

摘要: 原创出处 微信公众号「工匠小猪猪的技术世界」欢迎转载,保留摘要,谢谢!html

1.这是一个系列,有兴趣的朋友能够持续关注
2.若是你有HikariCP使用上的问题,能够给我留言,咱们一块儿沟通讨论
3.但愿你们能够提供我一些案例,我也但愿能够支持大家作一些调优node


🙂🙂🙂关注**微信公众号:【工匠小猪猪的技术世界】**有福利:数据库

  1. 您对于源码的疑问每条留言将获得认真回复。甚至不知道如何读源码也能够请教噢
  2. 新的源码解析文章实时收到通知。每两周更新一篇左右

ConcurrentBag的定义

HikariCP contains a custom lock-free collection called a ConcurrentBag. The idea was borrowed from the C# .NET ConcurrentBag class, but the internal implementation quite different. The ConcurrentBag provides...数组

  • A lock-free design
  • ThreadLocal caching
  • Queue-stealing
  • Direct hand-off optimizations

...resulting in a high degree of concurrency, extremely low latency, and minimized occurrences of false-sharing.缓存

https://en.wikipedia.org/wiki/False_sharing安全

  • CopyOnWriteArrayList:负责存放ConcurrentBag中所有用于出借的资源
  • ThreadLocal:用于加速线程本地化资源访问
  • SynchronousQueue:用于存在资源等待线程时的第一手资源交接

ConcurrentBag取名来源于C# .NET的同名类,可是实现却不同。它是一个lock-free集合,在链接池(多线程数据交互)的实现上具备比LinkedBlockingQueue和LinkedTransferQueue更优越的并发读写性能。bash

ConcurrentBag源码解析

ConcurrentBag内部同时使用了ThreadLocal和CopyOnWriteArrayList来存储元素,其中CopyOnWriteArrayList是线程共享的。ConcurrentBag采用了queue-stealing的机制获取元素:首先尝试从ThreadLocal中获取属于当前线程的元素来避免锁竞争,若是没有可用元素则扫描公共集合、再次从共享的CopyOnWriteArrayList中获取。(ThreadLocal列表中没有被使用的items在借用线程没有属于本身的时候,是能够被“窃取”的) ThreadLocal和CopyOnWriteArrayList在ConcurrentBag中都是成员变量,线程间不共享,避免了伪共享(false sharing)的发生。 其使用专门的AbstractQueuedLongSynchronizer来管理跨线程信号,这是一个"lock-less“的实现。 这里要特别注意的是,ConcurrentBag中经过borrow方法进行数据资源借用,经过requite方法进行资源回收,注意其中borrow方法只提供对象引用,不移除对象。因此从bag中“借用”的items实际上并无从任何集合中删除,所以即便引用废弃了,垃圾收集也不会发生。所以使用时经过borrow取出的对象必须经过requite方法进行放回,不然会致使内存泄露,只有"remove"方法才能彻底从bag中删除一个对象。微信

好了,咱们一块儿看一下ConcurrentBag源码概览:多线程

上节提过,CopyOnWriteArrayList负责存放ConcurrentBag中所有用于出借的资源,就是private final CopyOnWriteArrayList sharedList; 以下图所示,sharedList中的资源经过add方法添加,remove方法出借并发

add方法向bag中添加bagEntry对象,让别人能够借用

/**
    * Add a new object to the bag for others to borrow.
    *
    * @param bagEntry an object to add to the bag
    */
   public void add(final T bagEntry)
   {
      if (closed) {
         LOGGER.info("ConcurrentBag has been closed, ignoring add()");
         throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
      }


      sharedList.add(bagEntry);//新添加的资源优先放入CopyOnWriteArrayList


      // spin until a thread takes it or none are waiting
      // 当有等待资源的线程时,将资源交到某个等待线程后才返回(SynchronousQueue)
      while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
         yield();
      }
   }

复制代码

remove方法用来从bag中删除一个bageEntry,该方法只能在borrow(long, TimeUnit)和reserve(T)时被使用

/**
    * Remove a value from the bag.  This method should only be called
    * with objects obtained by <code>borrow(long, TimeUnit)</code> or <code>reserve(T)</code>
    *
    * @param bagEntry the value to remove
    * @return true if the entry was removed, false otherwise
    * @throws IllegalStateException if an attempt is made to remove an object
    *         from the bag that was not borrowed or reserved first
    */
   public boolean remove(final T bagEntry)
   {
   // 若是资源正在使用且没法进行状态切换,则返回失败
      if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
         LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
         return false;
      }

      final boolean removed = sharedList.remove(bagEntry);// 从CopyOnWriteArrayList中移出
      if (!removed && !closed) {
         LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
      }

      return removed;
   }
复制代码

ConcurrentBag中经过borrow方法进行数据资源借用

/**
    * The method will borrow a BagEntry from the bag, blocking for the
    * specified timeout if none are available.
    *
    * @param timeout how long to wait before giving up, in units of unit
    * @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
    * @return a borrowed instance from the bag or null if a timeout occurs
    * @throws InterruptedException if interrupted while waiting
    */
   public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
   {
      // Try the thread-local list first
      // 优先查看有没有可用的本地化的资源
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
         final Object entry = list.remove(i);
         @SuppressWarnings("unchecked")
         final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
         if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }
      }

      // Otherwise, scan the shared list ... then poll the handoff queue
      final int waiting = waiters.incrementAndGet();
      try {
      // 当无可用本地化资源时,遍历所有资源,查看是否存在可用资源
      // 所以被一个线程本地化的资源也可能被另外一个线程“抢走”
         for (T bagEntry : sharedList) {
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               // If we may have stolen another waiter's connection, request another bag add. if (waiting > 1) { // 由于可能“抢走”了其余线程的资源,所以提醒包裹进行资源添加 listener.addBagItem(waiting - 1); } return bagEntry; } } listener.addBagItem(waiting); timeout = timeUnit.toNanos(timeout); do { final long start = currentTime(); // 当现有所有资源所有在使用中,等待一个被释放的资源或者一个新资源 final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS); if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } timeout -= elapsedNanos(start); } while (timeout > 10_000); return null; } finally { waiters.decrementAndGet(); } } 复制代码
/**
    * This method will return a borrowed object to the bag.  Objects
    * that are borrowed from the bag but never "requited" will result
    * in a memory leak.
    *
    * @param bagEntry the value to return to the bag
    * @throws NullPointerException if value is null
    * @throws IllegalStateException if the bagEntry was not borrowed from the bag
    */
   public void requite(final T bagEntry)
   {
   // 将状态转为未在使用
      bagEntry.setState(STATE_NOT_IN_USE);

// 判断是否存在等待线程,若存在,则直接转手资源
      for (int i = 0; waiters.get() > 0; i++) {
         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
         }
         else if ((i & 0xff) == 0xff) {
            parkNanos(MICROSECONDS.toNanos(10));
         }
         else {
            yield();
         }
      }

 // 不然,进行资源本地化
      final List<Object> threadLocalList = threadList.get();
      threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
   }
复制代码

上述代码中的 weakThreadLocals 是用来判断是否使用弱引用,经过下述方法初始化:

/**
    * Determine whether to use WeakReferences based on whether there is a
    * custom ClassLoader implementation sitting between this class and the
    * System ClassLoader.
    *
    * @return true if we should use WeakReferences in our ThreadLocals, false otherwise
    */
   private boolean useWeakThreadLocals()
   {
      try {
      // 人工指定是否使用弱引用,可是官方不推荐进行自主设置。
         if (System.getProperty("com.zaxxer.hikari.useWeakReferences") != null) {   // undocumented manual override of WeakReference behavior
            return Boolean.getBoolean("com.zaxxer.hikari.useWeakReferences");
         }

// 默认经过判断初始化的ClassLoader是不是系统的ClassLoader来肯定
         return getClass().getClassLoader() != ClassLoader.getSystemClassLoader();
      }
      catch (SecurityException se) {
         return true;
      }
   }
复制代码

SynchronousQueue

SynchronousQueue主要用于存在资源等待线程时的第一手资源交接,以下图所示:

在hikariCP中,选择的是公平模式 this.handoffQueue = new SynchronousQueue<>(true);

公平模式总结下来就是:队尾匹配队头出队,先进先出,体现公平原则。

SynchronousQueue是一个是一个无存储空间的阻塞队列(是实现newFixedThreadPool的核心),很是适合作交换工做,生产者的线程和消费者的线程同步以传递某些信息、事件或者任务。 由于是无存储空间的,因此与其余阻塞队列实现不一样的是,这个阻塞peek方法直接返回null,无任何其余操做,其余的方法与阻塞队列的其余方法一致。这个队列的特色是,必须先调用take或者poll方法,才能使用off,add方法。

做为BlockingQueue中的一员,SynchronousQueue与其余BlockingQueue有着不一样特性(来自明姐http://cmsblogs.com/?p=2418):

  • SynchronousQueue没有容量。与其余BlockingQueue不一样,SynchronousQueue是一个不存储元素的BlockingQueue。每个put操做必需要等待一个take操做,不然不能继续添加元素,反之亦然。
  • 由于没有容量,因此对应 peek, contains, clear, isEmpty ... 等方法实际上是无效的。例如clear是不执行任何操做的,contains始终返回false,peek始终返回null。
  • SynchronousQueue分为公平和非公平,默认状况下采用非公平性访问策略,固然也能够经过构造函数来设置为公平性访问策略(为true便可)。
  • 若使用 TransferQueue, 则队列中永远会存在一个 dummy node。

SynchronousQueue提供了两个构造函数:

public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        // 经过 fair 值来决定公平性和非公平性
        // 公平性使用TransferQueue,非公平性采用TransferStack
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
复制代码

TransferQueue、TransferStack继承Transferer,Transferer为SynchronousQueue的内部类,它提供了一个方法transfer(),该方法定义了转移数据的规范

abstract static class Transferer<E> {
        abstract E transfer(E e, boolean timed, long nanos);
    }
复制代码

transfer()方法主要用来完成转移数据的,若是e != null,至关于将一个数据交给消费者,若是e == null,则至关于从一个生产者接收一个消费者交出的数据。

SynchronousQueue采用队列TransferQueue来实现公平性策略,采用堆栈TransferStack来实现非公平性策略,他们两种都是经过链表实现的,其节点分别为QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演着很是重要的做用,SynchronousQueue的put、take操做都是委托这两个类来实现的。

公平模式

公平模式底层使用的TransferQueue内部队列,一个head和tail指针,用于指向当前正在等待匹配的线程节点。 (来自https://blog.csdn.net/yanyan19880509/article/details/52562039) 初始化时,TransferQueue的状态以下:

接着咱们进行一些操做:

一、线程put1执行 put(1)操做,因为当前没有配对的消费线程,因此put1线程入队列,自旋一小会后睡眠等待,这时队列状态以下:

二、接着,线程put2执行了put(2)操做,跟前面同样,put2线程入队列,自旋一小会后睡眠等待,这时队列状态以下:

三、这时候,来了一个线程take1,执行了 take操做,因为tail指向put2线程,put2线程跟take1线程配对了(一put一take),这时take1线程不须要入队,可是请注意了,这时候,要唤醒的线程并非put2,而是put1。为什么? 你们应该知道咱们如今讲的是公平策略,所谓公平就是谁先入队了,谁就优先被唤醒,咱们的例子明显是put1应该优先被唤醒。至于读者可能会有一个疑问,明明是take1线程跟put2线程匹配上了,结果是put1线程被唤醒消费,怎么确保take1线程必定能够和次首节点(head.next)也是匹配的呢?其实你们能够拿个纸画一画,就会发现真的就是这样的。 公平策略总结下来就是:队尾匹配队头出队。 执行后put1线程被唤醒,take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通讯,这时候内部状态以下:

四、最后,再来一个线程take2,执行take操做,这时候只有put2线程在等候,并且两个线程匹配上了,线程put2被唤醒, take2线程take操做返回了2(线程put2的数据),这时候队列又回到了起点,以下所示:

以上即是公平模式下,SynchronousQueue的实现模型。总结下来就是:队尾匹配队头出队,先进先出,体现公平原则。

非公平模式

仍是使用跟公平模式下同样的操做流程,对比两种策略下有何不一样。非公平模式底层的实现使用的是TransferStack, 一个栈,实现中用head指针指向栈顶,接着咱们看看它的实现模型:

一、线程put1执行 put(1)操做,因为当前没有配对的消费线程,因此put1线程入栈,自旋一小会后睡眠等待,这时栈状态以下:

二、接着,线程put2再次执行了put(2)操做,跟前面同样,put2线程入栈,自旋一小会后睡眠等待,这时栈状态以下:

三、这时候,来了一个线程take1,执行了take操做,这时候发现栈顶为put2线程,匹配成功,可是实现会先把take1线程入栈,而后take1线程循环执行匹配put2线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向 put1线程

四、最后,再来一个线程take2,执行take操做,这跟步骤3的逻辑基本是一致的,take2线程入栈,而后在循环中匹配put1线程,最终所有匹配完毕,栈变为空,恢复初始状态,以下图所示:

从上面流程看出,虽然put1线程先入栈了,可是倒是后匹配,这就是非公平的由来。

CopyOnWriteArrayList

CopyOnWriteArrayList负责存放ConcurrentBag中所有用于出借的资源。(引自http://www.importnew.com/25034.html)

CopyOnWriteArrayList,顾名思义,Write的时候老是要Copy,也就是说对于任何可变的操做(add、set、remove)都是伴随复制这个动做的,是ArrayList 的一个线程安全的变体。

A thread-safe variant of ArrayList in which all mutative operations (add, set, and so on) are implemented by making a fresh copy of the underlying array.This is ordinarily too costly, but may be more efficient than alternatives when traversal operations vastly outnumber mutations, and is useful when you cannot or don't want to synchronize traversals, yet need to preclude interference among concurrent threads. The "snapshot" style iterator method uses a reference to the state of the array at the point that the iterator was created. This array never changes during the lifetime of the iterator, so interference is impossible and the iterator is guaranteed not to throw ConcurrentModificationException. The iterator will not reflect additions, removals, or changes to the list since the iterator was created. Element-changing operations on iterators themselves (remove, set, and add) are not supported. These methods throw UnsupportedOperationException. All elements are permitted, including null.

CopyOnWriteArrayList的add操做的源代码以下:

public boolean add(E e) {
    //一、先加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        //二、拷贝数组
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        //三、将元素加入到新数组中
        newElements[len] = e;
        //四、将array引用指向到新数组
        setArray(newElements);
        return true;
    } finally {
       //五、解锁
        lock.unlock();
    }
}
复制代码

一次add大体经历了几个步骤:

一、加锁
二、拿到原数组,获得新数组的大小(原数组大小+1),实例化出一个新的数组来
三、把原数组的元素复制到新数组中去
四、新数组最后一个位置设置为待添加的元素(由于新数组的大小是按照原数组大小+1来的)
五、把Object array引用指向新数组
六、解锁

插入、删除、修改操做也都是同样,每一次的操做都是以对Object[] array进行一次复制为基础的

因为全部的写操做都是在新数组进行的,这个时候若是有线程并发的写,则经过锁来控制,若是有线程并发的读,则分几种状况:
一、若是写操做未完成,那么直接读取原数组的数据;
二、若是写操做完成,可是引用还未指向新数组,那么也是读取原数组数据;
三、若是写操做完成,而且引用已经指向了新的数组,那么直接重新数组中读取数据。

可见,CopyOnWriteArrayList的读操做是能够不用加锁的。

经常使用的List有ArrayList、LinkedList、Vector,其中前两个是线程非安全的,最后一个是线程安全的。Vector虽然是线程安全的,可是只是一种相对的线程安全而不是绝对的线程安全,它只可以保证增、删、改、查的单个操做必定是原子的,不会被打断,可是若是组合起来用,并不能保证线程安全性。好比就像上面的线程1在遍历一个Vector中的元素、线程2在删除一个Vector中的元素同样,势必产生并发修改异常,也就是fail-fast。

因此这就是选择CopyOnWriteArrayList这个并发组件的缘由,CopyOnWriteArrayList如何作到线程安全的呢?

CopyOnWriteArrayList使用了一种叫写时复制的方法,当有新元素添加到CopyOnWriteArrayList时,先从原有的数组中拷贝一份出来,而后在新的数组作写操做,写完以后,再将原来的数组引用指向到新数组。

当有新元素加入的时候,以下图,建立新数组,并往新数组中加入一个新元素,这个时候,array这个引用仍然是指向原数组的。

当元素在新数组添加成功后,将array这个引用指向新数组。

CopyOnWriteArrayList的整个add操做都是在锁的保护下进行的。 这样作是为了不在多线程并发add的时候,复制出多个副本出来,把数据搞乱了,致使最终的数组数据不是咱们指望的。

CopyOnWriteArrayList反映的是三个十分重要的分布式理念:

1)读写分离
咱们读取CopyOnWriteArrayList的时候读取的是CopyOnWriteArrayList中的Object[] array,可是修改的时候,操做的是一个新的Object[] array,读和写操做的不是同一个对象,这就是读写分离。这种技术数据库用的很是多,在高并发下为了缓解数据库的压力,即便作了缓存也要对数据库作读写分离,读的时候使用读库,写的时候使用写库,而后读库、写库之间进行必定的同步,这样就避免同一个库上读、写的IO操做太多
2)最终一致
对CopyOnWriteArrayList来讲,线程1读取集合里面的数据,未必是最新的数据。由于线程二、线程三、线程4四个线程都修改了CopyOnWriteArrayList里面的数据,可是线程1拿到的仍是最老的那个Object[] array,新添加进去的数据并无,因此线程1读取的内容未必准确。不过这些数据虽然对于线程1是不一致的,可是对于以后的线程必定是一致的,它们拿到的Object[] array必定是三个线程都操做完毕以后的Object array[],这就是最终一致。最终一致对于分布式系统也很是重要,它经过容忍必定时间的数据不一致,提高整个分布式系统的可用性与分区容错性。固然,最终一致并非任何场景都适用的,像火车站售票这种系统用户对于数据的实时性要求很是很是高,就必须作成强一致性的。
3)使用另外开辟空间的思路,来解决并发冲突

缺点:
一、由于CopyOnWrite的写时复制机制,因此在进行写操做的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会建立新对象添加到新容器里,而旧容器的对象还在使用,因此有两份对象内存)。若是这些对象占用的内存比较大,好比说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候颇有可能形成频繁的Yong GC和Full GC。以前某系统中使用了一个服务因为每晚使用CopyOnWrite机制更新大对象,形成了每晚15秒的Full GC,应用响应时间也随之变长。针对内存占用问题,能够经过压缩容器中的元素的方法来减小大对象的内存消耗,好比,若是元素全是10进制的数字,能够考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其余的并发容器,如ConcurrentHashMap。
二、不能用于实时读的场景,像拷贝数组、新增元素都须要时间,因此调用一个set操做后,读取到数据可能仍是旧的,虽CopyOnWriteArrayList 能作到最终一致性,可是仍是无法知足实时性要求;
3.数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。因此若是你但愿写入的的数据,立刻能读到,请不要使用CopyOnWrite容器。关于C++的STL中,曾经也有过Copy-On-Write的玩法,参见陈皓的《C++ STL String类中的Copy-On-Write》,后来,由于有不少线程安全上的事,就被去掉了。https://blog.csdn.net/haoel/article/details/24058

随着CopyOnWriteArrayList中元素的增长,CopyOnWriteArrayList的修改代价将愈来愈昂贵,所以,CopyOnWriteArrayList 合适读多写少的场景,不过这类慎用 由于谁也无法保证CopyOnWriteArrayList 到底要放置多少数据,万一数据稍微有点多,每次add/set都要从新复制数组,这个代价实在过高昂了。在高性能的互联网应用中,这种操做分分钟引发故障。**CopyOnWriteArrayList适用于读操做远多于修改操做的并发场景中。**而HikariCP就是这种场景。

还有好比白名单,黑名单,商品类目的访问和更新场景,假如咱们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,可是某些关键字不容许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单天天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,若是在,则提示不能搜索。

可是使用CopyOnWriteMap须要注意两件事情:

  1. 减小扩容开销。根据实际须要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。

  2. 使用批量添加。由于每次添加,容器每次都会进行复制,因此减小添加次数,能够减小容器的复制次数。

参考资料

http://www.cnblogs.com/taisenki/p/7699667.html http://cmsblogs.com/?p=2418 https://blog.csdn.net/yanyan19880509/article/details/52562039 http://www.importnew.com/25034.html https://blog.csdn.net/linsongbin1/article/details/54581787

相关文章
相关标签/搜索