返回顶部
分享到

java如何实现高并发场景下三级缓存的数据一致性

java 来源:互联网 作者:佚名 发布时间:2025-07-24 07:51:32 人浏览
摘要

下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括: 1.缓存结构: 本地缓存:使用Caffeine实现,最大容量10,000,写入后10分钟过期 分布式缓存:使用Redisson的RMap结构操作Redi

下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:

1.缓存结构:

  • 本地缓存:使用Caffeine实现,最大容量10,000,写入后10分钟过期
  • 分布式缓存:使用Redisson的RMap结构操作Redis
  • 数据库:作为最终数据源

2.数据读取流程:

  • 先查本地缓存,命中则返回
  • 未命中则查Redis,命中则更新本地缓存并返回
  • 仍未命中则获取锁,再次检查两级缓存(双重检查)
  • 最后从数据库读取,更新两级缓存后返回

3.数据更新流程:

  • 使用分布式锁保证写操作原子性
  • 先更新数据库
  • 删除本地缓存和Redis缓存
  • 通过Redis Pub/Sub发布缓存清除消息给集群内其他节点
  • 执行延迟双删(100毫秒后再次删除Redis缓存)

4.并发控制:

  • 读取时使用本地锁(ReentrantLock)防止缓存击穿
  • 更新时使用Redisson分布式锁(RLock)保证跨节点原子性
  • 锁使用完成后从ConcurrentHashMap中移除

5.集群同步:

  • 使用Redis的RTopic实现消息发布订阅
  • 接收到清除消息时自动删除本地缓存
  • 确保集群内各节点缓存一致性

该实现综合运用了延迟双删、发布订阅、锁机制和TTL等多种策略,保障了高并发场景下三级缓存的数据一致性,尤其适合分布式微服务架构。

实战代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

import java.util.concurrent.*;

import java.util.concurrent.locks.ReentrantLock;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import org.redisson.api.*;

import com.github.benmanes.caffeine.cache.Cache;

import com.github.benmanes.caffeine.cache.CacheBuilder;

 

@Service

public class CacheService {

    // 本地一级缓存(Caffeine)

    private final Cache<String, Object> localCache;

    // Redisson客户端,用于分布式操作

    private final RedissonClient redissonClient;

    // 锁缓存,用于控制并发

    private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();

    // 延迟任务执行器

    private final ScheduledExecutorService scheduledExecutorService;

    // 主题订阅,用于接收集群消息

    private final RTopic cacheClearTopic;

 

    @Autowired

    public CacheService(RedissonClient redissonClient) {

        this.redissonClient = redissonClient;

        this.localCache = CacheBuilder.newBuilder()

                .maximumSize(10_000)

                .expireAfterWrite(10, TimeUnit.MINUTES)

                .build();

        this.scheduledExecutorService = Executors.newScheduledThreadPool(5);

        this.cacheClearTopic = redissonClient.getTopic("cache:clear");

         

        // 注册消息监听器

        cacheClearTopic.addListener(String.class, (channel, key) -> {

            localCache.invalidate(key);

        });

    }

 

    // 读取缓存

    public Object get(String key) {

        // 1. 先查本地缓存

        Object value = localCache.getIfPresent(key);

        if (value != null) {

            return value;

        }

 

        // 2. 本地缓存未命中,查Redis

        RMap<String, Object> redisMap = redissonClient.getMap("cache");

        value = redisMap.get(key);

        if (value != null) {

            localCache.put(key, value);

            return value;

        }

 

        // 3. Redis未命中,查数据库

        ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock());

        lock.lock();

        try {

            // 双重检查

            value = localCache.getIfPresent(key);

            if (value != null) {

                return value;

            }

             

            value = redisMap.get(key);

            if (value != null) {

                localCache.put(key, value);

                return value;

            }

 

            // 从数据库读取

            value = readFromDatabase(key);

            if (value != null) {

                // 放入Redis并设置TTL

                redisMap.put(key, value, 300, TimeUnit.SECONDS);

                // 放入本地缓存

                localCache.put(key, value);

            }

            return value;

        } finally {

            lock.unlock();

            lockMap.remove(key);

        }

    }

 

    // 更新数据

    public void update(String key, Object value) {

        // 使用分布式锁保证写操作的原子性

        RLock lock = redissonClient.getLock("writeLock:" + key);

        lock.lock();

        try {

            // 1. 更新数据库

            boolean success = updateDatabase(key, value);

            if (success) {

                // 2. 先删除本地缓存

                localCache.invalidate(key);

                // 3. 删除Redis缓存

                RMap<String, Object> redisMap = redissonClient.getMap("cache");

                redisMap.remove(key);

                // 4. 发布清除缓存的消息到集群

                cacheClearTopic.publish(key);

                // 5. 延迟双删

                scheduledExecutorService.schedule(() -> {

                    redisMap.remove(key);

                }, 100, TimeUnit.MILLISECONDS);

            }

        } finally {

            lock.unlock();

        }

    }

 

    // 从数据库读取数据(示例方法)

    private Object readFromDatabase(String key) {

        // 实际实现中会查询数据库

        return "data_from_db_" + key;

    }

 

    // 更新数据库(示例方法)

    private boolean updateDatabase(String key, Object value) {

        // 实际实现中会更新数据库

        return true;

    }

}   

redisson配置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

import org.redisson.Redisson;

import org.redisson.api.RedissonClient;

import org.redisson.config.Config;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

 

@Configuration

public class RedissonConfig {

 

    @Bean

    public RedissonClient redissonClient() {

        Config config = new Config();

        // 单机模式配置

        config.useSingleServer()

              .setAddress("redis://localhost:6379")

              .setConnectionMinimumIdleSize(5)

              .setConnectionPoolSize(50);

         

        // 集群模式配置示例

        /*

        config.useClusterServers()

              .addNodeAddress("redis://node1:6379", "redis://node2:6379")

              .setScanInterval(2000)

              .setMasterConnectionMinimumIdleSize(10)

              .setMasterConnectionPoolSize(64)

              .setSlaveConnectionMinimumIdleSize(10)

              .setSlaveConnectionPoolSize(64);

        */

         

        return Redisson.create(config);

    }

}   


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 :
相关文章
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计