type
status
date
slug
summary
tags
category
icon
password

几种预定义的线程池

  • newFixedThreadPool
    • public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
  • newSingleThreadExecutor
    • public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
  • newCachedThreadPool
    • public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
      这里使用的SynchronousQueue并不存储任务,因此这个线程池如果来了任务直接交给线程去执行,如果没有空闲的线程就创建,否则复用.

submit vs execute

有两种提交任务的方式:
  • execute(Runnable r)
    • 不返回结果
  • submit(Runnbale r)
    • 有一个Future的返回结果.
      public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
      可以看到,使用submit()提交Runnable,会使用一个RUnnableFuture包装Runnable,以返回任务结果.

ThreadPoolExecutor中线程池的几种状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
翻译成二进制即:
CAPACITY: 00011111111111111111111111111111 RUNNING: 11100000000000000000000000000000 SHUTDOWN: 00000000000000000000000000000000 STOP: 00100000000000000000000000000000 TIDYING: 01000000000000000000000000000000 TERMINATED:01100000000000000000000000000000
线程池的控制状态存放在一个AtomicInteger中,一共32位:
  • 高3位表示5种运行状态,即runState
    • 为什么用3位表示?其实这里就是个简单的状态机而已,2位最多表示4种状态,3位最多表示8种状态,因此这里选择3位来表示.
  • 低29位表示有效线程数,即workerCount

线程池的5种状态

  • Running
    • 可以接受新任务和处理队列中的任务
  • Shutdown
    • 不接受新任务,但是可以处理队列中的任务
  • Stop
    • 不接受新任务,也不处理队列中的任务,并且中断正在处理的任务
  • Tidying
    • 所有的任务已经终止了.workerCount = 0,过渡到这个状态的线程将会调用terminated()方法
  • Terminated
    • terminated()已经执行完毕
notion image
再来看一下这些位操作:
// 打包和拆包,ctlOf是打包,runStateOf是获取runState,workerCountOf是获取workerCount private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } //5种状态,只有Running是小于0的 private static boolean isRunning(int c) { return c < SHUTDOWN; }

Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
Worker本身也是Runnable,run的时候实际上运行的也是任务,而且Worker持有了Thread和Runnable,并且为线程增加了中断控制的能力.
从代码上可以看出,Worker是复用的,不同的任务(Runnable)可能会添加到同一个Worker中,换句话说一个Worker对应一个线程,一个线程可能会运行多个任务.

execute(Runnable r)执行过程

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 线程状态,多线程环境的复杂性导致丑陋的代码 int c = ctl.get(); //线程数小于corePoolSize if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 线程池处于runnable状态,并且任务入队成功 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再检查下 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } private boolean addWorker(Runnable firstTask, boolean core) { retry: // 一些没法添加任务的场景 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 不是Running状态 && (不处于Shutdown状态 or 队列为空) if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 线程数达到极限,或者核心模式下,线程数大于等于corePoolSize,或者非核心模式下,线程数大于等于maxiumPoolSize if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 添加worker int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // worker添加之后,启动线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
基本上都是一些状态判断,然后试图添加worker.
状态何时会启动线程,执行任务.

总结

看看注释说明得了,把机制弄清楚.
代码写的真的是一言难尽.if else太多,各种稀奇古怪的判断看起来真的没什么意思.
在回顾了多年来的编程学习记录之后…Java反射
姜康
姜康
一个软件工程师
公告
type
status
date
slug
summary
tags
category
icon
password
🎉博客网站重新制作了🎉
👏欢迎更新体验👏