下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:
1.缓存结构:
2.数据读取流程:
3.数据更新流程:
4.并发控制:
5.集群同步:
该实现综合运用了延迟双删、发布订阅、锁机制和TTL等多种策略,保障了高并发场景下三级缓存的数据一致性,尤其适合分布式微服务架构。
实战代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.redisson.api.*; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.CacheBuilder;
@Service public class CacheService { // 本地一级缓存(Caffeine) private final Cache<String, Object> localCache; // Redisson客户端,用于分布式操作 private final RedissonClient redissonClient; // 锁缓存,用于控制并发 private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>(); // 延迟任务执行器 private final ScheduledExecutorService scheduledExecutorService; // 主题订阅,用于接收集群消息 private final RTopic cacheClearTopic;
@Autowired public CacheService(RedissonClient redissonClient) { this.redissonClient = redissonClient; this.localCache = CacheBuilder.newBuilder() .maximumSize(10_000) .expireAfterWrite(10, TimeUnit.MINUTES) .build(); this.scheduledExecutorService = Executors.newScheduledThreadPool(5); this.cacheClearTopic = redissonClient.getTopic("cache:clear");
// 注册消息监听器 cacheClearTopic.addListener(String.class, (channel, key) -> { localCache.invalidate(key); }); }
// 读取缓存 public Object get(String key) { // 1. 先查本地缓存 Object value = localCache.getIfPresent(key); if (value != null) { return value; }
// 2. 本地缓存未命中,查Redis RMap<String, Object> redisMap = redissonClient.getMap("cache"); value = redisMap.get(key); if (value != null) { localCache.put(key, value); return value; }
// 3. Redis未命中,查数据库 ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock()); lock.lock(); try { // 双重检查 value = localCache.getIfPresent(key); if (value != null) { return value; }
value = redisMap.get(key); if (value != null) { localCache.put(key, value); return value; }
// 从数据库读取 value = readFromDatabase(key); if (value != null) { // 放入Redis并设置TTL redisMap.put(key, value, 300, TimeUnit.SECONDS); // 放入本地缓存 localCache.put(key, value); } return value; } finally { lock.unlock(); lockMap.remove(key); } }
// 更新数据 public void update(String key, Object value) { // 使用分布式锁保证写操作的原子性 RLock lock = redissonClient.getLock("writeLock:" + key); lock.lock(); try { // 1. 更新数据库 boolean success = updateDatabase(key, value); if (success) { // 2. 先删除本地缓存 localCache.invalidate(key); // 3. 删除Redis缓存 RMap<String, Object> redisMap = redissonClient.getMap("cache"); redisMap.remove(key); // 4. 发布清除缓存的消息到集群 cacheClearTopic.publish(key); // 5. 延迟双删 scheduledExecutorService.schedule(() -> { redisMap.remove(key); }, 100, TimeUnit.MILLISECONDS); } } finally { lock.unlock(); } }
// 从数据库读取数据(示例方法) private Object readFromDatabase(String key) { // 实际实现中会查询数据库 return "data_from_db_" + key; }
// 更新数据库(示例方法) private boolean updateDatabase(String key, Object value) { // 实际实现中会更新数据库 return true; } } |
redisson配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RedissonConfig {
@Bean public RedissonClient redissonClient() { Config config = new Config(); // 单机模式配置 config.useSingleServer() .setAddress("redis://localhost:6379") .setConnectionMinimumIdleSize(5) .setConnectionPoolSize(50);
// 集群模式配置示例 /* config.useClusterServers() .addNodeAddress("redis://node1:6379", "redis://node2:6379") .setScanInterval(2000) .setMasterConnectionMinimumIdleSize(10) .setMasterConnectionPoolSize(64) .setSlaveConnectionMinimumIdleSize(10) .setSlaveConnectionPoolSize(64); */
return Redisson.create(config); } } |