0%

同一任务并发执行与数据共享

#同一任务并发执行与数据共享
@(springboot)[quartz]

[TOC]

1. 禁止同一个 JobDetail 中的多个实例并发执行

Quartz 定时任务默认都是并发执行的,不会等待上一次任务执行完毕,只要间隔时间到就会执行。

同一任务的含义指同一个JobKey

1.1 方法一、使用 xml

Spring 配置文件中加入:

1
2
3
4
5
6
7
8
<bean id="resultJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<!-- 调用的类 -->
<property name="targetObject" ref="resultJob"></property>
<!-- 调用类中的方法 -->
<property name="targetMethod" value="createDemEmps"></property>
<!-- false 表示等上一个任务执行完后再开启新的任务 -->
<property name="concurrent" value="false" />
</bean>

1.2 方法二、使用注解

当不使用 Spring 的时候,只需要在 Job 的实现类上加 @DisallowConcurrentExecution 的注解。

1
2
3
4
5
6
7
8
9
@DisallowConcurrentExecution
public class DaemonExecutionSchedule implements Job{
private Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
...
}
}

2. 同一个 JobDetail 中多个实例的数据共享

@PersistJobDataAfterExecution 是用在 Job 实现类上,表示一个有状态的任务,意思是当正常执行完 Job 后,JobDataMap 中的数据应该被改动,以被下一次调用时用。

注意:当使用 @PersistJobDataAfterExecution 注解时,为了避免并发时,存储数据造成混乱,强烈建议把 @DisallowConcurrentExecution 注解也加上。

3. 示例

假设定时任务的时间间隔为 3 秒,但 job 执行时间是 10 秒。当设置 @DisallowConcurrentExecution 以后程序会等任务执行完毕后再去执行。

当设置 @PersistJobDataAfterExecution 时,在执行完 Job 的 execution 方法后保存 JobDataMap 当中固定数据,以便任务在重复执行的时候具有相同的 JobDataMap;在默认情况下也就是没有设置 @PersistJobDataAfterExecution 的时候每个 job 都拥有独立 JobDataMap。

任务类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package org.quartz.examples;

import org.quartz.*;
import java.util.Date;

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class TaskJob implements Job {

public static final String NUM_EXECUTIONS = "NumExecutions";
public static final String EXECUTION_DELAY = "ExecutionDelay";

/**
* 静态变量可以保持工作状态,但无法达到预期效果
*/
private static int _staticCounter = 0;

/**
* Quartz 每次执行作业时都会重新实例化,非静态变量无法保持工作状态
*/
private int _counter = 0;

/**
* 需要一个公共的空构造方法,以便 scheduler 随时实例化 job
*/
public TaskJob() {
}

/**
* 该方法实现需要执行的任务
*/
public void execute(JobExecutionContext context) throws JobExecutionException {
System.err.println("---> " + context.getJobDetail().getKey() + " 运行中[" + new Date() + "]");

JobDataMap map = context.getJobDetail().getJobDataMap();

int executeCount = 0;
if (map.containsKey(NUM_EXECUTIONS)) {
executeCount = map.getInt(NUM_EXECUTIONS);
}

// 增量计数并将其存储回 JobDataMap,这样可以适当保持工作状态
executeCount++;
map.put(NUM_EXECUTIONS, executeCount);

// 只要有任务执行都会递增,无法达到预期效果
_staticCounter++;

// 本地变量递增加,但实际上无法保持工作状态
_counter++;

long delay = 5000L;
if (map.containsKey(EXECUTION_DELAY)) {
delay = map.getLong(EXECUTION_DELAY);
}

try {
// 模拟一个耗时的 job
Thread.sleep(delay);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.err.println(context.getJobDetail().getKey() + " 的静态变量 _staticCounter 为:" + _staticCounter + ",非静态变量 scheduler 为:" + _counter);
System.err.println(context.getJobDetail().getKey() + " 完成了(" + executeCount + ")次 <---");
}
}

任务调度类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package org.quartz.examples;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

import java.util.Date;

public class Executer {

public void run() throws Exception {
// 通过 schedulerFactory 获取一个调度器
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();

// 创建 jobDetail 实例,绑定 Job 实现类
// 指明 job 的名称,所在组的名称,以及绑定 job 类
JobDetail job1 = JobBuilder.newJob(TaskJob.class)
.withIdentity("statefulJob1", "group1")
// 给定的键-值对添加到 JobDetail 的 JobDataMap 中
.usingJobData(TaskJob.EXECUTION_DELAY, 10000L).build();

// 定义调度触发规则,先立即执行一次,然后每隔 3 秒执行一次
SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1")
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(3)
.repeatForever())
.build();

// 把作业和触发器注册到任务调度中
Date firstRunTime = sched.scheduleJob(job1, trigger);
System.out.println(job1.getKey() + " 开始运行于:" + firstRunTime + ",重复:" + trigger.getRepeatCount() + " 次,每次间隔 "
+ trigger.getRepeatInterval() / 1000 + " 秒");

// 任务 job1 方法中拿到的 JobDataMap 的数据是共享的
// 这里要注意一个情况: 就是 JobDataMap 的数据共享只针对一个 job1 任务
// 如果在下面在新增加一个任务 那么他们之间是不共享的,比如下面的 job2
// 创建第二个 JobDetail 实例
JobDetail job2 = JobBuilder.newJob(TaskJob.class)
.withIdentity("statefulJob2", "group1")
// 给定的键-值对添加到 JobDetail 的 JobDataMap 中
.usingJobData(TaskJob.EXECUTION_DELAY, 10000L)
.build();

// 定义调度触发规则,先立即执行一次,然后每隔 3 秒执行一次
trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger2", "group1")
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().
withIntervalInSeconds(3)
.repeatForever()
// 指定失效时的策略
.withMisfireHandlingInstructionNowWithExistingCount())
.build();

// 这个 job2 与 job1 执行的 JobDataMap 不共享
// 把作业和触发器注册到任务调度中
firstRunTime = sched.scheduleJob(job2, trigger);
System.out.println(job2.getKey() + " 开始运行于:" + firstRunTime + ",重复:" + trigger.getRepeatCount() + " 次,每次间隔 "
+ trigger.getRepeatInterval() / 1000 + " 秒");

// 启动计划程序(实际上直到调度器已经启动才会开始运行)
sched.start();

// 等待 60 秒,使我们的 job 有机会执行
Thread.sleep(60000);

// 等待作业执行完成时才关闭调度器
sched.shutdown(true);

SchedulerMetaData metaData = sched.getMetaData();
System.out.println("一共运行了:" + metaData.getNumberOfJobsExecuted() + " 个任务");
}

public static void main(String[] args) throws Exception {
Executer example = new Executer();
example.run();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
group1.statefulJob1 开始运行于:Wed Apr 19 17:04:22 CST 2017,重复:-1 次,每次间隔 3 秒
group1.statefulJob2 开始运行于:Wed Apr 19 17:04:22 CST 2017,重复:-1 次,每次间隔 3 秒

---> group1.statefulJob2 运行中[Wed Apr 19 17:04:22 CST 2017]
---> group1.statefulJob1 运行中[Wed Apr 19 17:04:22 CST 2017]
group1.statefulJob2 的静态变量 _staticCounter 为:2,非静态变量 scheduler 为:1
group1.statefulJob1 的静态变量 _staticCounter 为:2,非静态变量 scheduler 为:1
group1.statefulJob2 完成了(1)次 <---
group1.statefulJob1 完成了(1)次 <---
---> group1.statefulJob1 运行中[Wed Apr 19 17:04:32 CST 2017]
---> group1.statefulJob2 运行中[Wed Apr 19 17:04:32 CST 2017]
group1.statefulJob1 的静态变量 _staticCounter 为:4,非静态变量 scheduler 为:1
group1.statefulJob1 完成了(2)次 <---
group1.statefulJob2 的静态变量 _staticCounter 为:4,非静态变量 scheduler 为:1
group1.statefulJob2 完成了(2)次 <---
---> group1.statefulJob1 运行中[Wed Apr 19 17:04:42 CST 2017]
---> group1.statefulJob2 运行中[Wed Apr 19 17:04:42 CST 2017]
group1.statefulJob2 的静态变量 _staticCounter 为:6,非静态变量 scheduler 为:1
group1.statefulJob1 的静态变量 _staticCounter 为:6,非静态变量 scheduler 为:1
group1.statefulJob1 完成了(3)次 <---
group1.statefulJob2 完成了(3)次 <---
---> group1.statefulJob1 运行中[Wed Apr 19 17:04:52 CST 2017]
---> group1.statefulJob2 运行中[Wed Apr 19 17:04:52 CST 2017]
group1.statefulJob2 的静态变量 _staticCounter 为:8,非静态变量 scheduler 为:1
group1.statefulJob2 完成了(4)次 <---
group1.statefulJob1 的静态变量 _staticCounter 为:8,非静态变量 scheduler 为:1
group1.statefulJob1 完成了(4)次 <---
---> group1.statefulJob2 运行中[Wed Apr 19 17:05:02 CST 2017]
---> group1.statefulJob1 运行中[Wed Apr 19 17:05:02 CST 2017]
group1.statefulJob2 的静态变量 _staticCounter 为:10,非静态变量 scheduler 为:1
group1.statefulJob1 的静态变量 _staticCounter 为:10,非静态变量 scheduler 为:1
group1.statefulJob2 完成了(5)次 <---
group1.statefulJob1 完成了(5)次 <---
---> group1.statefulJob1 运行中[Wed Apr 19 17:05:12 CST 2017]
---> group1.statefulJob2 运行中[Wed Apr 19 17:05:12 CST 2017]

group1.statefulJob2 的静态变量 _staticCounter 为:12,非静态变量 scheduler 为:1
group1.statefulJob2 完成了(6)次 <---
group1.statefulJob1 的静态变量 _staticCounter 为:12,非静态变量 scheduler 为:1
group1.statefulJob1 完成了(6)次 <---

一共运行了:12 个任务