Jdk1.8--ThreadPoolExecute的Worker类

线程池的work对象---一个同步的工做队列,实现了runable接口

/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution.  This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run.  We implement a simple
 * non-reentrant mutual exclusion lock rather than use
 * ReentrantLock because we do not want worker tasks to be able to
 * reacquire the lock when they invoke pool control methods like
 * setCorePoolSize.  Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in
 * runWorker).
 */
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * [@param](https://my.oschina.net/u/2303379) firstTask the first task (null if none)
     * 初始化当前工做队列,队列的第一个任务是 FirstTask,为任务的执行建立新的            * 线程,设置当前队列的状态,参数产科 AQS的参数详情

     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
   //实现runable接口,执行runWorker方法
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    // 检查排他模式下是否被占用,0表示未上锁,1表示上锁。
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    //尝试加锁
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    
    //尝试释放锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    //执行中断
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}`

线程池的建立

ThreadPoolExecutor t = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());

参数是:java

  • corePoolSize:核心线程池的大小,
  • runableTaskQueue:任务队列:用于保存等待执行的任务的阻塞队列
  • maximumPoolSize:线程池最大数量,线程池容许建立的最大线程数。
  • ThreadFactory:用于设置建立线程的工程,能够经过线程工厂给每一个建立出来的线程设置更有意义的名字。
  • RejectExecutionHandler(饱和策略),当队列和线程池都满了,说明线程池处于饱和状态,那么必须采起一种策略处理提交的新任务。
  • KeepAliveTime(线程活动保持时间):线程池的工做线程空闲后,保持存活的时间。
  • TimeUtil (线程活动保持时间的单位)

向线程池提交任务

public class MyRunnable1 implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" " + System.currentTimeMillis());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+" end");
    }
}

public class ExecutorTest {
    public static void main(String[] args) throws InterruptedException {
        MyRunnable1 myRunnable1 = new MyRunnable1();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        pool.execute(myRunnable1);
        pool.execute(myRunnable1);
        pool.execute(myRunnable1);
        pool.execute(myRunnable1);
        Thread.sleep(1000);
        pool.shutdown();
        pool.execute(myRunnable1);
        System.out.println("main end!");
    }
}

execute( ) 方法用于提交不须要返回值的任务,因此没法判断任务是否被线程池执行成功。 submit()方法用于提交须要返回值的任务。线程池会返回一个future类型的对象,经过这个future对象能够判断任务是否执行成功,而且能够经过future的get()方法来获取返回值。ide

线程池的关闭

使用线程池的 shutdown 或shutdownNow 方法来关闭线程池。oop

线程池的监控

  • taskcount:线程池须要执行的任务数量。
  • completedTaskCount线程池在运行过程当中已完成的任务数量,小于或等于taskcount
  • largestPoolSize:线程池里曾经建立过的最大线程数量。
  • getPoolSize:线程池的线程数量。
  • getActiveCount:获取活动的线程数。
相关文章
相关标签/搜索