import com.github.benmanes.caffeine.cache.Cache;
// NOTE(review): Caffeine does not appear to ship a CacheBuilder class (that name belongs to Guava);
// this import is kept untouched here but should be removed if it fails to resolve.
import com.github.benmanes.caffeine.cache.CacheBuilder;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import org.redisson.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Two-level cache in front of a database: a per-process Caffeine cache (level 1) backed by a
 * Redis map accessed through Redisson (level 2).
 *
 * <p>Reads go local cache, then Redis, then database; database loads for the same key are
 * serialized inside this process. Writes update the database under a distributed lock, evict the
 * key from both cache levels, broadcast the eviction to other nodes over a Redisson topic, and
 * repeat the eviction once after a short delay.
 *
 * <p>Implements {@link AutoCloseable} so the scheduler threads and the topic listener can be
 * released. NOTE(review): Spring is expected to call {@code close()} on AutoCloseable singleton
 * beans at context shutdown - confirm for the Spring version in use.
 */
@Service
public class CacheService implements AutoCloseable {

    /** Name of the Redis map holding the level-2 entries. */
    private static final String REDIS_MAP_NAME = "cache";
    /** Topic on which evicted keys are broadcast to every node. */
    private static final String CLEAR_TOPIC_NAME = "cache:clear";
    /** Prefix of the distributed lock name used by {@link #update(String, Object)}. */
    private static final String WRITE_LOCK_PREFIX = "writeLock:";

    private static final long LOCAL_MAX_ENTRIES = 10_000;
    private static final long LOCAL_TTL_MINUTES = 10;
    private static final long REDIS_TTL_SECONDS = 300;
    private static final long SECOND_EVICTION_DELAY_MILLIS = 100;
    private static final int SCHEDULER_THREADS = 5;

    /**
     * Reference-counted per-key lock. {@code users} is only read or written inside
     * {@code lockMap.compute(...)} / {@code computeIfPresent(...)}, which ConcurrentHashMap runs
     * atomically per key.
     */
    private static final class KeyLock {
        final ReentrantLock lock = new ReentrantLock();
        int users;
    }

    // Level-1 local cache (Caffeine).
    private final Cache<String, Object> localCache;
    // Redisson client used for the Redis map, the distributed write lock and the topic.
    private final RedissonClient redissonClient;
    // In-process locks that serialize database loads per key.
    private final ConcurrentHashMap<String, KeyLock> lockMap = new ConcurrentHashMap<>();
    // Executor that runs the delayed second eviction.
    private final ScheduledExecutorService scheduledExecutorService;
    // Topic used to publish and receive cluster-wide eviction messages.
    private final RTopic cacheClearTopic;
    // Id of the listener registered on cacheClearTopic, kept so close() can remove it.
    private final int clearListenerId;

    /**
     * Builds the local cache, subscribes to the eviction topic and starts the scheduler.
     *
     * @param redissonClient client used for all Redis access
     */
    @Autowired
    public CacheService(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
        this.localCache = Caffeine.newBuilder()
                .maximumSize(LOCAL_MAX_ENTRIES)
                .expireAfterWrite(LOCAL_TTL_MINUTES, TimeUnit.MINUTES)
                .build();
        this.cacheClearTopic = redissonClient.getTopic(CLEAR_TOPIC_NAME);
        // Every message on the topic is a key to drop from this node's local cache.
        this.clearListenerId = cacheClearTopic.addListener(
                String.class, (channel, key) -> localCache.invalidate(key));
        this.scheduledExecutorService = Executors.newScheduledThreadPool(SCHEDULER_THREADS);
    }

    /**
     * Returns the value for {@code key}, looking in the local cache, then Redis, then the database.
     *
     * <p>A value found in Redis is copied into the local cache. A value loaded from the database
     * is written to Redis with a TTL and to the local cache. A {@code null} database result is
     * returned without being cached.
     *
     * @param key cache key
     * @return the cached or freshly loaded value, or {@code null} if the database has none
     */
    public Object get(String key) {
        // 1. Local cache first.
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }

        // 2. Local miss: try Redis.
        RMapCache<String, Object> redisMap = redisMap();
        value = redisMap.get(key);
        if (value != null) {
            localCache.put(key, value);
            return value;
        }

        // 3. Redis miss: load from the database, one loader per key at a time in this process.
        KeyLock holder = acquireKeyLock(key);
        try {
            holder.lock.lock();
            try {
                return loadUnderLock(key, redisMap);
            } finally {
                holder.lock.unlock();
            }
        } finally {
            // Released only after unlock, and only dropped from the map when no other thread
            // still holds a reference, so waiters and newcomers always share one lock object.
            releaseKeyLock(key);
        }
    }

    /**
     * Updates the database and, on success, evicts {@code key} from every cache level.
     *
     * <p>Runs under the distributed lock {@code "writeLock:" + key}. After a successful database
     * update the key is evicted immediately and once more after a short delay.
     *
     * @param key   key being updated
     * @param value new value handed to the database update
     */
    public void update(String key, Object value) {
        RLock lock = redissonClient.getLock(WRITE_LOCK_PREFIX + key);
        lock.lock();
        try {
            if (!updateDatabase(key, value)) {
                return;
            }
            evictEverywhere(key);
            // Delayed second eviction: a reader that fetched the old row before the update may
            // re-populate Redis or a local cache after the first eviction. Repeating the whole
            // eviction (not just the Redis delete) clears that stale copy on every level.
            // Exceptions thrown by this task are captured in the discarded ScheduledFuture.
            scheduledExecutorService.schedule(
                    () -> evictEverywhere(key), SECOND_EVICTION_DELAY_MILLIS, TimeUnit.MILLISECONDS);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the topic listener and shuts the scheduler down. Uses {@code shutdown()}, not
     * {@code shutdownNow()}, so already-queued tasks are not cancelled by this call.
     */
    @Override
    public void close() {
        try {
            cacheClearTopic.removeListener(clearListenerId);
        } finally {
            scheduledExecutorService.shutdown();
        }
    }

    /** Returns the Redis map handle; RMapCache is required for per-entry TTL on put. */
    private RMapCache<String, Object> redisMap() {
        return redissonClient.getMapCache(REDIS_MAP_NAME);
    }

    /** Re-checks both cache levels, then loads from the database and fills both levels. */
    private Object loadUnderLock(String key, RMapCache<String, Object> redisMap) {
        // Another thread may have filled the caches while this one waited for the lock.
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }
        value = redisMap.get(key);
        if (value != null) {
            localCache.put(key, value);
            return value;
        }

        value = readFromDatabase(key);
        if (value != null) {
            redisMap.put(key, value, REDIS_TTL_SECONDS, TimeUnit.SECONDS);
            localCache.put(key, value);
        }
        return value;
    }

    /** Evicts the key locally, from Redis, and on all subscribed nodes via the topic. */
    private void evictEverywhere(String key) {
        localCache.invalidate(key);
        redisMap().remove(key);
        cacheClearTopic.publish(key);
    }

    /** Returns the lock entry for the key, creating it if needed, and counts this caller. */
    private KeyLock acquireKeyLock(String key) {
        return lockMap.compute(key, (k, existing) -> {
            KeyLock holder = (existing != null) ? existing : new KeyLock();
            holder.users++;
            return holder;
        });
    }

    /** Un-counts this caller and removes the entry once nobody references it. */
    private void releaseKeyLock(String key) {
        lockMap.computeIfPresent(key, (k, holder) -> (--holder.users == 0) ? null : holder);
    }

    // Reads a value from the database (sample placeholder).
    private Object readFromDatabase(String key) {
        // A real implementation would query the database here.
        return "data_from_db_" + key;
    }

    // Updates the database (sample placeholder).
    private boolean updateDatabase(String key, Object value) {
        // A real implementation would write to the database here.
        return true;
    }
}
|