延迟任务系统设计

4.1.延迟任务需求分析

延迟任务的概念我们在2.2节中已经阐述,在充吧项目中延迟任务的业务场景有:

场景一: 订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消

场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止 总结下来:简单的理解延迟任务可以认为是在某个设定的时间之后做某一件事情。

4.2.延迟任务单机版实现方案

4.2.1.Timer定时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class TimerTaskTest {

public static void main(String[] args) {
//创建timer
Timer timer = new Timer();

/*timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println(System.currentTimeMillis()/1000+"执行了任务");
}
},1000L);
System.out.println(System.currentTimeMillis()/1000);*/
// 如果任务的执行时间<=当前时间,任务会立刻执行
/* timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println(System.currentTimeMillis()/1000+"执行了任务");
}
},new Date(System.currentTimeMillis()-1000L));
System.out.println(System.currentTimeMillis()/1000);*/

/*timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println(System.currentTimeMillis()/1000+"执行了任务");
}
},1000L,2000L);
System.out.println(System.currentTimeMillis()/1000);*/

/*timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println(System.currentTimeMillis()/1000+"执行了任务");
}
},new Date(System.currentTimeMillis()-2000L),2000L);
System.out.println(System.currentTimeMillis()/1000);*/

timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
System.out.println(System.currentTimeMillis()/1000+"执行了任务");
}
},new Date(System.currentTimeMillis() - 3000L),1000L);
System.out.println(System.currentTimeMillis()/1000);
// 2 3 4 5
}

}

1. schedule() ,2个参数方法: 在执行任务时,如果指定的计划执行时间scheduledExecutionTime <=
systemCurrentTime,则task会被立即执行。 2. schedule() ,3个参数方法: 在执行任务时,如果指定的计划执
行时间scheduledExecutionTime <= systemCurrentTime,则task会被立即执行,之后按period参数固定重复执
行。 3. scheduleAtFixedRate() ,3个参数方法: 在执行任务时,如果指定的计划执行时间
scheduledExecutionTime<= systemCurrentTime,则task会首先按执行一次;然后按照执行时间、系统当前时间
和period参数计算出过期该执行的次数,计算按照: (systemCurrentTime-scheduledExecutionTime)/period,
再次执行计算出的次数;最后按period参数固定重复执行。 4. schedule() 和scheduleAtFixedRate() schedule()
方法更注重保持间隔时间的稳定。 scheduleAtFixedRate()方法更注重保持执行频率的稳定。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class TimerTaskTest2 {

public static void main(String[] args) {
Timer timer = new Timer();

/*for(int i=0;i<100;i++){
int finalI = i;
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println(finalI);
if( finalI == 20){
throw new RuntimeException();
}
}
},1000L);
} */
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);

for(int i=0;i<100;i++){
int finalI = i;
executorService.schedule(new TimerTask() {
@Override
public void run() {
System.out.println(finalI);
if( finalI == 20){
throw new RuntimeException();
}
}
},1, TimeUnit.SECONDS);
}
}
}

另外对于ScheduledExecutorService还有其他的AP

scheduledExecutorService.scheduleAtFixedRate() 按照固定频率 scheduledExecutorService.scheduleWithFixedDelay() 如果任务时间超过固定频率,按照任务实际时间延后

4.2.2.DelayQueue

DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素 必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队 列中提取元素。

image

DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现 compareTo和getDelay方法

getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。 compareTo方法:用于排序,确定元素出队列的顺序。

实现:

1:在测试包jdk下创建延迟任务元素对象DelayedTask,实现compareTo和getDelay方法,

2:在main方法中创建DelayQueue并向延迟队列中添加三个延迟任务,

3:循环的从延迟队列中拉取任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class DelayedTask implements Delayed{

private int executeTime;

public DelayedTask(int delay){
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND,delay);
this.executeTime = (int)(calendar.getTimeInMillis()/1000);
}


/**
* 获得任务对象在队列中的剩余时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
int delay = executeTime - (int)(System.currentTimeMillis()/1000);
return delay;
}

/**
* 用于不同的任务对象之间的排序比较
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
long l = this.getDelay(TimeUnit.SECONDS) - o.getDelay(TimeUnit.SECONDS);
return l==0? 0 :( l < 0 ? -1:1);
}

public static void main(String[] args) {
DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
//向队列中添加任务
queue.add(new DelayedTask(5));
queue.add(new DelayedTask(10));
queue.add(new DelayedTask(15));
//消费任务
System.out.println(System.currentTimeMillis()/1000+"开始消费任务");

while (queue.size() !=0){
DelayedTask task = queue.poll();
if(task !=null){
System.out.println(System.currentTimeMillis()/1000+"消费了任务");
}
//每隔1秒钟消费一次
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}





}
}

DelayQueue实现完成之后思考一个问题:

使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,如何保 证数据不丢失,需要持久化(磁盘)

4.4.数据库方案介绍

image

首先要将延迟任务信息写入数据库,然后用一个线程去扫描任务库,然后根据执行情况更改数据库

从数据库查询任务的条件为:select * from taskinfo where execute_time< now();

该方案的优势:建立独立任务表可以系统解耦,简化系统开发

该方案的不足:小型系统如果只有几万任务,采用上述方案即可,如果稍大规模系统,任务量过大,对数据库 造成的压力过大

1
2
3
4
5
知识小贴士:如果数据库的访问压力过大该如何优化? 

1:数据库自身优化策略:建立索引,索引的思想是用空间换时间, 缺点如果数据量太大,过多索引占用大量磁盘空 间,所以一般只对关键字段,经常需要检索的字段设置索引。

2:如果任务量过大,需要考虑分库分表,如果一天100万延迟任务,一月就3000万,一年就4亿2千万数据,对于 mysql,单表超过2万千,就算做了索引查询也非常慢,所以后续需要规划分库分表。

image

延迟任务系统数据库方案实现

MyBatis-Plus介绍

MyBatis-Plus](简称 MP)是一个MyBatis的增强工具,在 MyBatis 的基础上只做增强不做改变,为简化开 发、提高效率而生。

GitHub项目地址:https://github.com/baomidou/mybatis-plus

官方文档:https://mybatis.plus/

无侵入:只做增强不做改变,引入它不会对现有工程产生影响,如丝般顺滑

损耗小:启动即会自动注入基本 CURD,性能基本无损耗,直接面向对象操作 强大的 CRUD 操作:内置通用 Mapper、通用 Service,仅仅通过少量配置即可实现单表大部分 CRUD 操作,更有强大的条件构造器,满足各类使用需求

支持 Lambda 形式调用:通过 Lambda 表达式,方便的编写各类查询条件,无需再担心字段写错 支持主键自动生成:

支持多达 4 种主键策略(内含分布式唯一 ID 生成器 - Sequence),可自由配置, 完美解决主键问题 支持 ActiveRecord 模式:

支持 ActiveRecord 形式调用,实体类只需继承 Model 类即可进行强大的 CRUD 操作 支持自定义全局通用操作:支持全局通用方法注入( Write once, use anywhere )

内置代码生成器:采用代码或者 Maven 插件可快速生成 Mapper 、 Model 、 Service 、 Controller 层 代码,支持模板引擎,更有超多自定义配置等您来使用

内置分页插件:基于 MyBatis 物理分页,开发者无需关心具体操作,配置好插件之后,写分页等同于普 通 List 查询

分页插件支持多种数据库:支持 MySQL、MariaDB、Oracle、DB2、H2、HSQL、SQLite、Postgre、 SQLServer2005、SQLServer 等多种数据库

内置性能分析插件:可输出 Sql 语句以及其执行时间,建议开发测试时启用该功能,能快速揪出慢查询

内置全局拦截插件:提供全表 delete 、 update 操作智能分析阻断,也可自定义拦截规则,预防误操作

接入企业:https://mp.baomidou.com/guide/#优秀案例

任务信息表集成MP

1
2
3
4
5
6
7
8
9
10
11
12
<!-- mybatis-plus支持 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.1.2</version>
</dependency>
<!-- mysql 驱动包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>

</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
application:
name: schedule-service #服务名
datasource:
url: jdbc:mysql://192.168.200.129:3306/chongba_schedule?serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456

redis:
host: 192.168.200.129
password: chongba
port: 6379
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

实体类TaskInfoEntity
@Data //
@ToString
@NoArgsConstructor

@TableName("taskinfo")
public class TaskInfoEntity implements Serializable{

private static final long serialVersionUID = -3218564191925663119L;

@TableId(type = IdType.ID_WORKER)
private Long taskId;

@TableField
private Date executeTime;

@TableField
private Integer priority;

@TableField
private Integer taskType;

@TableField
private byte[] parameters;
}

mybatis-plus 主键生成策略 默认使用全局唯一的数字类型

image-20230206220630716

这几种生成策略分别是:

AUTO 数据库ID自增

INPUT 用户输入ID

ID_WORKER 全局唯一ID,Long类型的主键

ID_WORKER_STR 字符串全局唯一ID

UUID 全局唯一ID,UUID类型的主键

NONE 该类型为未设置主键类型

什么情况下需要序列化?

1:当你想把的内存中的对象状态保存到一个文件中或者数据库中时候;

2:当你想用套接字在网络上传送对象的时候;

3:当你想通过RMI传输对象的时候;

​ Java 提供了一种对象序列化的机制,该机制中,一个对象可以被表示为一个字节序列,该字节序列包括该对象的 数据、有关对象的类型的信息和存储在对象中数据的类型。将序列化对象写入文件之后,可以从文件中读取出来,并 且对它进行反序列化

image

配置好之后在类名处:ALT+ENTER

image

4:创建包com.chongba.schedule.mapper并在该包下创建Mapper接口TaskInfoMapper,只需要继承下 mybatis-plus 提供的 BaseMapper<>在泛型中指定操作的实体,数据库基本增删改差基本不需要写了

1
public interface TaskInfoMapper extends BaseMapper{ }

5:在springboot启动类中,扫描mapper接口所在的包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootApplication
@ComponentScan({"com.chongba.schedule","com.chongba.cache"})
@MapperScan("com.chongba.schedule.mapper")
@EnableScheduling
public class ScheduleApplication {

public static void main(String[] args) {
SpringApplication.run(ScheduleApplication.class,args);
}

@Bean
public OptimisticLockerInterceptor optimisticLockerInterceptor(){
return new OptimisticLockerInterceptor();
}
}

mybatis-plus集成完毕 对mapper做相关单元测试

6:在测试包com.chongba.schedule下创建测试类TaskInfoMapperTest,测试基本的增删改查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ScheduleApplication.class)
public class TaskInfoMapperTest {

@Autowired
private TaskInfoMapper taskInfoMapper;

@Test
public void test1(){
//保存
TaskInfoEntity taskInfoEntity = new TaskInfoEntity();
taskInfoEntity.setExecuteTime(new Date());
taskInfoEntity.setPriority(1);
taskInfoEntity.setTaskType(1001);
taskInfoEntity.setParameters("test".getBytes());
taskInfoMapper.insert(taskInfoEntity);
System.out.println("保存完成后返回主键值:"+taskInfoEntity.getTaskId());


TaskInfoEntity infoEntity = taskInfoMapper.selectById(taskInfoEntity.getTaskId());
System.out.println("根据主键查询得到的数据:"+infoEntity);


infoEntity.setTaskType(1002);
infoEntity.setPriority(10);
taskInfoMapper.updateById(infoEntity);
}

@Test
public void test2(){
taskInfoMapper.deleteById(1181473027074838529L);
}

@Test
public void test3(){
List<TaskInfoEntity> entities = taskInfoMapper.selectList(null);
for (TaskInfoEntity entity : entities) {
System.out.println(entity);
}
}

idea 接口飘红问题,这不是错误,因为Mapper接口我们并没有提供实现类在容器中,而是在程序运行的过程 中动态生成的代理,idea的检测机制并不知道这一原理,如果看的难受可修改idea配置

image

1
2
3
4
5
6
7
public interface TaskInfoMapper extends BaseMapper{ 

@Select("select * from taskinfo")

public List selectAll();

}

在TaskInfoMapperTest测试类中添加测试方法进行测试

1
2
3
4
5
6
7
8
9
10
11
@Test 
public void test3(){

List selectAll = taskInfoMapper.selectAll();

for (TaskInfoEntity taskInfoEntity : selectAll) {

System.out.println(taskInfoEntity);

}
}

6.3.任务信息历史表集成MP

1:在chongba_schedule_service 工程中 编写与taskinfo_logs表对应的实体TaskInfoLogsEntity, 可以直接 继承 TaskInfoEntity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Data
@ToString
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false) //不继承equals和hashcode方法

@TableName("taskinfo_logs")
public class TaskInfoLogsEntity extends TaskInfoEntity{

/**
* `version` INT(11) NOT NULL COMMENT '版本号,用乐观锁',
`status` INT(11) DEFAULT '0' COMMENT '状态 0=初始化状态 1=EXECUTED 2=CANCELLED',
*/
@Version
private Integer version;

@TableField
private Integer status;
}

​ 数据库自身解决并发两种策略

​ 悲观锁(Pessimistic Lock)

​ 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别 人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁 等,读锁,写锁等,都是在做操作之前先上锁。

​ 乐观锁(Optimistic Lock)

​ 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断 一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高 吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。

​ 2:mybatis-plus对乐观锁的支持,在启动类中向容器中放入乐观锁的拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SpringBootApplication
@ComponentScan({"com.chongba.schedule","com.chongba.cache"})
@MapperScan("com.chongba.schedule.mapper")
@EnableScheduling
public class ScheduleApplication {

public static void main(String[] args) {
SpringApplication.run(ScheduleApplication.class,args);
}
/**
* mybatis-plus乐观锁支持
* @return
*/
@Bean
public OptimisticLockerInterceptor optimisticLockerInterceptor(){
return new OptimisticLockerInterceptor();
}
}

​ 3:在com.chongba.schedule.mapper下创建接口TaskInfoLogsMapper

1
2
public interface TaskInfoLogsMapper extends BaseMapper<TaskInfoLogsEntity> {
}

4:在测试包com.chongba.schedule下创建测试类TaskInfoLogsMapperTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void test2(){
TaskInfoLogsEntity taskInfoLogsEntity = new TaskInfoLogsEntity();
taskInfoLogsEntity.setTaskType(1003);
taskInfoLogsEntity.setPriority(3);
taskInfoLogsEntity.setParameters("log".getBytes());
taskInfoLogsEntity.setExecuteTime(new Date());
taskInfoLogsEntity.setVersion(1);
taskInfoLogsEntity.setStatus(0);
logsMapper.insert(taskInfoLogsEntity);

taskInfoLogsEntity = logsMapper.selectById(taskInfoLogsEntity.getTaskId());
System.out.println("数据插入:"+taskInfoLogsEntity);
taskInfoLogsEntity.setStatus(1);
logsMapper.updateById(taskInfoLogsEntity);
taskInfoLogsEntity = logsMapper.selectById(taskInfoLogsEntity.getTaskId());
System.out.println("第一次更新后查询:"+taskInfoLogsEntity);
taskInfoLogsEntity.setPriority(5);
logsMapper.updateById(taskInfoLogsEntity);
taskInfoLogsEntity = logsMapper.selectById(taskInfoLogsEntity.getTaskId());
System.out.println("第二次更新后查询:"+taskInfoLogsEntity);
}

测试结果:主要看版本号

​ 数据插入:TaskInfoLogsEntity(version=1, status=0)
​ 第一次更新后查询:TaskInfoLogsEntity(version=2, status=1)
​ 第二次更新后查询:TaskInfoLogsEntity(version=3, status=1)


https://1902756969.github.io/Hexo/2023/02/11/电商方案/延时任务系统数据库方案实现/
作者
发布于
2023年2月11日
许可协议