业务需求_数据更新双删缓存一致

[TOC]

业务需求

1
数据(音频文件)更新操作,如何确保数据库中的数据和Redis缓存中的数据一致

分析

在确保数据库和 Redis 缓存数据一致性时,需要根据具体的业务需求和系统特点选择合适的策略。可以考虑使用消息队列和分布式事务确保强一致性,同时结合乐观锁和版本控制可以更好地处理并发问题。

前置

  1. Spring Boot:使用Spring Boot框架。
  2. Kafka:使用Kafka作为消息队列系统。
  3. Mybatis:使用Mybatis进行数据库操作。
  4. Redis:使用Redis作为缓存系统。

实现

  1. 第一次删除缓存:在更新数据库之前删除缓存。
  2. 更新数据库:执行数据库更新操作。
  3. 发送消息到消息队列:数据库更新完成后,发送一条消息到消息队列请求再次删除缓存。
  4. 从消息队列消费消息:消息队列消费者读取到消息,延迟一段时间 (延迟时间设置为,数据库主从同步的时间,再加几百毫秒) , 消息队列的消费者再次删除缓存。

具体逻辑

双删缓存一致

功能实现

具体类文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

Rhythm
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─test
│ │ │ │
│ │ │ ├─consumer
│ │ │ │ UploadAudioFileCacheConsumer.java
│ │ │ │
│ │ │ ├─controller
│ │ │ │ │ FileController.java
│ │ │ │ │
│ │ │ │ ├─music
│ │ │ │ │ MusicController.java
│ │ │ │
│ │ │ ├─mapper
│ │ │ │ MusicMapper.java
│ │ │ │
│ │ │ ├─service
│ │ │ │ │ CacheService.java
│ │ │ │ │ MusicService.java
│ │ │ │ │
│ │ │ │ └─impl
│ │ │ │ CacheServiceImpl.java
│ │ │ │ MusicServiceImpl.java

代码实现

com.test.controller.FileController

  • 文件上传
1
2
3
4
5
6
7
@PostMapping("/upload/audioFile/{music_id}")
@ResponseBody
public Object uploadAudioFileByMusicId( @RequestParam("multipartFile") MultipartFile multipartFile, @PathVariable("music_id") Integer music_id ) throws IOException, NoSuchMethodException {

return musicService.uploadAudioFileByMusicId(multipartFile,music_id);

}

com.test.consumer.UploadAudioFileCacheConsumer

  • 延迟处理消息队列消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Component
public class UploadAudioFileCacheConsumer {

@Resource
private CacheService cacheService;

/**
*
* // CPU密集型任务线程池
* ExecutorService cpuIntensivePool = Executors.newFixedThreadPool(4); // 固定线程数
* // I/O密集型任务线程池
* ExecutorService ioIntensivePool = Executors.newCachedThreadPool(); // 动态线程数
* // 轻量级短期任务线程池
* ExecutorService shortTaskPool = Executors.newSingleThreadExecutor(); // 单线程池
*
*
* 线程池的嵌套 一个线程池中创建使用另一个线程池
*/
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

@KafkaListener(topics = "uploadAudioFileCacheDelete-topic", groupId = "my-group")
public void consume(ConsumerRecord<String,String> record, Acknowledgment acknowledgment) {
// 延迟处理消息,假设数据库主从同步时间为1秒,再加500毫秒
System.out.println("==> 延迟处理消息,等待数据库主从同步..., 时间为1秒,再加500毫秒");
long delay = 15000L;

scheduler.schedule(() -> {
try {
// 再次删除缓存(保证双删)
System.out.println("==> 再次删除缓存(保证双删)");
Integer music_id = Integer.valueOf(JSON.parseObject( record.key() , String.class ));
cacheService.deleteCache(music_id);
// 确认消息
acknowledgment.acknowledge();
} catch (Exception e) {
e.printStackTrace();
}
}, delay, TimeUnit.MILLISECONDS);
}
}

com.test.service.impl.MusicServiceImpl

  • 异步双删缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
@Override
public Object uploadAudioFileByMusicId( MultipartFile multipartFile, Integer music_id) throws IOException, NoSuchMethodException {

/**
*
* **第一次删除缓存**:在更新数据库之前删除缓存。
*
*/

// 步骤一:去Redis中删除缓存数据

System.out.println("==> 第一次删除缓存");

cacheService.deleteCache(music_id);


/**
*
* **更新数据库**:上传文件,更新数据库字段。
*/

System.out.println("==> 上传文件,更新数据库字段");

System.out.println( "参数名称 = " + multipartFile.getName() );
System.out.println( "文件类型 = " + multipartFile.getContentType() );
System.out.println( "原文件名 = " + multipartFile.getOriginalFilename() );
System.out.println( "文件大小 = " + multipartFile.getSize() );

// 判断 上传的文件类型 是否是音频文件
if( ! multipartFile.getContentType().startsWith("audio/") ){
System.out.println( "==> 上传的文件不是音频文件,上传失败" );
return null;
}

// 判断 上传的文件尺寸
if( multipartFile.getSize() > 20 * 1024 * 1024 ){
System.out.println( "==> 上传的文件最多20MB,上传失败" );
return null;
}

// 给 上传文件 生成服务器端 文件名
String filename = UUID.randomUUID().toString() +
multipartFile.getOriginalFilename().substring( multipartFile.getOriginalFilename().lastIndexOf(".") );


String filePath = ResourceUtils.getURL("classpath:").getPath() +
"static/audio/" + filename;
// 绝对路径前面多了一个/ 去除
// 确保 filePath 不以斜杠开头
String fileNewPath = null;
if (filePath.startsWith("/")) {
fileNewPath = filePath.substring(1);
} else if (filePath.startsWith("file:") ) {
fileNewPath = filePath.replaceFirst("^file:", "");
} else {
fileNewPath = filePath;
}
System.out.println("fileNewPath: " + fileNewPath);

// 指定 转移目标文件
File target = new File(fileNewPath);


// 上传的文件保存在 src/main/resources/static/audio/ 目录
String currentDirectory = System.getProperty("user.dir"); // 当前工作目录是指程序启动时所在的目录
System.out.println("currentDirectory: " + currentDirectory);
String targetLocalDirectory = currentDirectory + "/src/main/resources/static/audio/" + filename;
System.out.println("targetLocalDirectory: " + targetLocalDirectory);

System.out.println("检测目标目录是否存在... ");
if (Files.notExists(Path.of(currentDirectory + "/src/main/resources/static/audio/"))) {
System.out.println("Directory does not exist. Creating now...");
Files.createDirectories(Path.of(currentDirectory + "/src/main/resources/static/audio/"));
System.out.println("Directory created.");
} else {
System.out.println("Directory already exists.");
}

multipartFile.transferTo(new File(targetLocalDirectory));

// 使用乐观锁更新数据库
// 在实际的数据库应用中,乐观锁通常会依赖一个版本号字段,该字段会随着每次更新而增加。在更新数据时,将这个版本号一同传递。程序会检查版本号是否匹配来确定数据是否被别的线程修改
System.out.println("使用乐观锁更新数据库...");
Boolean existingAudioFile = musicMapper.selectById(music_id).getMusicFile().isEmpty();
// 当Optional中有值时
Optional<Boolean> optionalTrue = Optional.of(existingAudioFile);
boolean value = optionalTrue.orElseThrow(() -> new RuntimeException("AudioFile not found"));
System.out.println("Value: " + value); // 输出: Value: false
updateMusic(music_id,filename);


/**
*
* **发送消息到消息队列**:上传文件,更新数据库字段完成,发送一条消息到消息队列请求再次删除缓存。
*
* 消息队列消费者读取到消息,延迟一段时间 (延迟时间设置为,数据库主从同步的时间,再加几百毫秒) , 消息队列的消费者再次删除缓存。
*
*/

System.out.println("==> 上传文件,更新数据库字段完成,发送一条消息到消息队列请求再次删除缓存");

kafkaTemplate.send( "uploadAudioFileCacheDelete-topic" , JSON.toJSONString( music_id ) , null );

return "File upload successful: " + fileNewPath +", " + targetLocalDirectory;
}

com.test.service.impl.CacheServiceImpl

  • 删除redis缓存方法封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Service
public class CacheServiceImpl implements CacheService {


@Resource
private RedisUtil redisUtil;


/**
* 删除缓存
*
* @param music_id
* @return
*/
@Override
public boolean deleteCache(Integer music_id) {

// Redis Key 生成规则:方法签名+实参数据

Class<MusicController> musicControllerClass = MusicController.class;

String theParameter = String.valueOf(music_id);
String[] numbers = {theParameter};

Map<String,Object> keyMap = new HashMap<>();
keyMap.put( "signature" , "String com.test.controller.music.MusicController.playAudio(String)" );
keyMap.put( "arguments" , numbers );
String key = JSON.toJSONString( keyMap );
String key_redis_lock_Mutex = "redis_lock_Mutex-" + JSON.toJSONString( keyMap );
System.out.println("待删除的redis key: " + key);
System.out.println("待删除的redis key: " + key_redis_lock_Mutex);

// delete key
redisUtil.del(key);
System.out.println("Cache deleted Redis keys: " + key);
redisUtil.del(key_redis_lock_Mutex);
System.out.println("Cache deleted Redis keys: " + key_redis_lock_Mutex);

return false;
}


}

package com.test.controller.music;

  • 播放音频controller
1
2
3
4
5
6
7
@RedisCache( duration = 60 * 60 )
@GetMapping(value = "/playAudio/{music_id}", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
@ResponseBody
public String playAudio(@PathVariable String music_id) throws IOException {

return musicService.playAudio(music_id);
}

com.test.service.impl.MusicServiceImpl

  • 播放音频Impl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@Override
public String playAudio(String music_id) throws IOException {
String filename = null;
try {
// 指定要播放的音频文件
filename = musicMapper.selectById(music_id).getMusicFile();
System.out.println(filename);

String filePath = ResourceUtils.getURL("classpath:").getPath() +
"static/audio/" + filename;
// 绝对路径前面多了一个/ 去除
// 确保 filePath 不以斜杠开头
String fileNewPath = null;
if (filePath.startsWith("/")) {
fileNewPath = filePath.substring(1);
} else if (filePath.startsWith("file:") ) {
fileNewPath = filePath.replaceFirst("^file:", "");
} else {
fileNewPath = filePath;
}
System.out.println("fileNewPath: " + fileNewPath);
Path audioFilePath = Paths.get(fileNewPath);

// 创建文件对象
File file = new File(
fileNewPath
);

File file_setLocation = new File(
System.getProperty("user.dir") + "/src/main/resources/static/audio/" + filename
);
System.out.println("file: " + file);

System.out.println("file_setLocation: " + file_setLocation);


// 使用 ResourceLoader 来加载资源
org.springframework.core.io.Resource resource = resourceLoader.getResource(fileNewPath);
System.out.println("org.springframework.core.io.Resource: " + resource);

// 获取资源的输入流
// springboot 打 jar 包后读取不到文件。这通常是因为 JAR 包本质上是一个压缩文件,
// 如果我们直接用文件系统路径读文件,而不是使用类路径读取,就会发生该问题。
// 我们将使用 ClassLoader.getResourceAsStream 方法读取资源文件。这样即便资源被打包在 JAR 文件内,也能够正常读取。
/**
*
* caution: 资源路径应当是相对classpath路径(类路径)的路径。
* 例如,如果 src/main/resources/static/test.wav ,那么你应该使用 static/test.wav 。
*
*
*/
InputStream inputStream = getClass().getClassLoader().getResourceAsStream("static/audio/" + filename);
System.out.println("inputStream: " + inputStream);

if (inputStream != null || file.exists() || file_setLocation.exists()) {

System.out.println("文件存在");

audioParserUtils.incrementPlayCount(music_id);

ZSetOperations<String, Object> zSetOps = redisUtil.zSet();
zSetOps.add("audio:topSongsByPlaycount", filename, (Integer) redisUtil.get("audio:playcountByWeekByMusicId:" + music_id));

if (inputStream != null) {
System.out.println("return inputStream.readAllBytes()");
return Arrays.toString(inputStream.readAllBytes());
} else if (file.exists()) {
System.out.println("return Files.readAllBytes(audioFilePath)");
return Arrays.toString(Files.readAllBytes(audioFilePath));
} else {
System.out.println("return Files.readAllBytes(file_setLocation.toPath())");
return Arrays.toString(Files.readAllBytes(file_setLocation.toPath()));
}

} else {

System.out.println("File does not exist: " + filename);

audioParserUtils.incrementPlayCount(music_id);

ZSetOperations<String, Object> zSetOps = redisUtil.zSet();
zSetOps.add("audio:topSongsByPlaycount", filename, (Integer) redisUtil.get("audio:playcountByWeekByMusicId:" + music_id));

return null;
}

} catch (Exception e) {
throw new RuntimeException("Error loading file " + filename, e);
}
}

case演示

  • 播放音频

image-20240625085006284

  • 首先查询redis是否有缓存, 无, 查询数据库并生成缓存

image-20240625085502934

image-20240625085624811

  • 播放音频, 查询redis有缓存, 直接返回缓存数据

image-20240625085730253

  • 上传更新文件

image-20240625090708838

  • 删除缓存并执行相关操作

image-20240625090951603

  • 发送消息到kafka消息队列

image-20240625091205890

  • 读取消息, 异步执行其余相关操作

image-20240625091232748

参考:

[1] 倪炜, 分布式消息中间件实践. 电子工业出版社, 2018.

相关问题

双删缓存

在更新数据库之前和之后各进行一次缓存删除

  1. 第一次删除缓存:在更新数据库之前删除缓存,目的是使缓存中的旧数据失效,避免在数据库更新过程中其他请求访问到旧数据。
  2. 第二次删除缓存:在数据库更新之后再次删除缓存,确保在数据库更新过程中 ( 写从数据库数据写入 + 读主数据库从写从数据库同步数据 ) 可能新插入的缓存被正确清理掉。这一步特别重要,因为在高并发的情况下,可能会有新的读请求在数据库更新和第一次删除缓存之间写入了缓存。

双删

双删策略(Double Delete Cache Strategy)是一种在缓存一致性中使用的方法,用于确保在高并发情况下,缓存和数据库的数据保持一致。它通过在数据库更新操作前后两次删除缓存,来减少数据不一致的情况出现。下面详细解释为什么需要两次删除缓存以实现缓存一致性:

1. 第一次删除缓存

目的

在更新数据库之前首先删除缓存项,确保在你开始数据库操作时,已有的旧缓存数据被清除。

原因
  1. 防止脏数据读取:防止读请求在更新数据库之前读取到旧的、不准确的缓存数据。
  2. 减少并发更新:避免在接下来的数据库更新操作中,有读请求命中了旧的缓存数据。

2. 数据库更新

在缓存被清除后,执行数据库的更新操作。

3. 第二次删除缓存

数据库更新成功后,再次删除缓存(利用异步消息的方式),以覆盖可能由并发操作带来的不一致风险。

目的

确保更新后的最新数据被正确缓存。

原因
  1. 处理并发写操作:在缓存删除到数据库更新的过程中,可能有并发写操作再次向缓存中写入了旧数据。
  2. 确保最终一致性:通过异步消息(如Kafka)在数据库更新完成后再次删除缓存,确保缓存中的数据与数据库中的数据最终一致。

示例流程图

  1. 初始数据状态

    • 缓存:Old Value
    • 数据库:Old Value
  2. 第一次删除缓存

    • 缓存:空
    • 数据库:Old Value
  3. 数据库更新

    • 缓存:空
    • 数据库:New Value
  4. 第二次删除缓存(异步消息) (消息队列消费者读取到消息,延迟一段时间(延迟时间设置为,数据库读主写从同步的时间,再加几百毫秒))

    • 缓存:空
    • (读)数据库:New Value
  5. 缓存重新加载新数据(未来某个时刻使用缓存时)

    • 缓存:New Value
    • 数据库:New Value

双删缓存一致

总结

双删策略通过在数据库更新前和更新后分别进行两次删除缓存操作,确保在高并发写操作、读取操作下,缓存和数据库的数据保持一致。而通过第二次删除缓存,确保即使在并发情况下,也能清除可能存在的旧数据,达到最终一致性。