netty对象池使用与回收

1. Recycler对象池
  Recycler抽象类的实现很是简单,只有三个方法:
  获取对象:Recycler:get()
  回收对象:Recycler:recycle()
  建立对象:Recycler:newObject()
  newObject为抽象方法,须要由实现类本身实现此方法来建立对象。
  Recycler对象池目的是尽量的重复利用同一线程建立的对象,同时为了不占用过多的内存,回收对象时采用必定的比例回收对象(默认1/7规则 注释中有解释),未回收的对象由jvm垃圾回收机制来处理。
  Recycler对象池的数据存储结构以下:
  

  和建立stack相同的线程回收对象时存储在elements数组(pushNow方法回收),若是不是同一个线程则放入WeakOrderQueue队列中,等到须要get对象时且stack为空,会将WeakOrderQueue集合的全部元素根据必定的规则转移到stack的elements数组中。数组

2. 对象的获取并发

  首先判断池中是否存在对象,若是由则优先从本地线程stack中获取Object,若是stack为空时,再将其余线程queue集合的全部对象根据必定的规则转移到本地线程stack中(1/7规则),而后再从stack获取Object并返回.
  若是池中不存在对象,建立对象并返回。 
  相关代码以下:   
    public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();//首先判断池中是否存在对象,若是由则优先从本地线程stack中获取Object,若是stack为空时,再将其余线程queue集合的全部对象根据必定的规则转移到本地线程stack中(1/7规则),而后再从stack获取Object并返回.
        if (handle == null) {//若是池中不存在对象,建立对象并返回。
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        //        System.out.println(threadLocal.getClass() + "=" + stack + "=" + handle);
        return (T) handle.value;
    }

    DefaultHandle<T> pop() {
        int size = this.size;
        if (size == 0) {首先判断池中是否存在对象//
            if (!scavenge()) {//若是stack为空时,再将其余线程queue集合的全部对象根据必定的规则转移到本地线程stack中(1/7规则)
                return null;
            }
            size = this.size;
        }
        size--;
        DefaultHandle ret = elements[size];
        elements[size] = null;
        if (ret.lastRecycledId != ret.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        ret.recycleId = 0;
        ret.lastRecycledId = 0;
        this.size = size;
        return ret;
    }
    
    boolean scavenge() {
        // continue an existing scavenge, if any
        if (scavengeSome()) {
            return true;
        }

        // reset our scavenge cursor
        prev = null;
        cursor = head;
        return false;
    }

    boolean scavengeSome() {
        WeakOrderQueue prev;
        WeakOrderQueue cursor = this.cursor;
        if (cursor == null) {
            prev = null;
            cursor = head;
            if (cursor == null) {
                return false;
            }
        } else {
            prev = this.prev;
        }

        boolean success = false;
        do {
            if (cursor.transfer(this)) {//将其余线程queue集合的全部对象根据必定的规则转移到本地线程stack中(1/7规则)
                success = true;
                break;
            }
            WeakOrderQueue next = cursor.next;
            if (cursor.owner.get() == null) {
                // If the thread associated with the queue is gone, unlink it, after
                // performing a volatile read to confirm there is no data left to collect.
                // We never unlink the first queue, as we don't want to synchronize on updating the head.
                if (cursor.hasFinalData()) {
                    for (;;) {
                        if (cursor.transfer(this)) {
                            success = true;
                        } else {
                            break;
                        }
                    }
                }

                if (prev != null) {
                    prev.setNext(next);
                }
            } else {
                prev = cursor;
            }

            cursor = next;

        } while (cursor != null && !success);//遍历全部的WeakOrderQueue

        this.prev = prev;
        this.cursor = cursor;
        return success;
    }

    boolean transfer(Stack<?> dst) {
        Link head = this.head.link;
        if (head == null) {
            return false;
        }

        if (head.readIndex == LINK_CAPACITY) {
            if (head.next == null) {
                return false;
            }
            this.head.link = head = head.next;
        }

        final int srcStart = head.readIndex;
        int srcEnd = head.get();
        final int srcSize = srcEnd - srcStart;
        if (srcSize == 0) {
            return false;
        }

        final int dstSize = dst.size;
        final int expectedCapacity = dstSize + srcSize;

        if (expectedCapacity > dst.elements.length) {
            final int actualCapacity = dst.increaseCapacity(expectedCapacity);
            srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
        }

        if (srcStart != srcEnd) {
            final DefaultHandle[] srcElems = head.elements;
            final DefaultHandle[] dstElems = dst.elements;
            int newDstSize = dstSize;
            for (int i = srcStart; i < srcEnd; i++) {//将queue中全部元素按照(1/7规则)保存到stack中
                DefaultHandle element = srcElems[i];
                if (element.recycleId == 0) {
                    element.recycleId = element.lastRecycledId;
                } else if (element.recycleId != element.lastRecycledId) {
                    throw new IllegalStateException("recycled already");
                }
                srcElems[i] = null;

                if (dst.dropHandle(element)) {
                    // Drop the object.
                    continue;
                }
                element.stack = dst;
                dstElems[newDstSize++] = element;
            }

            if (srcEnd == LINK_CAPACITY && head.next != null) {
                // Add capacity back as the Link is GCed.
                this.head.reclaimSpace(LINK_CAPACITY);
                this.head.link = head.next;
            }

            head.readIndex = srcEnd;
            if (dst.size == newDstSize) {
                return false;
            }
            dst.size = newDstSize;
            return true;
        } else {
            // The destination stack is full already.
            return false;
        }
    }
    
    
View Code

3.  回收对象app

  若是当前回收的线程是原始线程(建立对象的线程),那么调用pushNow,若是超过Stack的容量(默认4*1024)或者1/7规则 就drop(由jvm回收释放)
  若是当前回收的线程不是原始线程,那么调用pushLater,若是当前线程是第一次回收原始线程的对象,须要由当前线程建立的WeakOrderQueue(原始线程的stack对象中能够关联其余线程的WeakOrderQueue)。若是建立队列成功,则进入该队列;失败(queue个数大于maxDelayedQueues)则放弃该对象(由jvm回收释放)
注释:1)由本地线程的WeakOrderQueue回收对象,这么作的缘由时避免并发(races竞争),并且不是每一个对象都有对象回收池来回收,若是超过最大容量限制会放弃,并且本地线程stack采用1/8规则措施,并且将异地线程queue的对象转移到本地线程stack时也采起1/8规则措施
   2)1/7规则是指每7个对象只留取1个回收,剩余部分放弃,如1-7个回收1个,8-15回收2个......
回收对象的相关代码以下:
     (1)DefaultHandle类
        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }
            stack.push(this);
        }
    
        (2)Stack类
        void push(DefaultHandle<?> item) {
        Thread currentThread = Thread.currentThread();
        if (threadRef.get() == currentThread) {
            // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
            pushNow(item);
        } else {
            // The current Thread is not the one that belongs to the Stack
            // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
            // happens later.
            pushLater(item, currentThread);
        }
    
       (3)private void pushNow(DefaultHandle<?> item) {
            if ((item.recycleId | item.lastRecycledId) != 0) {
                throw new IllegalStateException("recycled already");
            }
            item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

            int size = this.size;
            if (size >= maxCapacity || dropHandle(item)) {//为了不回收对象数量太多,占用太多内存,采起了1/8规则措施,参数可调
                // Hit the maximum capacity or should drop - drop the possibly youngest object.
                return;
            }
            if (size == elements.length) {
                elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
            }

            elements[size] = item;
            this.size = size + 1;
        }
        
        (4)private void pushLater(DefaultHandle<?> item, Thread thread) {
            // we don't want to have a ref to the queue as the value in our weak map
            // so we null it out; to ensure there are no races with restoring it later
            // we impose a memory ordering here (no-op on x86)
            Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
            WeakOrderQueue queue = delayedRecycled.get(this);
            if (queue == null) {
                if (delayedRecycled.size() >= maxDelayedQueues) {回收池的异地队列个数有限制,每一个队列的容量也有限制
                    // Add a dummy queue so we know we should drop the object
                    delayedRecycled.put(this, WeakOrderQueue.DUMMY);
                    return;
                }
                // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
                if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
                    // drop object
                    return;
                }
                delayedRecycled.put(this, queue);
            } else if (queue == WeakOrderQueue.DUMMY) {
                // drop object
                return;
            }

            queue.add(item);
        }

        (5)WeakOrderQueue类
        void add(DefaultHandle<?> handle) {
            handle.lastRecycledId = id;

            Link tail = this.tail;
            int writeIndex;
            if ((writeIndex = tail.get()) == LINK_CAPACITY) {
                if (!head.reserveSpace(LINK_CAPACITY)) {
                    // Drop it.
                    return;
                }
                // We allocate a Link so reserve the space
                this.tail = tail = tail.next = new Link();

                writeIndex = tail.get();
            }
            tail.elements[writeIndex] = handle;
            handle.stack = null;
            // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
            // this also means we guarantee visibility of an element in the queue if we see the index updated
            tail.lazySet(writeIndex + 1);
        }
 
View Code
相关文章
相关标签/搜索