spring定时任务Scheduled注解源码分析

2+

我们都知道再spring 中使用定时任务可以直接在要执行定时任务的方法上面加注解@Scheduled(cron=”0/1 * * * * ?”)。但是为什么只需这简单的一个注解就能执行定时任务,我们来看源码一点点分析。在项目中必须还加@EnableScheduling才能真正的启动定时任务,也就是去注册执行定时任务。

我们来看@EnableScheduling的定义:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

可以看到实际上是通过@Import注解加载了另外的配置属性类(SchedulingConfiguration.class):

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

  @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
    return new ScheduledAnnotationBeanPostProcessor();
  }

}

里面定义了ScheduledAnnotationBeanPostProcessor这个bean。就会在context初始化时候,查找我们代码中的@Scheduled,并把它们转换为定时任务。

可以看到主要就是去创建了ScheduledAnnotationBeanPostProcessor();

ScheduledAnnotationBeanPostProcessor.java源码分析:

 

//spring的容器有多个,有的容器是不会调用这个setApplicationContext的
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
  this.applicationContext = applicationContext;
  if (this.beanFactory == null) {
    this.beanFactory = applicationContext;
  }
}

Spring在完成这个Bean的初始化后,ScheduledAnnotationBeanPostProcessor类实现了ApplicationContextAware接口,所以直接调用了setApplicationContext方法将ApplicationContext上下文对象注入至该Bean对象(@EnableScheduling)中。

然后在每个bean初始化之后就会去调用postProcessAfterInitialization方法(观察者模式)去会查找这个bean中任何带有@Scheduled的方法。

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
  if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
      bean instanceof ScheduledExecutorService) {
    // Ignore AOP infrastructure such as scoped proxies.
    return bean;
  }

  Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
  if (!this.nonAnnotatedClasses.contains(targetClass) &&
      AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
    Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
        (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
          Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
              method, Scheduled.class, Schedules.class);
          return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
        });
    if (annotatedMethods.isEmpty()) {
      this.nonAnnotatedClasses.add(targetClass);
      if (logger.isTraceEnabled()) {
        logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
      }
    }
    else {
      // Non-empty set of methods
      annotatedMethods.forEach((method, scheduledMethods) ->
          scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
      if (logger.isTraceEnabled()) {
        logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
            "': " + annotatedMethods);
      }
    }
  }
  return bean;
}

找到带有@Scheduled的方法这个方法之后调用了processScheduled方法。

 

/**
 * Process the given {@code @Scheduled} method declaration on the given bean.
 * @param scheduled the @Scheduled annotation
 * @param method the method that the annotation has been declared on
 * @param bean the target bean instance
 * @see #createRunnable(Object, Method)
 */
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
  try {
    Runnable runnable = createRunnable(bean, method);
    boolean processedSchedule = false;
    String errorMessage =
        "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

    Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

    // Determine initial delay 确定初始延时
    long initialDelay = scheduled.initialDelay();
    String initialDelayString = scheduled.initialDelayString();
    if (StringUtils.hasText(initialDelayString)) {
      Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
      if (this.embeddedValueResolver != null) {
        initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
      }
      if (StringUtils.hasLength(initialDelayString)) {
        try {
          initialDelay = parseDelayAsLong(initialDelayString);
        }
        catch (RuntimeException ex) {
          throw new IllegalArgumentException(
              "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
        }
      }
    }

    // Check cron expression
    String cron = scheduled.cron();
    if (StringUtils.hasText(cron)) {
      String zone = scheduled.zone();
      if (this.embeddedValueResolver != null) {
        cron = this.embeddedValueResolver.resolveStringValue(cron);
        zone = this.embeddedValueResolver.resolveStringValue(zone);
      }
      if (StringUtils.hasLength(cron)) {
        Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
        processedSchedule = true;
        if (!Scheduled.CRON_DISABLED.equals(cron)) {
          TimeZone timeZone;
          if (StringUtils.hasText(zone)) {
            timeZone = StringUtils.parseTimeZoneString(zone);
          }
          else {
            timeZone = TimeZone.getDefault();
          }
          tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
        }
      }
    }

    // At this point we don't need to differentiate between initial delay set or not anymore
    if (initialDelay < 0) {
      initialDelay = 0;
    }

    // Check fixed delay 类型是否为fixedDelay 
    long fixedDelay = scheduled.fixedDelay();
    if (fixedDelay >= 0) {
      Assert.isTrue(!processedSchedule, errorMessage);
      processedSchedule = true;
      tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
    }
    String fixedDelayString = scheduled.fixedDelayString();
    if (StringUtils.hasText(fixedDelayString)) {
      if (this.embeddedValueResolver != null) {
        fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
      }
      if (StringUtils.hasLength(fixedDelayString)) {
        Assert.isTrue(!processedSchedule, errorMessage);
        processedSchedule = true;
        try {
          fixedDelay = parseDelayAsLong(fixedDelayString);
        }
        catch (RuntimeException ex) {
          throw new IllegalArgumentException(
              "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
        }
        tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
      }
    }

    // Check fixed rate
    long fixedRate = scheduled.fixedRate();
    if (fixedRate >= 0) {
      Assert.isTrue(!processedSchedule, errorMessage);
      processedSchedule = true;
      tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
    }
    String fixedRateString = scheduled.fixedRateString();
    if (StringUtils.hasText(fixedRateString)) {
      if (this.embeddedValueResolver != null) {
        fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
      }
      if (StringUtils.hasLength(fixedRateString)) {
        Assert.isTrue(!processedSchedule, errorMessage);
        processedSchedule = true;
        try {
          fixedRate = parseDelayAsLong(fixedRateString);
        }
        catch (RuntimeException ex) {
          throw new IllegalArgumentException(
              "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
        }
        tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
      }
    }

    // Check whether we had any attribute set
    Assert.isTrue(processedSchedule, errorMessage);

    // Finally register the scheduled tasks
    synchronized (this.scheduledTasks) {
      Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
      regTasks.addAll(tasks);
    }
  }
  catch (IllegalArgumentException ex) {
    throw new IllegalStateException(
        "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
  }
}

processScheduled这个方法实际上就是去注册任务并去触发执行任务(实际没有触发执行)。

Runnable runnable = createRunnable(bean, method);

这句话就是会去用定时调用目标bean实例和执行方法去生成一个线程实例,看到这里,实际上就已经知道了定时任务执行的时候最终执行的就是这个线程,spring的@Scheduled底层是用线程来实现的。我们去看一下createRunnable这个方法,首先会检验@Scheduled注解的方法的访问权限,然后再去动态代理去执行该方法:

protected Runnable createRunnable(Object target, Method method) {
  Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
 //这句就是为了检验方法的访问权限不是private
  Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
  return new ScheduledMethodRunnable(target, invocableMethod);
}

public class ScheduledMethodRunnable implements Runnable {

   private final Object target;

   private final Method method;

   public ScheduledMethodRunnable(Object target, Method method) {
      this.target = target;
      this.method = method;
   }

   public ScheduledMethodRunnable(Object target, String methodName) throws NoSuchMethodException {
      this.target = target;
      this.method = target.getClass().getMethod(methodName);
   }

   public Object getTarget() {
      return this.target;
   }

   /**
    * Return the target method to call.
    */
   public Method getMethod() {
      return this.method;
   }

   @Override
   public void run() {
      try {
         ReflectionUtils.makeAccessible(this.method);
         //动态代理去执行
         this.method.invoke(this.target);
      }
      catch (InvocationTargetException ex) {
         ReflectionUtils.rethrowRuntimeException(ex.getTargetException());
      }
      catch (IllegalAccessException ex) {
         throw new UndeclaredThrowableException(ex);
      }
   }

   @Override
   public String toString() {
      return this.method.getDeclaringClass().getName() + "." + this.method.getName();
   }

}

生成runable之后,就会判断三种类型@Scheduled注解的三个属性fixedRate(fixedRateString), fixedDelay(fixedDelayString), 以及 cron。我们最常用的就是cron表达式。另外两个的解释说明:

1、fixedDelay 固定延迟时间,这个周期是以上一个调用任务的完成时间为基准,在上一个任务完成之后,5s后再次执行。

2、fixedRate 固定速率执行执行,是从上一次方法执行开始的时间算起,如果上一次方法阻塞住了,下一次也是不会执行,但是在阻塞这段时间内累计应该执行的次数,当不再阻塞时,一下子把这些全部执行掉,而后再按照固定速率继续执行。这个周期是以上一个任务开始时间为基准,从上一任务开始执行后5s再次调用。

 

我们就拿cron的来说,其他两个类型的调用方法差不多,只是类型不同。

tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));

 

CronTrigger.class

//Cron表达式来生成调度计划,Spring对cron表达式的支持,是由CronSequenceGenerator实现的
public class CronTrigger implements Trigger {

  private final CronSequenceGenerator sequenceGenerator;

  public CronTrigger(String expression) {
    this.sequenceGenerator = new CronSequenceGenerator(expression);
  }

  public CronTrigger(String expression, TimeZone timeZone) {
    this.sequenceGenerator = new CronSequenceGenerator(expression, timeZone);
  }

  public String getExpression() {
    return this.sequenceGenerator.getExpression();
  }
  //获得下一次任务的执行时间
  @Override
  public Date nextExecutionTime(TriggerContext triggerContext) {
    Date date = triggerContext.lastCompletionTime();
    if (date != null) {
      Date scheduled = triggerContext.lastScheduledExecutionTime();
      if (scheduled != null && date.before(scheduled)) {
        // Previous task apparently executed too early...
        // Let's simply use the last calculated execution time then,
        // in order to prevent accidental re-fires in the same second.
        date = scheduled;
      }
    }
    else {
      date = new Date();
    }
    return this.sequenceGenerator.next(date);
  }


  @Override
  public boolean equals(@Nullable Object other) {
    return (this == other || (other instanceof CronTrigger &&
        this.sequenceGenerator.equals(((CronTrigger) other).sequenceGenerator)));
  }

  @Override
  public int hashCode() {
    return this.sequenceGenerator.hashCode();
  }

  @Override
  public String toString() {
    return this.sequenceGenerator.toString();
  }

}

 

ScheduledTaskRegistrar.class(部分关键代码)
public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {

   public static final String CRON_DISABLED = "-";

   //负责执行任务
   //Spring任务调度器的核心接口,定义了执行定时任务的主要方法,主要根据任务的不同触发方式调用不同的执行逻辑,其实现类都是对JDK原生的定时器或线程池组件进行包装,并扩展额外的功能。
   @Nullable
   private TaskScheduler taskScheduler;  
   //没有设置taskScheduler就会使用Executors.newSingleThreadScheduledExecutor()
   @Nullable
   private ScheduledExecutorService localExecutor;
   //CronTask类型的任务集合
   @Nullable
   private List<TriggerTask> triggerTasks;
   //FixedRateTask类型的任务集合
   @Nullable
   private List<CronTask> cronTasks;
   //FixedRateTask类型的任务集合
   @Nullable
   private List<IntervalTask> fixedRateTasks;
   //FixedDelayTask类型的任务集合
   @Nullable
   private List<IntervalTask> fixedDelayTasks;
   //还未真正执行的任务
   private final Map<Task, ScheduledTask> unresolvedTasks = new HashMap<>(16);
   //已经启动执行的任务
   private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet<>(16);


   public void addCronTask(Runnable task, String expression) {
      if (!CRON_DISABLED.equals(expression)) {
         addCronTask(new CronTask(task, expression));
      }
   }

   public void addCronTask(CronTask task) {
      if (this.cronTasks == null) {
         this.cronTasks = new ArrayList<>();
      }
      this.cronTasks.add(task);
   }


   @Override
   public void afterPropertiesSet() {
      scheduleTasks();
   }
   //spring容器初始化完成最终触发所有定时任务的方法
   @SuppressWarnings("deprecation")
   protected void scheduleTasks() {
      //这时候如果taskScheduler在之前没有初始化赋值成功就会在这里再次进行赋值,保证定时任务能够正常执行
      if (this.taskScheduler == null) {
         this.localExecutor = Executors.newSingleThreadScheduledExecutor();
         this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
      }
      if (this.triggerTasks != null) {
         for (TriggerTask task : this.triggerTasks) {
            addScheduledTask(scheduleTriggerTask(task));
         }
      }
      if (this.cronTasks != null) {
         for (CronTask task : this.cronTasks) {
            addScheduledTask(scheduleCronTask(task));
         }
      }
      if (this.fixedRateTasks != null) {
         for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
         }
      }
      if (this.fixedDelayTasks != null) {
         for (IntervalTask task : this.fixedDelayTasks) {
            addScheduledTask(scheduleFixedDelayTask(task));
         }
      }
   }

   private void addScheduledTask(@Nullable ScheduledTask task) {
      if (task != null) {
         this.scheduledTasks.add(task);
      }
   }

   @Nullable
   public ScheduledTask scheduleCronTask(CronTask task) {
      ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
      boolean newTask = false;
      if (scheduledTask == null) {
         scheduledTask = new ScheduledTask(task);
         newTask = true;
      }
      if (this.taskScheduler != null) {
        //真正触发执行了定时任务
         scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
      }
      else {
         addCronTask(task);
         this.unresolvedTasks.put(task, scheduledTask);
      }
      return (newTask ? scheduledTask : null);
   }

   @Override
   public Set<ScheduledTask> getScheduledTasks() {
      return Collections.unmodifiableSet(this.scheduledTasks);
   }

   @Override
   public void destroy() {
      for (ScheduledTask task : this.scheduledTasks) {
         task.cancel();
      }
      if (this.localExecutor != null) {
         this.localExecutor.shutdownNow();
      }
   }

}

scheduleCronTask方法就是要执行的方法。其中又把task封装为了ScheduledTask。其实ScheduledTask这个类里面就是把task(任务信息)、ScheduledFuture(进行任务得一些操作)放了进去,ScheduledFuture操作:

  • V get(): 获取结果,若无结果会阻塞至异步计算完成
  • V get(long timeOut, TimeUnit unit):获取结果,超时返回null
  • boolean isDone():执行结束(完成/取消/异常)返回true
  • boolean isCancelled():任务完成前被取消返回true
  • boolean cancel(boolean mayInterruptRunning):取消任务,未开始或已完成返回false,参数表示是否中断执行中的线程

当创建了Future实例,任务可能有以下三种状态:

  • 等待状态。此时调用cancel()方法不管传入true还是false都会标记为取消,任务依然保存在任务队列中,但当轮到此任务运行时会直接跳过。
  • 完成状态。此时cancel()不会起任何作用,因为任务已经完成了。
  • 运行中。此时传入true会中断正在执行的任务,传入false则不会中断。

我们在源码中可以看到在真正执行定时任务的时候这里面会判this.taskScheduler 是否为空,如果为空的话就不去执行定时任务。那此时的这个this.taskScheduler其实是空的,在这之前根据走过来的源码还没有给这个ScheduledTaskRegistrar类中的taskScheduler赋值的,所以就只是把这个创建好的定时任务先放进了unresolvedTasks(还未真正执行的任务)集合中,并没有真正的去触发。那到底什么时候才会真正的去触发了呢,我们接着一步步走看一下。接着就再返回到了原来的ScheduledAnnotationBeanPostProcessor类的processScheduled方法。

 

//储存任务
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

    ......省略
    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));

    ......省略
    synchronized (this.scheduledTasks) {
      Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
      regTasks.addAll(tasks);
    }
    ......省略
}

可以看到把返回的ScheduledTask方进了Set集合中。然后最终就是把任务放进了scheduledTasks这个map中记录用于其他地方使用。在创建bean的最后一步就会执行bean的afterSingletonsInstantiated方法,所以定时任务在前面都没有真正的去触发,在这最后一步的操作中,才是真正去触发执行了定时任务。

//创建bean的最后一步就会执行bean的afterSingletonsInstantiated方法
@Override
public void afterSingletonsInstantiated() {
  this.nonAnnotatedClasses.clear();
  if (this.applicationContext == null) {
    finishRegistration();
  }
}

//Spring容器初始化完成后执行
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
   if (event.getApplicationContext() == this.applicationContext) {
      finishRegistration();
   }
}

private void finishRegistration() {
   if (this.scheduler != null) {
      this.registrar.setScheduler(this.scheduler);
   }

   if (this.beanFactory instanceof ListableBeanFactory) {
      Map<String, SchedulingConfigurer> beans =
            ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
      List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
      AnnotationAwareOrderComparator.sort(configurers);
      for (SchedulingConfigurer configurer : configurers) {
         configurer.configureTasks(this.registrar);
      }
   }

   if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
      Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
      try {
         //给ScheduledTaskRegistrar中的TaskScheduler赋值,只有给TaskScheduler赋值待会真正的触发任务
         this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
      }
      catch (NoUniqueBeanDefinitionException ex) {
         logger.trace("Could not find unique TaskScheduler bean", ex);
         try {
            this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
         }
         catch (NoSuchBeanDefinitionException ex2) {
            if (logger.isInfoEnabled()) {
               logger.info("More than one TaskScheduler bean exists within the context, and " +
                     "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                     "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                     "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                     ex.getBeanNamesFound());
            }
         }
      }
      catch (NoSuchBeanDefinitionException ex) {
         logger.trace("Could not find default TaskScheduler bean", ex);
         // Search for ScheduledExecutorService bean next...
         try {
            this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
         }
         catch (NoUniqueBeanDefinitionException ex2) {
            logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
            try {
               this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
            }
            catch (NoSuchBeanDefinitionException ex3) {
               if (logger.isInfoEnabled()) {
                  logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                        "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                        "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                        "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                        ex2.getBeanNamesFound());
               }
            }
         }
         catch (NoSuchBeanDefinitionException ex2) {
            logger.trace("Could not find default ScheduledExecutorService bean", ex2);
            // Giving up -> falling back to default scheduler within the registrar...
            logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
         }
      }
   }
   //真正去触发执行定时任务,最终还是又调用了ScheduledTaskRegistrar中的scheduleCronTask方法
   this.registrar.afterPropertiesSet();
}

我们可以看到不管是afterSingletonsInstantiated还是onApplicationEvent最终都调用了finishRegistration方法,而在这个方法中才去把ScheduledTaskRegistrar类中的taskScheduler赋值。

赋值之后才去真正去触发执行定时任务,并且最终还是又调用了ScheduledTaskRegistrar中的scheduleCronTask方法。因为这次调用的时候taskScheduler已经被赋过值了,就会真正执行触发了定时任务(TaskScheduler.schedule)。

ScheduledTaskRegistrar.java

public ScheduledTask scheduleCronTask(CronTask task) { 

    *******省略*******

    if (this.taskScheduler != null) {
       scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    }

    *******省略*******
}

 

接下来就是要深入到TaskScheduler这个定时任务实际执行的线程中里面的源码底层是怎么进行执行这个定时任务的。

TaskScheduler
public interface TaskScheduler {

  //调度执行具体任务,定时任务触发时调用它
  ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

  default ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
    return schedule(task, Date.from(startTime));
  }

  ScheduledFuture<?> schedule(Runnable task, Date startTime);

  default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
    return scheduleAtFixedRate(task, Date.from(startTime), period.toMillis());
  }

  ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);

  default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {
    return scheduleAtFixedRate(task, period.toMillis());
  }

  ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);

  default ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {
    return scheduleWithFixedDelay(task, Date.from(startTime), delay.toMillis());
  }

  ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);

  default ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {
    return scheduleWithFixedDelay(task, delay.toMillis());
  }

  ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);

}

 

public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
    implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {

  private volatile int poolSize = 1;

  private volatile boolean removeOnCancelPolicy = false;

  @Nullable
  private volatile ErrorHandler errorHandler;
        

  //延迟和定期执行任务的线程池。
  @Nullable
  private ScheduledExecutorService scheduledExecutor;

  // Underlying ScheduledFutureTask to user-level ListenableFuture handle, if any
  private final Map<Object, ListenableFuture<?>> listenableFutureMap =
      new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
        
        ****省略****
          
        //初始化线程池
        protected ExecutorService initializeExecutor(
      ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);

    if (this.removeOnCancelPolicy) {
      if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
        ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
      }
      else {
        logger.debug("Could not apply remove-on-cancel policy - not a ScheduledThreadPoolExecutor");
      }
    }

    return this.scheduledExecutor;
  }

        //创建一个线程池
  protected ScheduledExecutorService createExecutor(
      int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
  }

        //返回线程池对象
        public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() throws IllegalStateException {
    Assert.state(this.scheduledExecutor instanceof ScheduledThreadPoolExecutor,
        "No ScheduledThreadPoolExecutor available");
    return (ScheduledThreadPoolExecutor) this.scheduledExecutor;
  }

  public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
    ScheduledExecutorService executor = getScheduledExecutor();
    try {
      ErrorHandler errorHandler = this.errorHandler;
      if (errorHandler == null) {
        errorHandler = TaskUtils.getDefaultErrorHandler(true);
      }
      //调用ReschedulingRunnable实际执行定时任务
      return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
    }
    catch (RejectedExecutionException ex) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
  }

        ****省略****
}

 

可以看到在ThreadPoolTaskScheduler 里面主要就是创建了ScheduledExecutorService对象并把它传递给下一步操作,而这个就是要执行定时任务的线程池,ScheduledExecutorService在ExecutorService提供的功能之上再增加了延迟和定期执行任务的功能。通过查看源码最终创建的这个线程池关键属性为:

corePoolSize:1
maximumPoolSize:Integer.MAX_VALUE
queueSize:16(初始值)

这个任务队列queue使用的为专门封装的DelayedWorkQueue:

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

        private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;
        
        ****省略****
        /**
         * 动态扩容队列(会在添加元素的时候offer方法中调用这个方法)
         */
        private void grow() {
            int oldCapacity = queue.length;
            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
            if (newCapacity < 0) // overflow
                newCapacity = Integer.MAX_VALUE;
            queue = Arrays.copyOf(queue, newCapacity);
        }
        ****省略****
        
}

 

ThreadPoolTaskScheduler 接口中的方法和上面TaskScheduler 的接口方法一样,不过ThreadPoolTaskScheduler 接口是JDK中具体实现的,是在java.util.concurrent包中,而且java没有提供触发器可重复触发的任务,只是提供了可以按照延迟时间执行一次任务。所以spring去封装了ReschedulingRunnable来实际的去不断调用java.util.concurrent包中ThreadPoolTaskScheduler 接口去触发定时任务。

 

下面我们来看一下spring封装的ReschedulingRunnable类

//spring定时任务封装执行类
class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {

  private final Trigger trigger;
  
  /*简单定时任务数据保存接口(存有定时任务的三个时间变量)
   * lastScheduledExecutionTime 上一次任务的执行时间(调度时间)
   * lastActualExecutionTime 上一次任务的实际执行开始时间
   * lastCompletionTime 上一次任务的实际执行完成时间
   */
  private final SimpleTriggerContext triggerContext = new SimpleTriggerContext();
  
  //实际执行任务的线程池     
  private final ScheduledExecutorService executor;

  @Nullable
  private ScheduledFuture<?> currentFuture;

  @Nullable
  private Date scheduledExecutionTime;

  private final Object triggerContextMonitor = new Object();


  public ReschedulingRunnable(
      Runnable delegate, Trigger trigger, ScheduledExecutorService executor, ErrorHandler errorHandler) {

    super(delegate, errorHandler);
    this.trigger = trigger;
    this.executor = executor;
  }


  @Nullable
  public ScheduledFuture<?> schedule() {
    /* 这里加锁的原因就是虽然默认的线程池实际上poolSize永远为1,而且延迟队列在上面已经说过是无限扩容的
     * 所以刚开始给我的感觉就是:既然这样的话其实在这里加锁完全没有必要的,
     * 但是,其实我们在使用定时器可以自定义线程池去执行,所以这时候poolSize就可能不为1了,
     * 所以这时候必须要进行加锁来防止最终计算出的initialDelay数值错误
     */
    synchronized (this.triggerContextMonitor) {
      this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
      if (this.scheduledExecutionTime == null) {
        return null;
      }
      long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
      //这里交给执行器执行的Runnable对象是this,就是到达执行时间时就回去执行当前类中的run方法。
      this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
      return this;
    }
  }

  private ScheduledFuture<?> obtainCurrentFuture() {
    Assert.state(this.currentFuture != null, "No scheduled future");
    return this.currentFuture;
  }

  @Override
  public void run() {
    Date actualExecutionTime = new Date();
    //执行被@Scheduler注释的方法(代理对象)
    super.run();
    Date completionTime = new Date();
    synchronized (this.triggerContextMonitor) {
      Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
      this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
      if (!obtainCurrentFuture().isCancelled()) {
        //又去调用schedule()方法,这样利用递归就实现了任务的重复调用。直到任务被取消
        schedule();
      }
    }
  }


  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    synchronized (this.triggerContextMonitor) {
      return obtainCurrentFuture().cancel(mayInterruptIfRunning);
    }
  }

  @Override
  public boolean isCancelled() {
    synchronized (this.triggerContextMonitor) {
      return obtainCurrentFuture().isCancelled();
    }
  }

  @Override
  public boolean isDone() {
    synchronized (this.triggerContextMonitor) {
      return obtainCurrentFuture().isDone();
    }
  }

  @Override
  public Object get() throws InterruptedException, ExecutionException {
    ScheduledFuture<?> curr;
    synchronized (this.triggerContextMonitor) {
      curr = obtainCurrentFuture();
    }
    return curr.get();
  }

  @Override
  public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    ScheduledFuture<?> curr;
    synchronized (this.triggerContextMonitor) {
      curr = obtainCurrentFuture();
    }
    return curr.get(timeout, unit);
  }

  @Override
  public long getDelay(TimeUnit unit) {
    ScheduledFuture<?> curr;
    synchronized (this.triggerContextMonitor) {
      curr = obtainCurrentFuture();
    }
    return curr.getDelay(unit);
  }

  @Override
  public int compareTo(Delayed other) {
    if (this == other) {
      return 0;
    }
    long diff = getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);
    return (diff == 0 ? 0 : ((diff < 0)? -1 : 1));
  }

}

 

ReschedulingRunnable类其实就是spring定时任务的最终时间执行的封装类,利用递归对java的java.util.concurrent包中的ThreadPoolTaskScheduler的schedule方法调用,实现了定时器。去完善了java没有提供触发器可重复触发的任务,只是提供了可以按照延迟时间执行一次任务的更好实现,让我们方便使用。

 

看到这里在这里其实有一个要注意的地方,那个实际执行任务的线程池的TaskScheduler初始化的corePoolSize为1,并且queueSize为为不断扩容的无限大。所以在实际执行的时候,所有的@Scheduler注解最终都是使用的这个线程池,并且在ReschedulingRunnable 实际执行的调度方法中(schedule方法)。

@Nullable
public ScheduledFuture<?> schedule() {
  synchronized (this.triggerContextMonitor) {

    this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
    if (this.scheduledExecutionTime == null) {
       return null;
    }
    long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
    //executor就是TaskScheduler的
    this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
    ****省略****
  }
}

所以在实际的执行中,多个定时任务执行时是会出现定时任务执行时间不准确的问题的。只要有一个定时任务(就假设为A)在执行(阻塞状态),其他的定时任务必然会等到这个A任务执行完才能够执行的(而且其他任务会在阻塞的这段时间内应该会执行的任务会直接被丢弃,实际没有执行,因为阻塞完成后会计算下一次的运行时间,这时候之前的任务已经无法执行了)。

我写了一段测试代码可以证明这种情况:

@Component
@EnableScheduling
public class GeneralScheduled {

    private static final Logger logger = LoggerFactory.getLogger(GeneralScheduled.class);

    /**
     * 定时器  每1分钟执行一次
     *
     */
    @Scheduled(cron="0 0/1 * * * ? ")
    public void index1() throws Exception {
        get_Similarity();
        System.out.println("定时任务1-time:"+General.getTime());
    }
    /**
     * 定时器 每5秒执行一次
     *
     */
    @Scheduled(cron="*/5 * * * * ? ")
    public void index2() {
        System.out.println("定时任务2-time:"+General.getTime());
    }

    @Autowired
    RatingsService ratingsService;

    @Autowired
    RecommendBaseGood recommendBaseGood;

    @Autowired
    MoviesService moviesService;

    @Test
    public void get_Similarity() {
        General.stopwatchBegin();
        List<Ratings> ratingsList = ratingsService.getAll();
        General.stopwatchEnd();
        //耗费时间的操作
        List<Integer> goodsId = recommendBaseGood.Get_Similarity(ratingsService.getAll(), 1);
    }
}

我们原本希望的结果是每整点分钟执行一次index1(注意:定时器设置的cron只要是不能被60整除的,即便设置的是每50秒(或者分、小时)执行一次,也不会按照每50秒执行,一般都应设置为能被60整除的),每5秒执行一次index2.

我们写定时器是是想让控制台理应每整点一分钟输出:定时任务1-time,每间隔4秒输出:定时任务2-time。但是真实的情况不是这样的,因为上面说的原因,在任务A比较耗时的操作没有完成前,控制台也不会去输出:定时任务2-time。

控制台输出结果:

 

 

接下来就是java.util.concurrent包中的ThreadPoolTaskScheduler的schedule方法是怎么实现按照给定的延迟时间执行一次任务的。

 

未完待续。。。。。。

 

 

 

2+

发表评论

邮箱地址不会被公开。

一条评论 “spring定时任务Scheduled注解源码分析”