Java线程池源码分析


几种预定义的线程池

  • newFixedThreadPool

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
  • newSingleThreadExecutor

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
  • newCachedThreadPool

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

    这里使用的SynchronousQueue并不存储任务,因此这个线程池如果来了任务直接交给线程去执行,如果没有空闲的线程就创建,否则复用.

submit vs execute

有两种提交任务的方式:

  • execute(Runnable r)

    不返回结果

  • submit(Runnbale r)

    有一个Future的返回结果.

        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

    可以看到,使用submit()提交Runnable,会使用一个RUnnableFuture包装Runnable,以返回任务结果.

ThreadPoolExecutor中线程池的几种状态

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

翻译成二进制即:

CAPACITY:  00011111111111111111111111111111
RUNNING:   11100000000000000000000000000000
SHUTDOWN:  00000000000000000000000000000000
STOP:      00100000000000000000000000000000
TIDYING:   01000000000000000000000000000000
TERMINATED:01100000000000000000000000000000

线程池的控制状态存放在一个AtomicInteger中,一共32位:

  • 高3位表示5种运行状态,即runState

    为什么用3位表示?其实这里就是个简单的状态机而已,2位最多表示4种状态,3位最多表示8种状态,因此这里选择3位来表示.

  • 低29位表示有效线程数,即workerCount

线程池的5种状态

  • Running

    可以接受新任务和处理队列中的任务

  • Shutdown

    不接受新任务,但是可以处理队列中的任务

  • Stop

    不接受新任务,也不处理队列中的任务,并且中断正在处理的任务

  • Tidying

    所有的任务已经终止了.workerCount = 0,过渡到这个状态的线程将会调用terminated()方法

  • Terminated

    terminated()已经执行完毕

线程池状态转换

再来看一下这些位操作:

    // 打包和拆包,ctlOf是打包,runStateOf是获取runState,workerCountOf是获取workerCount
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    //5种状态,只有Running是小于0的
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

Worker

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

Worker本身也是Runnable,run的时候实际上运行的也是任务,而且Worker持有了Thread和Runnable,并且为线程增加了中断控制的能力.

从代码上可以看出,Worker是复用的,不同的任务(Runnable)可能会添加到同一个Worker中,换句话说一个Worker对应一个线程,一个线程可能会运行多个任务.

execute(Runnable r)执行过程

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        // 线程状态,多线程环境的复杂性导致丑陋的代码
        int c = ctl.get();
        //线程数小于corePoolSize 
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 线程池处于runnable状态,并且任务入队成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再检查下
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 一些没法添加任务的场景
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 不是Running状态 && (不处于Shutdown状态 or 队列为空)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
             // 线程数达到极限,或者核心模式下,线程数大于等于corePoolSize,或者非核心模式下,线程数大于等于maxiumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // 添加worker
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // worker添加之后,启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

基本上都是一些状态判断,然后试图添加worker.

状态何时会启动线程,执行任务.

总结

看看注释说明得了,把机制弄清楚.

代码写的真的是一言难尽.if else太多,各种稀奇古怪的判断看起来真的没什么意思.


文章作者: 姜康
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 姜康 !
评论
  目录