Redis作为分布式缓存,性能是比较好的。但服务器会严重依赖Redis服务,且每次请求都会查询Redis获取缓存内容。
实现多级缓存时,有两个措施可以保证本地缓存和Redis缓存数据一致性:
- 主动措施:Redis实现订阅发布模型,即修改Redis内容后,集群所有服务器均要收到通知,来修改本地缓存。
- 被动措施:本地缓存设置失效时间,缓存失效后主动查询Redis获取数据。
1. Redis的订阅发布
加入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--guava依赖-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
订阅者配置:
Redis使用list结构实现订阅发布。订阅者可以在不同的项目,但是订阅者和发布者必须使用同一个Redis库。
@Slf4j
@Configuration
public class RedisContainerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//设置监听的队列,可以设置多个
container.addMessageListener(listenerAdapter, new PatternTopic("topic:service_xx:module"));
container.addMessageListener(listenerAdapter, new PatternTopic("topic:service_xx"));
return container;
}
//FuseInterceptor是一个普通的Bean对象,他的作用是处理监听到的内容
@Bean
MessageListenerAdapter listenerAdapter(FuseInterceptor receiver) {
return new MessageListenerAdapter(new MessageListener() {
public void onMessage(Message message, byte[] pattern) {
try {
//监听的队列信息
String type = new String(message.getChannel(), StandardCharsets.UTF_8.name());
//发送的内容信息
String m = new String(message.getBody(), StandardCharsets.UTF_8.name());
log.info("redis监听到[{}]的消息[{}]",type,m);
receiver.refreshCache(type, m);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
});
}
}
代码中使用了guava Cache实现本地缓存(并设置失效时间)。当监听到Redis修改时,主动修改本地缓存。缓存失效后,也会去修改本地缓存。
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@Component
@Slf4j
public class FuseInterceptor implements HandlerInterceptor {
@Autowired
private StringRedisTemplate stringRedisTemplate;
//设置超时时间。guava的Cache
public static Cache<String, Object> moduleCache = CacheBuilder.
newBuilder().
//写入缓存后,10s后过期
expireAfterWrite(10, TimeUnit.SECONDS).
build();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws ExecutionException {
//先在本地缓存中获取内容,若获取不到,直接查询Redis并放入本地缓存(线程安全)
String moduleStatus = (String) moduleCache.get("topic:service_xx:module", () -> {
//若本地缓存获取不到,那么去Redis查询数据并放入缓存。
String value = stringRedisTemplate.opsForValue().get("service_xx:module");
return StringUtils.isBlank(value) ? "close": value;
});
//拦截器处理。
return true;
}
/**
* 订阅发布获取到最新的Redis内容
*
* @param type 监听队列的名
* @param message 监听到的信息
*/
public void refreshCache(String type, String message) {
if ("topic:service_xx".equals(type)) {
//若是这个队列的消息,那么将其转换为Set<String>结构,并放入缓存中
Set<String> patterns = JSONObject.parseObject(message,
new TypeReference<LinkedHashSet<String>>() {
}.getType());
moduleCache.put(type, patterns);
} else {
moduleCache.put(type, message);
}
}
}
发布者配置:
//发布消息
StringRedisTemplate.convertAndSend("topic:service_xx:module","发送的消息");
//将消息存储到Redis中,以便本地缓存失效后查询Redis缓存
StringRedisTemplate.opsForValue().set("service_xx:module","发送的消息");
推荐阅读
SpringBoot2.x—SpringCache(5)使用多级缓存
SpringBoot2.x—SpringCache(6)缓存注意事项
网友评论