ThreadPoolExecutor


ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize/maxiumPoolSize

  • 运行的线程数 < corePoolSize

    创建新线程,不管其他线程是否处于Idle状态

  • corePoolSize < 运行的线程数 < maxiumPoolSize

    只有当队列满的时候,才会创建新线程

  • corePoolSize = maxiumPoolSize

    固定大小的线程池

  • maxiumPoolSize = Integer.MAX_VALUE

    容纳任意数量的并发任务

keepAliveTime/unit

如果线程池当前线程数超过corePoolSize,多余的线程如果在IDLE状态下超过keepAliveTime就会被终止.

使用Long.MAX_VALUE TimeUnit#NANOSECONDS作为keepAliveTime,可以阻止空闲线程在关闭前被终止.

默认情况下,keepAliveTime策略应用在线程数超过corePoolSize的情况.但是使用allowCoreThreadTimeOut(boolean)方法,只要keepAliveTime大于0,也可以应用到核心线程.

workQueue

任何BlockingQueue的实现都可以用来传递和持有提交的任务.

  • 运行线程数 < corePoolSize时

    Executor倾向于创建新线程而不是将任务入队

  • 运行线程数 >= corePoolSize时

    Executor倾向于将任务入队,而不是创建新线程

  • 任务请求不能入队(比如队列已满)

    如果运行线程数不超过maxiumPoolSize,则创建新线程,否则,被拒绝

常见的三种队列对应三种入队策略:

  • 直接转移,队列不缓存任务

    这里可以使用SynchronousQueue,这个队列是没有内部缓存的,也就是说来一个任务马上就会转移出去,可能是复用线程,创建线程,或者直接被拒绝.通常为了防止任务频繁被拒绝,可以将maxiumPoolSize设置为无限大,即Integer.MAX_VALUE.

    Executors#newCachedThreadPool()就属于这种策略.

  • 无界队列

    比如使用LinkedBlockingQueue,当corePoolSize个线程都在忙时,新任务只能在队列里等待.因此,线程数不会超过corePoolSize,设置的maxiumPoolSize没有任何作用.适合于那种任务之间没有任何依赖的场景.不过当任务处理时间比较久时,会导致队列增长过快.

  • 有界队列

    比如使用ArrayBlockingQueue,当设置了有限的maxiumPoolSize时可以防止系统资源被耗尽,但是相对更难控制.往往都需要在队列大小与最大线程数之间进行折衷.

    如果使用了大的队列,少的线程数可以节约CPU,系统资源,和线程上下文切换的开销,但是会导致任务吞吐量变低,如果任务频繁的被阻塞,系统有时候也会多调度一些进程(会超过你设置的限制);

    如果使用了小队列,多线程数,会使CPU更加忙碌,但是可能导致过载,反而降低吞吐量.

我们可以使用getQueue()方法获取当前队列,不过只建议在debug和监控的时候使用这个方法.

如果想取消排队任务,可以使用remove(Runnable r)或者purge()方法.

handler

RejectedExecutionHandler,当Executor关闭时,或者使用有界队列,并且线程数达到了maxiumPoolSize,新任务会被拒绝.

目前有4种预定义的拒绝策略:

  • 直接抛异常

       public static class AbortPolicy implements RejectedExecutionHandler {
           public AbortPolicy() { }
           public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
               throw new RejectedExecutionException("Task " + r.toString() +
                                                    " rejected from " +
                                                    e.toString());
           }
       }
  • 如果线程池没有关闭,直接调用Runnable的run方法继续执行(在调用者线程)

        public static class CallerRunsPolicy implements RejectedExecutionHandler {
            public CallerRunsPolicy() { }
    
            /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
  • 直接丢弃任务,不执行

        public static class DiscardPolicy implements RejectedExecutionHandler {
            public DiscardPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
  • 丢掉队列中最久的任务,添加新任务

        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            public DiscardOldestPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }

    在实际应用过程中,我们一般都会自定义拒绝策略,做一些特殊处理.

Hook方法

ThreadPoolExecutor中有两个protected的方法:

  • beforeExecute(Thread t, Runnable r)
  • afterExecute(Thread t, Runnable r)

这两个方法分别在任务执行前后被调用.


文章作者: 姜康
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 姜康 !
评论
  目录