模拟任务阻塞处理策略

模拟任务阻塞处理策略

官方描述

阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;

  1. 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
  2. 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
  3. 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;

为了验证并且更加熟悉XXL-JOB这一块的功能,遂做了如下的实验:

实验

编写3个任务执行方法

  1. 模拟【阻塞处理策略】为 单机串行

    @XxlJob("serialExecutionJobHandler")
    public ReturnT<String> serialExecutionJobHandler(String param) throws Exception {
     XxlJobLogger.log("XXL-JOB, serialExecutionJobHandler.");
     // 休眠1分钟,模拟执行时间特别长
     TimeUnit.SECONDS.sleep(60);
     return ReturnT.SUCCESS;
    }
  2. 模拟【阻塞处理策略】为 丢弃后续调度

    @XxlJob("discardLaterJobHandler")
    public ReturnT<String> discardLaterJobHandler(String param) throws Exception {
     XxlJobLogger.log("XXL-JOB, discardLaterJobHandler.");
     // 休眠1分钟,模拟执行时间特别长
     TimeUnit.SECONDS.sleep(60);
     return ReturnT.SUCCESS;
    }
  3. 模拟【阻塞处理策略】为 覆盖之前调度

    @XxlJob("coverEarlyJobHandler")
    public ReturnT<String> coverEarlyJobHandler(String param) throws Exception {
     XxlJobLogger.log("XXL-JOB, coverEarlyJobHandler.");
     // 休眠1分钟,模拟执行时间特别长
     TimeUnit.SECONDS.sleep(60);
     return ReturnT.SUCCESS;
    }

在调度中心页面创建3个任务,分别对应3种阻塞处理策略

  1. 任务配置

    image.png

  2. 对每个任务,使用调度中心页面上的 执行一次 按钮 触发 (第一次触发是为了第二次任务进行时存在阻塞的任务)

  3. 观察每个任务的日志:

    • 单机串行【任务id为3】

      image.png
      image.png
      可见两次调度都是成功的,但是第二个任务一直在等待第一个任务执行完成,才会被执行。

    • 丢弃后续调度任务【任务id为4】

      image.png
      可见因为这个任务有一次调度正在被执行中,并且阻塞处理策略为【丢弃后续调度任务】,所以第二次调度结果直接为失败!

    • 覆盖之前调度任务【任务id为5】

      image.png
      可见该任务第二次被调度时,发现已经有调度正在被执行(第一次调度结果为成功),由于阻塞策略设置为【覆盖之前调度任务】,所以执行器会将第一次调度kill掉,然后开始执行第二次调度。

分析

  1. 执行阻塞策略是在执行器中判断、执行的
  2. 调度中心在调度任务时,会将任务配置的阻塞策略通过RPC传给执行器
  3. 执行器在执行时,调用的是 com.xxl.job.core.biz.impl.ExecutorBizImpl#run 方法
  4. 该方法最后一段就是在根据阻塞策略进行一些判断,如下
    // 阻塞策略判断
    if (jobThread != null) {
     // 进入这个判断,则表示当前执行任务的线程中有正在运行的
     // 所以需要根据阻塞策略决定是否执行本次调度
     ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
     if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
         // 因为上一次的调度正在线程池中跑,需要再次判断
         if (jobThread.isRunningOrHasQueue()) {
             // 丢弃后续调度
             return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
         }
     } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
         // kill running jobThread
         // 覆盖之前调度
         if (jobThread.isRunningOrHasQueue()) {
             removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
             jobThread = null;
         }
     } else {
         // just queue trigger
     }
    }
    // 其实这里才是 覆盖之前调度 正在在执行本次调度
    // replace thread (new or exists invalid)
    if (jobThread == null) {
     jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }
文章目录
  1. 1. 模拟任务阻塞处理策略
    1. 1.1. 官方描述
    2. 1.2. 实验
      1. 1.2.1. 编写3个任务执行方法
      2. 1.2.2. 在调度中心页面创建3个任务,分别对应3种阻塞处理策略
    3. 1.3. 分析
评论