模拟任务阻塞处理策略
官方描述
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
- 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
- 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
- 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
为了验证并且更加熟悉XXL-JOB这一块的功能,遂做了如下的实验:
实验
编写3个任务执行方法
模拟【阻塞处理策略】为
单机串行
@XxlJob("serialExecutionJobHandler") public ReturnT<String> serialExecutionJobHandler(String param) throws Exception { XxlJobLogger.log("XXL-JOB, serialExecutionJobHandler."); // 休眠1分钟,模拟执行时间特别长 TimeUnit.SECONDS.sleep(60); return ReturnT.SUCCESS; }
模拟【阻塞处理策略】为
丢弃后续调度
@XxlJob("discardLaterJobHandler") public ReturnT<String> discardLaterJobHandler(String param) throws Exception { XxlJobLogger.log("XXL-JOB, discardLaterJobHandler."); // 休眠1分钟,模拟执行时间特别长 TimeUnit.SECONDS.sleep(60); return ReturnT.SUCCESS; }
模拟【阻塞处理策略】为
覆盖之前调度
@XxlJob("coverEarlyJobHandler") public ReturnT<String> coverEarlyJobHandler(String param) throws Exception { XxlJobLogger.log("XXL-JOB, coverEarlyJobHandler."); // 休眠1分钟,模拟执行时间特别长 TimeUnit.SECONDS.sleep(60); return ReturnT.SUCCESS; }
在调度中心页面创建3个任务,分别对应3种阻塞处理策略
任务配置
对每个任务,使用调度中心页面上的
执行一次
按钮 触发 (第一次触发是为了第二次任务进行时存在阻塞的任务)观察每个任务的日志:
单机串行【任务id为3】
可见两次调度都是成功的,但是第二个任务一直在等待第一个任务执行完成,才会被执行。丢弃后续调度任务【任务id为4】
可见因为这个任务有一次调度正在被执行中,并且阻塞处理策略为【丢弃后续调度任务】,所以第二次调度结果直接为失败!覆盖之前调度任务【任务id为5】
可见该任务第二次被调度时,发现已经有调度正在被执行(第一次调度结果为成功),由于阻塞策略设置为【覆盖之前调度任务】,所以执行器会将第一次调度kill掉,然后开始执行第二次调度。
分析
- 执行阻塞策略是在执行器中判断、执行的
- 调度中心在调度任务时,会将任务配置的阻塞策略通过RPC传给执行器
- 执行器在执行时,调用的是
com.xxl.job.core.biz.impl.ExecutorBizImpl#run
方法 - 该方法最后一段就是在根据阻塞策略进行一些判断,如下
// 阻塞策略判断 if (jobThread != null) { // 进入这个判断,则表示当前执行任务的线程中有正在运行的 // 所以需要根据阻塞策略决定是否执行本次调度 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { // 因为上一次的调度正在线程池中跑,需要再次判断 if (jobThread.isRunningOrHasQueue()) { // 丢弃后续调度 return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // kill running jobThread // 覆盖之前调度 if (jobThread.isRunningOrHasQueue()) { removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); jobThread = null; } } else { // just queue trigger } } // 其实这里才是 覆盖之前调度 正在在执行本次调度 // replace thread (new or exists invalid) if (jobThread == null) { jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); }