背景
同事在工作过程中遇到了一个ScheduledThreadPoolExecutor 相关的Bug,在组里抛出了问题:当计划任务线程池中的一个周期任务抛出了异常的时候,线程池会有什么样的行为? 是结束本次执行而在下次时继续调度? 还是说此后不再调度这个任务?
大家一直在尝试在大模型口中直接得到答案, 但是我还是想和AI一起看一下源码了解一下底层的实现,于是就有了这篇笔记~
前置知识
ScheduledThreadPoolExecutor 本身也是一个ThreadPoolExecutor,所以我们要介绍一下ThreadPoolExecutor的基本原理:起几个(coreSize个)Worker线程,不停地去调用workQueue的poll或take方法[1]获取task并执行。
show me the code
我们直接从平时使用的入口开始—— scheduleAtFixedRate 方法;
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0L)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period),
                                          sequencer.getAndIncrement());
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }首先对参数进行了一些校验,然后便使用用户传入的runnable和计划信息 new出来了一个sft: ScheduledFutureTask;想必这个Future就是后面进行周期调度的基本单元;
然后将sft包装成t之后传入了delayedExecute方法,所以如果想深挖执行细节那么我们需要继续了解的就是这个delayedExecute方法,见后文。
可以看到紧接着就使用decorateTask方法将sft包装成了他的父类 RunnableScheduledFuture; 在默认ScheduledThreadPoolExecutor下这个包装器方法并没有任何动作,直接返回了task; 在注释中有提到,这个方法是一个扩展点,通过override这里,我们可以实现诸如以下类型的功能:
- 添加任务的额外功能(如日志记录、监控)。
- 替换任务的实现类。
- 包装任务以实现特定的功能(如统计执行时间、异常处理等)。
设计技巧+1
    /**
     * Main execution method for delayed or periodic tasks.  If pool
     * is shut down, rejects the task. Otherwise adds task to queue
     * and starts a thread, if necessary, to run it.  (We cannot
     * prestart the thread to run the task because the task (probably)
     * shouldn't be run yet.)  If the pool is shut down while the task
     * is being added, cancel and remove it if required by state and
     * run-after-shutdown parameters.
     *
     * @param task the task
     */
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(task) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }很好,在注释中就提到了这个方法是执行周期任务的主要入口,而且核心逻辑基本只有一行:将task添加到super的workQueue里; 在ScheduledThreadPoolExecutor的构造函数中可以看到将workQueue设置成了一个内部类:DelayedWorkQueue,特殊的blockingQueue。所以在这里结合前置知识中线程池的基本原理,我们就知道work会周期的调用DelayedWorkQueue的poll和take方法,一旦获得task就会执行; 所以下面关注一下这两个方法。
 public RunnableScheduledFuture<?> poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                RunnableScheduledFuture<?> first = queue[0];
                return (first == null || first.getDelay(NANOSECONDS) > 0)
                    ? null
                    : finishPoll(first);
            } finally {
                lock.unlock();
            }
        }take方法与这个差别不大,所以我们以简单的poll为例; 可以看到queue本身并不会记录每个任务的delay信息,delay信息是内聚在task中的,queue的poll动作只是查看最近的一个任务的delay是否已经<0;若是则代表有任务需要执行,返回任务信息;若否则营造出一种待执行列表为空的假象,Worker线程继续空转。
所以既然delay信息是内聚在task中,那么task在run完成之后想必要重新设置delay信息; 只要我们接下来查看ScheduledFuture的run方法中对于delay的设置即可。
        public void run() {
            if (!canRunInCurrentRunState(this))
                cancel(false);
            else if (!isPeriodic())
                super.run();
            else if (super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }很好,依然很简短; 这里首先判断任务是不是周期性的,在我们问题的背景下task应该是周期性的,所以会调用 runAndReset(), 这是一个父类FeatureTask类中的方法,在task正常执行且无throwable抛出的情况下会返回true,其他情况返回false。
而设置下次执行时间的方法 setNextRunTime() 需要本次任务执行成功才行,所以至此我们找到了问题的答案: 如果出现异常则不会进行后续调度。
后记
后来同事们发现在ScheduledThreadPoolExecutor 父接口的scheduleAtFixedRate方法注释上有这么一段(zulu java8)
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 period, and so on. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.
创建并执行一个周期性的动作,在初始延迟完成后会按照指定的周期进行调度;也就是说任务会在initialDelay执行,然后initialDelay+period, 然后initialDealy+2period... 如果任意一次执行抛出了异常,后续的调度会被抛弃。 另一方面,任务只会在被取消或整个exctor销毁后停止调度。 如果一个任务的执行时间大于period,下一次调度会被推迟而不是并发执行。
但是我十分疑惑为什么要在接口中定义实现类的具体逻辑…… 大模型告诉我说这是为了让用户不去实现类中查看具体细节,也是一种基于接口的契约…… 我并不很服气,因为如果我就是需要一个出现了异常也继续调度的executor,通过实现这个接口而实现一个实现类之后,实现类的行为就会与接口的注释出现冲突,反而增加了大家的理解成本不是么。



