跳到主要内容

Redis缓存数据库详解

Redis(Remote Dictionary Server)是一个开源的内存数据结构存储系统,可以用作数据库、缓存和消息中间件。它支持多种数据结构,具有高性能、高可用性和丰富的功能特性,是现代Web应用架构中不可或缺的组件。

核心价值

Redis = 高性能内存存储 + 丰富数据结构 + 持久化机制 + 高可用架构

  • 🚀 极致性能:基于内存操作,单线程模型,QPS可达10万+
  • 🎯 数据结构丰富:String、Hash、List、Set、Sorted Set、Stream等
  • 💾 持久化保障:RDB快照 + AOF日志 + 混合持久化
  • 🔄 高可用架构:主从复制 + 哨兵模式 + 集群分片
  • 🛠️ 应用场景广泛:缓存、会话、排行榜、分布式锁、消息队列

1. Redis基础架构与特性

1.1 Redis核心特性

Redis作为内存数据库,具有以下核心特性:

1.2 Redis应用场景对比

应用场景数据结构典型用例性能特点适用规模
缓存系统String/Hash用户信息、商品详情、页面缓存读写极快,支持过期中大型应用
会话存储String/Hash用户登录状态、购物车高并发读写所有Web应用
计数统计String/Hash访问量、点赞数、库存原子操作,实时性强高并发场景
排行榜Sorted Set游戏排行、热搜榜、评分系统自动排序,范围查询实时排名需求
消息队列List/Stream任务队列、事件通知阻塞操作,顺序保证异步处理场景
分布式锁String资源互斥、防重复提交原子操作,过期机制分布式系统
地理位置Geo附近的人、配送范围地理计算,距离查询LBS应用

2. Redis数据结构深度解析

2.1 String(字符串)- 最基础的数据类型

String是Redis最基本的数据类型,可以存储字符串、整数或浮点数,是其他数据结构的基础。

String基本操作
bash
1# 基本设置和获取
2SET user:1001 "John Doe"
3GET user:1001 # 返回: "John Doe"
4GETSET user:1001 "Jane Doe" # 设置新值并返回旧值
5DEL user:1001 # 删除键
6EXISTS user:1001 # 检查键是否存在
7
8# 批量操作 - 提高性能
9MSET user:1001 "John" user:1002 "Jane" user:1003 "Bob"
10MGET user:1001 user:1002 user:1003
11# 返回: ["John", "Jane", "Bob"]
12
13# 条件设置
14SET lock:resource1 "owner1" NX EX 30 # 不存在时设置,30秒过期
15SET config:timeout "5000" XX # 存在时才设置

String应用实践

Java中的String操作实践
java
1@Service
2public class RedisStringService {
3
4 @Autowired
5 private RedisTemplate<String, String> redisTemplate;
6
7 /**
8 * 缓存用户信息
9 */
10 public void cacheUserInfo(Long userId, User user) {
11 String key = "user:info:" + userId;
12 String userJson = JSON.toJSONString(user);
13
14 // 缓存1小时
15 redisTemplate.opsForValue().set(key, userJson, 1, TimeUnit.HOURS);
16 }
17
18 /**
19 * 获取用户信息
20 */
21 public User getUserInfo(Long userId) {
22 String key = "user:info:" + userId;
23 String userJson = redisTemplate.opsForValue().get(key);
24
25 return userJson != null ? JSON.parseObject(userJson, User.class) : null;
26 }
27
28 /**
29 * 文章阅读量统计
30 */
31 public Long incrementArticleViews(Long articleId) {
32 String key = "article:views:" + articleId;
33 return redisTemplate.opsForValue().increment(key);
34 }
35
36 /**
37 * 分布式锁实现
38 */
39 public boolean tryLock(String lockKey, String lockValue, long timeout) {
40 Boolean result = redisTemplate.opsForValue()
41 .setIfAbsent(lockKey, lockValue, timeout, TimeUnit.SECONDS);
42 return Boolean.TRUE.equals(result);
43 }
44
45 /**
46 * 限流器实现 - 滑动窗口
47 */
48 public boolean isAllowed(String key, int maxRequests, int windowSeconds) {
49 String script =
50 "local current = redis.call('incr', KEYS[1]) " +
51 "if tonumber(current) == 1 then " +
52 " redis.call('expire', KEYS[1], ARGV[1]) " +
53 "end " +
54 "return tonumber(current) <= tonumber(ARGV[2])";
55
56 Long result = redisTemplate.execute(
57 new DefaultRedisScript<>(script, Long.class),
58 Collections.singletonList(key),
59 String.valueOf(windowSeconds),
60 String.valueOf(maxRequests)
61 );
62
63 return Long.valueOf(1).equals(result);
64 }
65}

2.2 Hash(哈希)- 对象存储的最佳选择

Hash是一个键值对集合,特别适合存储对象数据,相比String存储JSON有更好的性能和灵活性。

Hash基本操作
bash
1# 单个字段操作
2HSET user:1001 name "John Doe"
3HSET user:1001 age 25
4HSET user:1001 email "john@example.com"
5
6HGET user:1001 name # 返回: "John Doe"
7HGETALL user:1001 # 返回所有字段和值
8HDEL user:1001 age # 删除age字段
9HEXISTS user:1001 name # 检查字段是否存在
10
11# 批量操作
12HMSET user:1002 name "Jane" age 30 email "jane@example.com" city "Beijing"
13HMGET user:1002 name age email # 批量获取指定字段

Hash vs String 性能对比

对比维度HashString (JSON)优势分析
内存使用更节省较多Hash避免了JSON序列化开销
部分更新支持需要全量更新Hash可以只更新单个字段
查询性能单字段快全量解析Hash支持字段级别的操作
数据类型原生支持字符串Hash支持数值操作
复杂查询有限灵活JSON支持复杂嵌套结构

2.3 List(列表)- 有序数据的理想选择

List是一个双向链表,支持从两端添加或删除元素,适合实现队列、栈和时间线等场景。

List基本操作
bash
1# 添加元素
2LPUSH mylist "item1" # 左端添加
3RPUSH mylist "item2" "item3" # 右端添加多个元素
4LINSERT mylist BEFORE "item2" "new_item" # 在指定元素前插入
5
6# 获取元素
7LRANGE mylist 0 -1 # 获取所有元素
8LINDEX mylist 0 # 获取指定位置的元素
9LLEN mylist # 获取列表长度
10
11# 删除元素
12LPOP mylist # 左端弹出
13RPOP mylist # 右端弹出
14LREM mylist 1 "item1" # 删除指定数量的元素
15LTRIM mylist 0 99 # 保留指定范围的元素

2.4 Set(集合)- 去重和集合运算

Set是一个无序的字符串集合,不允许重复元素,支持集合间的交集、并集、差集运算。

Set基本操作
bash
1# 添加和删除
2SADD myset "member1" "member2" "member3"
3SREM myset "member1" # 删除成员
4SMEMBERS myset # 获取所有成员
5SCARD myset # 获取成员数量
6SISMEMBER myset "member2" # 检查成员是否存在
7
8# 随机操作
9SRANDMEMBER myset 2 # 随机获取2个成员
10SPOP myset # 随机弹出并删除一个成员

2.5 Sorted Set(有序集合)- 排序和排名

Sorted Set是一个有序的字符串集合,每个成员都有一个分数,按分数排序,适合实现排行榜、优先级队列等。

Sorted Set基本操作
bash
1# 添加成员
2ZADD leaderboard 1000 "player1"
3ZADD leaderboard 1200 "player2" 800 "player3"
4
5# 获取信息
6ZSCORE leaderboard "player1" # 获取分数: 1000
7ZRANK leaderboard "player1" # 获取排名(升序): 1
8ZREVRANK leaderboard "player1" # 获取排名(降序): 1
9ZCARD leaderboard # 获取成员总数
10
11# 范围查询
12ZRANGE leaderboard 0 -1 # 获取所有成员(升序)
13ZREVRANGE leaderboard 0 -1 # 获取所有成员(降序)
14ZRANGE leaderboard 0 -1 WITHSCORES # 包含分数

2.6 Stream(流)- 现代消息队列解决方案

Stream是Redis 5.0引入的新数据类型,专门用于构建消息队列和事件流处理系统,支持消费者组、消息确认等高级特性。

Stream基本操作
bash
1# 添加消息
2XADD user_events * user_id 1001 action "login" timestamp 1640995200
3XADD user_events * user_id 1002 action "logout" timestamp 1640995300
4XADD user_events 1640995400000-0 user_id 1003 action "purchase" amount 99.99
5
6# 读取消息
7XREAD COUNT 10 STREAMS user_events 0 # 从开始读取10条消息
8XREAD BLOCK 5000 STREAMS user_events $ # 阻塞5秒等待新消息
9XREAD STREAMS user_events order_events 0-0 0-0 # 从多个流读取
10
11# 范围查询
12XRANGE user_events - + # 获取所有消息
13XRANGE user_events 1640995200000 1640995400000 # 时间范围查询
14XREVRANGE user_events + - COUNT 5 # 反向获取最新5条消息
15
16# 流信息
17XLEN user_events # 获取消息数量
18XINFO STREAM user_events # 获取流详细信息

Stream vs 传统消息队列对比

特性Redis StreamRabbitMQKafka适用场景
消息持久化支持支持支持所有场景
消费者组支持支持支持多消费者场景
消息确认支持支持支持可靠性要求高
消息回溯支持有限支持需要重新处理历史消息
性能极高极高高并发场景
运维复杂度简单部署需求
生态系统Redis生态丰富丰富已有Redis基础设施

3. Redis持久化机制详解

Redis作为内存数据库,提供了多种持久化机制来保证数据的安全性和可恢复性。

3.1 RDB持久化 - 快照备份

RDB(Redis Database)通过创建数据快照来实现持久化,是Redis的默认持久化方式。

RDB配置详解
bash
1# redis.conf 配置
2save 900 1 # 900秒内至少1个key变化时保存
3save 300 10 # 300秒内至少10个key变化时保存
4save 60 10000 # 60秒内至少10000个key变化时保存
5
6# 其他RDB配置
7stop-writes-on-bgsave-error yes # RDB保存失败时停止写入
8rdbcompression yes # 启用RDB文件压缩
9rdbchecksum yes # 启用RDB文件校验
10dbfilename dump.rdb # RDB文件名
11dir /var/lib/redis # RDB文件保存目录
12
13# 手动触发RDB
14SAVE # 同步保存(阻塞)
15BGSAVE # 异步保存(非阻塞)
16LASTSAVE # 获取最后保存时间

3.2 AOF持久化 - 操作日志

AOF(Append Only File)通过记录每个写操作命令来实现持久化,提供更好的数据安全性。

AOF配置详解
bash
1# redis.conf 配置
2appendonly yes # 启用AOF持久化
3appendfilename "appendonly.aof" # AOF文件名
4appendfsync everysec # 同步策略
5
6# 同步策略选项
7# appendfsync always # 每个写命令都同步到磁盘(最安全,性能最低)
8# appendfsync everysec # 每秒同步一次(平衡安全性和性能)
9# appendfsync no # 由操作系统决定何时同步(性能最高,安全性最低)
10
11# AOF重写配置
12auto-aof-rewrite-percentage 100 # 当AOF文件大小比上次重写后增长100%时触发重写
13auto-aof-rewrite-min-size 64mb # AOF文件最小64MB时才考虑重写
14aof-load-truncated yes # 启动时加载被截断的AOF文件
15aof-rewrite-incremental-fsync yes # 重写时增量同步
16
17# 手动触发AOF重写
18BGREWRITEAOF # 后台重写AOF文件

3.3 混合持久化 - 最佳实践

Redis 4.0引入混合持久化,结合RDB和AOF的优点,是目前推荐的持久化方案。

混合持久化配置
bash
1# redis.conf 配置
2appendonly yes # 启用AOF
3aof-use-rdb-preamble yes # 启用混合持久化
4
5# 工作原理
6# 1. AOF重写时,将当前数据库状态以RDB格式写入AOF文件开头
7# 2. 重写后的新写操作以AOF格式追加到文件末尾
8# 3. 恢复时先加载RDB部分,再重放AOF部分
9
10# 文件结构示例
11# +-------+-------+-------+
12# | RDB | AOF | AOF |
13# | 基础 | 增量1 | 增量2 |
14# | 数据 | 数据 | 数据 |
15# +-------+-------+-------+

4. Redis高可用架构设计

Redis高可用架构是保证服务稳定运行的关键,包括主从复制、哨兵模式和集群模式三种主要方案。

4.1 主从复制 - 读写分离基础

主从复制是Redis高可用的基础,通过数据同步实现读写分离和数据备份。

主从复制配置
bash
1# 主节点配置 (redis-master.conf)
2bind 0.0.0.0
3port 6379
4daemonize yes
5pidfile /var/run/redis_6379.pid
6logfile /var/log/redis_6379.log
7dir /var/lib/redis
8
9# 安全配置
10requirepass master_password
11masterauth master_password
12
13# 从节点配置 (redis-slave.conf)
14bind 0.0.0.0
15port 6380
16daemonize yes
17pidfile /var/run/redis_6380.pid
18logfile /var/log/redis_6380.log
19dir /var/lib/redis
20
21# 主从关系配置
22replicaof 192.168.1.100 6379 # 指定主节点
23masterauth master_password # 主节点密码
24replica-read-only yes # 从节点只读
25replica-serve-stale-data yes # 连接断开时继续服务
26
27# 动态配置主从关系
28REPLICAOF 192.168.1.100 6379 # 设置为从节点
29REPLICAOF NO ONE # 取消从节点身份,升级为主节点

4.2 哨兵模式 - 自动故障转移

Redis Sentinel是Redis的高可用解决方案,提供监控、通知、自动故障转移和配置提供者功能。

哨兵配置详解
bash
1# sentinel.conf 配置文件
2port 26379 # 哨兵端口
3daemonize yes # 后台运行
4pidfile /var/run/redis-sentinel.pid # PID文件
5logfile /var/log/redis-sentinel.log # 日志文件
6dir /var/lib/redis # 工作目录
7
8# 监控主节点配置
9sentinel monitor mymaster 192.168.1.100 6379 2
10# mymaster: 主节点名称
11# 192.168.1.100 6379: 主节点地址和端口
12# 2: 判断主节点下线需要的哨兵数量(quorum)
13
14# 认证配置
15sentinel auth-pass mymaster master_password
16
17# 故障转移配置
18sentinel down-after-milliseconds mymaster 5000 # 5秒无响应判定下线
19sentinel failover-timeout mymaster 15000 # 故障转移超时时间
20sentinel parallel-syncs mymaster 1 # 同时同步的从节点数量
21
22# 通知脚本配置
23sentinel notification-script mymaster /opt/scripts/notify.sh
24sentinel client-reconfig-script mymaster /opt/scripts/reconfig.sh
25
26# 启动哨兵
27redis-sentinel /etc/redis/sentinel.conf
28# 或者
29redis-server /etc/redis/sentinel.conf --sentinel

4.3 集群模式 - 水平扩展方案

Redis Cluster是Redis的分布式解决方案,支持数据分片、自动故障转移和水平扩展。

Redis集群搭建
bash
1# 1. 节点配置文件 (redis-7000.conf)
2port 7000
3cluster-enabled yes # 启用集群模式
4cluster-config-file nodes-7000.conf # 集群配置文件
5cluster-node-timeout 5000 # 节点超时时间
6cluster-announce-ip 192.168.1.100 # 节点公告IP
7cluster-announce-port 7000 # 节点公告端口
8cluster-announce-bus-port 17000 # 集群总线端口
9
10# 持久化配置
11appendonly yes
12appendfilename "appendonly-7000.aof"
13
14# 其他配置
15daemonize yes
16pidfile /var/run/redis_7000.pid
17logfile /var/log/redis_7000.log
18dir /var/lib/redis/7000
19
20# 2. 启动所有节点
21redis-server /etc/redis/redis-7000.conf
22redis-server /etc/redis/redis-7001.conf
23redis-server /etc/redis/redis-7002.conf
24redis-server /etc/redis/redis-7003.conf
25redis-server /etc/redis/redis-7004.conf
26redis-server /etc/redis/redis-7005.conf
27
28# 3. 创建集群
29redis-cli --cluster create \
30 192.168.1.100:7000 192.168.1.100:7001 192.168.1.100:7002 \
31 192.168.1.100:7003 192.168.1.100:7004 192.168.1.100:7005 \
32 --cluster-replicas 1
33
34# 4. 验证集群状态
35redis-cli -c -p 7000 cluster nodes
36redis-cli -c -p 7000 cluster info

4.4 高可用方案对比与选择

方案数据一致性可用性扩展性复杂度适用场景
主从复制最终一致中等读扩展读多写少,简单架构
哨兵模式最终一致读扩展中等高可用要求,自动故障转移
集群模式最终一致读写扩展大数据量,高并发,水平扩展

5. Redis性能优化实战

Redis性能优化是一个系统工程,涉及内存、网络、持久化、数据结构等多个方面。

5.1 内存优化策略

内存是Redis最宝贵的资源,合理的内存使用策略直接影响系统性能。

Redis内存配置优化
bash
1# 基础内存配置
2maxmemory 8gb # 设置最大内存限制
3maxmemory-policy allkeys-lru # 内存淘汰策略
4
5# 内存淘汰策略详解
6# noeviction:不淘汰,内存满时写入报错(默认)
7# allkeys-lru:从所有key中使用LRU算法淘汰
8# volatile-lru:从设置了过期时间的key中使用LRU算法淘汰
9# allkeys-random:从所有key中随机淘汰
10# volatile-random:从设置了过期时间的key中随机淘汰
11# volatile-ttl:淘汰即将过期的key
12# allkeys-lfu:从所有key中使用LFU算法淘汰(Redis 4.0+)
13# volatile-lfu:从设置了过期时间的key中使用LFU算法淘汰(Redis 4.0+)
14
15# LRU配置优化
16maxmemory-samples 5 # LRU采样数量,越大越精确但性能越低
17
18# 内存使用监控
19redis-cli INFO memory
20# used_memory:8589934592 # 已使用内存(字节)
21# used_memory_human:8.00G # 已使用内存(人类可读)
22# used_memory_rss:9663676416 # 系统分配内存
23# used_memory_peak:8589934592 # 内存使用峰值
24# used_memory_peak_human:8.00G # 内存使用峰值(人类可读)
25# maxmemory:8589934592 # 最大内存限制
26# maxmemory_human:8.00G # 最大内存限制(人类可读)
27# maxmemory_policy:allkeys-lru # 内存淘汰策略

5.2 网络与连接优化

网络和连接配置直接影响Redis的响应时间和并发处理能力。

Redis网络配置优化
bash
1# 基础网络配置
2bind 0.0.0.0 # 绑定所有网络接口
3port 6379 # 监听端口
4tcp-backlog 511 # TCP监听队列长度
5timeout 0 # 客户端空闲超时时间(0表示不超时)
6
7# TCP配置优化
8tcp-keepalive 300 # TCP keepalive时间(秒)
9# 建议设置为300秒,可以及时发现断开的连接
10
11# 客户端连接配置
12maxclients 10000 # 最大客户端连接数
13# 默认10000,根据服务器性能和内存调整
14
15# 输出缓冲区配置
16client-output-buffer-limit normal 0 0 0 # 普通客户端无限制
17client-output-buffer-limit replica 256mb 64mb 60 # 从节点客户端
18client-output-buffer-limit pubsub 32mb 8mb 60 # 发布订阅客户端
19
20# 网络性能监控
21redis-cli INFO clients
22# connected_clients:100 # 当前连接的客户端数量
23# client_recent_max_input_buffer:2 # 最近客户端最大输入缓冲区
24# client_recent_max_output_buffer:0 # 最近客户端最大输出缓冲区
25# blocked_clients:0 # 被阻塞的客户端数量
26
27redis-cli INFO stats
28# total_connections_received:1000 # 总连接数
29# total_commands_processed:50000 # 总命令数
30# instantaneous_ops_per_sec:100 # 当前每秒操作数
31# instantaneous_input_kbps:10.5 # 当前输入速率(KB/s)
32# instantaneous_output_kbps:15.2 # 当前输出速率(KB/s)

5.3 持久化性能优化

持久化配置直接影响Redis的写入性能和数据安全性,需要根据业务需求进行平衡。

RDB持久化性能优化
bash
1# RDB触发条件优化
2# 根据业务特点调整save配置
3save 900 1 # 15分钟内至少1个key变化
4save 300 10 # 5分钟内至少10个key变化
5save 60 10000 # 1分钟内至少10000个key变化
6
7# 高写入场景的配置
8save 3600 1 # 1小时内至少1个key变化
9save 1800 100 # 30分钟内至少100个key变化
10save 300 10000 # 5分钟内至少10000个key变化
11
12# 低写入场景的配置
13save 300 1 # 5分钟内至少1个key变化
14save 60 10 # 1分钟内至少10个key变化
15save 10 1000 # 10秒内至少1000个key变化
16
17# RDB性能配置
18stop-writes-on-bgsave-error yes # BGSAVE失败时停止写入
19rdbcompression yes # 启用RDB压缩(CPU换存储)
20rdbchecksum yes # 启用RDB校验(安全性)
21rdb-save-incremental-fsync yes # 增量fsync(减少IO阻塞)
22
23# 监控RDB性能
24redis-cli LASTSAVE # 最后一次保存时间
25redis-cli INFO persistence
26# rdb_changes_since_last_save:1000 # 上次保存后的变化数
27# rdb_bgsave_in_progress:0 # 是否正在进行BGSAVE
28# rdb_last_save_time:1640995200 # 最后保存时间戳
29# rdb_last_bgsave_status:ok # 最后BGSAVE状态
30# rdb_last_bgsave_time_sec:2 # 最后BGSAVE耗时

6. Redis实战应用场景

Redis在实际项目中有着广泛的应用场景,以下是一些典型的实战案例。

6.1 缓存系统设计

缓存是Redis最常见的应用场景,合理的缓存策略可以显著提升系统性能。

缓存模式实现
java
1@Service
2public class CacheService {
3
4 @Autowired
5 private RedisTemplate<String, Object> redisTemplate;
6
7 @Autowired
8 private UserRepository userRepository;
9
10 /**
11 * Cache-Aside模式(旁路缓存)
12 * 应用程序直接管理缓存
13 */
14 public User getUserCacheAside(Long userId) {
15 String cacheKey = "user:cache:" + userId;
16
17 // 1. 先查缓存
18 User user = (User) redisTemplate.opsForValue().get(cacheKey);
19 if (user != null) {
20 return user;
21 }
22
23 // 2. 缓存未命中,查数据库
24 user = userRepository.findById(userId).orElse(null);
25 if (user != null) {
26 // 3. 写入缓存,设置过期时间
27 redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);
28 }
29
30 return user;
31 }
32
33 /**
34 * Write-Through模式(写穿透)
35 * 写操作同时更新缓存和数据库
36 */
37 public void updateUserWriteThrough(User user) {
38 String cacheKey = "user:cache:" + user.getId();
39
40 // 1. 更新数据库
41 userRepository.save(user);
42
43 // 2. 更新缓存
44 redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);
45 }
46
47 /**
48 * Write-Behind模式(写回)
49 * 先更新缓存,异步更新数据库
50 */
51 public void updateUserWriteBehind(User user) {
52 String cacheKey = "user:cache:" + user.getId();
53
54 // 1. 立即更新缓存
55 redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);
56
57 // 2. 异步更新数据库
58 CompletableFuture.runAsync(() -> {
59 try {
60 Thread.sleep(100); // 模拟延迟
61 userRepository.save(user);
62 } catch (Exception e) {
63 // 处理异常,可能需要回滚缓存
64 redisTemplate.delete(cacheKey);
65 throw new RuntimeException("数据库更新失败", e);
66 }
67 });
68 }
69
70 /**
71 * 缓存预热
72 */
73 @PostConstruct
74 public void warmUpCache() {
75 // 预加载热点数据
76 List<User> hotUsers = userRepository.findHotUsers();
77 for (User user : hotUsers) {
78 String cacheKey = "user:cache:" + user.getId();
79 redisTemplate.opsForValue().set(cacheKey, user, 2, TimeUnit.HOURS);
80 }
81 }
82
83 /**
84 * 缓存穿透防护 - 布隆过滤器
85 */
86 public User getUserWithBloomFilter(Long userId) {
87 // 1. 布隆过滤器检查
88 if (!bloomFilterContains("user:bloom", userId.toString())) {
89 return null; // 确定不存在
90 }
91
92 // 2. 查缓存
93 String cacheKey = "user:cache:" + userId;
94 User user = (User) redisTemplate.opsForValue().get(cacheKey);
95 if (user != null) {
96 return user;
97 }
98
99 // 3. 查数据库
100 user = userRepository.findById(userId).orElse(null);
101 if (user != null) {
102 redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);
103 } else {
104 // 缓存空值,防止缓存穿透
105 redisTemplate.opsForValue().set(cacheKey, "NULL", 5, TimeUnit.MINUTES);
106 }
107
108 return user;
109 }
110
111 /**
112 * 缓存击穿防护 - 分布式锁
113 */
114 public User getUserWithLock(Long userId) {
115 String cacheKey = "user:cache:" + userId;
116 String lockKey = "user:lock:" + userId;
117
118 // 1. 查缓存
119 User user = (User) redisTemplate.opsForValue().get(cacheKey);
120 if (user != null) {
121 return user;
122 }
123
124 // 2. 获取分布式锁
125 String lockValue = UUID.randomUUID().toString();
126 Boolean lockAcquired = redisTemplate.opsForValue()
127 .setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
128
129 if (Boolean.TRUE.equals(lockAcquired)) {
130 try {
131 // 3. 双重检查
132 user = (User) redisTemplate.opsForValue().get(cacheKey);
133 if (user != null) {
134 return user;
135 }
136
137 // 4. 查数据库
138 user = userRepository.findById(userId).orElse(null);
139 if (user != null) {
140 redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);
141 }
142
143 return user;
144 } finally {
145 // 5. 释放锁
146 releaseLock(lockKey, lockValue);
147 }
148 } else {
149 // 等待其他线程加载数据
150 try {
151 Thread.sleep(100);
152 return getUserWithLock(userId); // 递归重试
153 } catch (InterruptedException e) {
154 Thread.currentThread().interrupt();
155 return null;
156 }
157 }
158 }
159
160 private boolean bloomFilterContains(String filterKey, String value) {
161 // 简化的布隆过滤器实现
162 // 实际项目中建议使用Redisson的布隆过滤器
163 return true; // 假设存在
164 }
165
166 private void releaseLock(String lockKey, String lockValue) {
167 String script =
168 "if redis.call('get', KEYS[1]) == ARGV[1] then " +
169 "return redis.call('del', KEYS[1]) " +
170 "else return 0 end";
171
172 redisTemplate.execute(
173 new DefaultRedisScript<>(script, Long.class),
174 Collections.singletonList(lockKey),
175 lockValue
176 );
177 }
178}

6.2 分布式锁实现

分布式锁是分布式系统中的重要组件,Redis提供了简单高效的实现方案。

Redis分布式锁实现
java
1@Component
2public class RedisDistributedLock {
3
4 @Autowired
5 private RedisTemplate<String, String> redisTemplate;
6
7 private static final String LOCK_PREFIX = "distributed:lock:";
8 private static final String UNLOCK_SCRIPT =
9 "if redis.call('get', KEYS[1]) == ARGV[1] then " +
10 "return redis.call('del', KEYS[1]) " +
11 "else return 0 end";
12
13 /**
14 * 尝试获取锁
15 */
16 public boolean tryLock(String lockKey, String lockValue, long expireTime, TimeUnit timeUnit) {
17 String key = LOCK_PREFIX + lockKey;
18 Boolean result = redisTemplate.opsForValue()
19 .setIfAbsent(key, lockValue, expireTime, timeUnit);
20 return Boolean.TRUE.equals(result);
21 }
22
23 /**
24 * 释放锁
25 */
26 public boolean releaseLock(String lockKey, String lockValue) {
27 String key = LOCK_PREFIX + lockKey;
28 Long result = redisTemplate.execute(
29 new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class),
30 Collections.singletonList(key),
31 lockValue
32 );
33 return Long.valueOf(1).equals(result);
34 }
35
36 /**
37 * 可重入锁实现
38 */
39 private static final String REENTRANT_LOCK_SCRIPT =
40 "local key = KEYS[1] " +
41 "local value = ARGV[1] " +
42 "local expire = ARGV[2] " +
43 "local current = redis.call('HGET', key, 'owner') " +
44 "if current == false then " +
45 " redis.call('HSET', key, 'owner', value) " +
46 " redis.call('HSET', key, 'count', 1) " +
47 " redis.call('EXPIRE', key, expire) " +
48 " return 1 " +
49 "elseif current == value then " +
50 " local count = redis.call('HINCRBY', key, 'count', 1) " +
51 " redis.call('EXPIRE', key, expire) " +
52 " return 1 " +
53 "else " +
54 " return 0 " +
55 "end";
56
57 private static final String REENTRANT_UNLOCK_SCRIPT =
58 "local key = KEYS[1] " +
59 "local value = ARGV[1] " +
60 "local current = redis.call('HGET', key, 'owner') " +
61 "if current == value then " +
62 " local count = redis.call('HINCRBY', key, 'count', -1) " +
63 " if count == 0 then " +
64 " redis.call('DEL', key) " +
65 " return 1 " +
66 " else " +
67 " return 1 " +
68 " end " +
69 "else " +
70 " return 0 " +
71 "end";
72
73 /**
74 * 可重入锁 - 获取锁
75 */
76 public boolean tryReentrantLock(String lockKey, String lockValue, long expireTime) {
77 String key = LOCK_PREFIX + lockKey;
78 Long result = redisTemplate.execute(
79 new DefaultRedisScript<>(REENTRANT_LOCK_SCRIPT, Long.class),
80 Collections.singletonList(key),
81 lockValue,
82 String.valueOf(expireTime)
83 );
84 return Long.valueOf(1).equals(result);
85 }
86
87 /**
88 * 可重入锁 - 释放锁
89 */
90 public boolean releaseReentrantLock(String lockKey, String lockValue) {
91 String key = LOCK_PREFIX + lockKey;
92 Long result = redisTemplate.execute(
93 new DefaultRedisScript<>(REENTRANT_UNLOCK_SCRIPT, Long.class),
94 Collections.singletonList(key),
95 lockValue
96 );
97 return Long.valueOf(1).equals(result);
98 }
99}
100
101// 分布式锁注解
102@Component
103@Aspect
104public class DistributedLockAspect {
105
106 @Autowired
107 private RedisDistributedLock distributedLock;
108
109 @Around("@annotation(lockAnnotation)")
110 public Object around(ProceedingJoinPoint joinPoint, DistributedLock lockAnnotation) throws Throwable {
111 String lockKey = generateLockKey(joinPoint, lockAnnotation.key());
112 String lockValue = UUID.randomUUID().toString();
113
114 boolean acquired = distributedLock.tryLock(
115 lockKey,
116 lockValue,
117 lockAnnotation.expireTime(),
118 lockAnnotation.timeUnit()
119 );
120
121 if (!acquired) {
122 if (lockAnnotation.waitTime() > 0) {
123 // 等待获取锁
124 return waitAndRetry(joinPoint, lockAnnotation, lockKey, lockValue);
125 } else {
126 throw new RuntimeException("获取分布式锁失败: " + lockKey);
127 }
128 }
129
130 try {
131 return joinPoint.proceed();
132 } finally {
133 distributedLock.releaseLock(lockKey, lockValue);
134 }
135 }
136
137 private Object waitAndRetry(ProceedingJoinPoint joinPoint, DistributedLock lockAnnotation,
138 String lockKey, String lockValue) throws Throwable {
139 long waitTime = lockAnnotation.waitTime();
140 long startTime = System.currentTimeMillis();
141
142 while (System.currentTimeMillis() - startTime < waitTime) {
143 boolean acquired = distributedLock.tryLock(
144 lockKey,
145 lockValue,
146 lockAnnotation.expireTime(),
147 lockAnnotation.timeUnit()
148 );
149
150 if (acquired) {
151 try {
152 return joinPoint.proceed();
153 } finally {
154 distributedLock.releaseLock(lockKey, lockValue);
155 }
156 }
157
158 try {
159 Thread.sleep(100); // 等待100ms后重试
160 } catch (InterruptedException e) {
161 Thread.currentThread().interrupt();
162 throw new RuntimeException("等待锁被中断", e);
163 }
164 }
165
166 throw new RuntimeException("等待分布式锁超时: " + lockKey);
167 }
168
169 private String generateLockKey(ProceedingJoinPoint joinPoint, String keyExpression) {
170 if (keyExpression.isEmpty()) {
171 return joinPoint.getSignature().toShortString();
172 }
173
174 // 简化的SpEL表达式解析
175 Object[] args = joinPoint.getArgs();
176 return keyExpression.replace("#args[0]", String.valueOf(args[0]));
177 }
178}
179
180// 分布式锁注解定义
181@Target(ElementType.METHOD)
182@Retention(RetentionPolicy.RUNTIME)
183public @interface DistributedLock {
184 String key() default "";
185 long expireTime() default 30;
186 TimeUnit timeUnit() default TimeUnit.SECONDS;
187 long waitTime() default 0;
188}
189
190// 使用示例
191@Service
192public class OrderService {
193
194 @DistributedLock(key = "order:create:#args[0]", expireTime = 10, waitTime = 5000)
195 public void createOrder(String userId, OrderRequest request) {
196 // 创建订单的业务逻辑
197 // 同一用户同时只能创建一个订单
198 }
199
200 @DistributedLock(key = "inventory:reduce:#args[0]", expireTime = 30)
201 public void reduceInventory(String productId, int quantity) {
202 // 减库存的业务逻辑
203 // 同一商品的库存操作需要串行化
204 }
205}

6.3 限流器实现

限流是保护系统稳定性的重要手段,Redis提供了多种限流算法的实现方案。

Redis限流器实现
java
1@Component
2public class RedisRateLimiter {
3
4 @Autowired
5 private RedisTemplate<String, String> redisTemplate;
6
7 /**
8 * 固定窗口限流
9 */
10 private static final String FIXED_WINDOW_SCRIPT =
11 "local key = KEYS[1] " +
12 "local window = tonumber(ARGV[1]) " +
13 "local limit = tonumber(ARGV[2]) " +
14 "local current = redis.call('INCR', key) " +
15 "if current == 1 then " +
16 " redis.call('EXPIRE', key, window) " +
17 "end " +
18 "if current <= limit then " +
19 " return {1, current, limit - current} " +
20 "else " +
21 " return {0, current, 0} " +
22 "end";
23
24 public RateLimitResult fixedWindowLimit(String key, int windowSeconds, int limit) {
25 List<Long> result = redisTemplate.execute(
26 new DefaultRedisScript<>(FIXED_WINDOW_SCRIPT, List.class),
27 Collections.singletonList("rate_limit:fixed:" + key),
28 String.valueOf(windowSeconds),
29 String.valueOf(limit)
30 );
31
32 return new RateLimitResult(
33 result.get(0) == 1,
34 result.get(1).intValue(),
35 result.get(2).intValue()
36 );
37 }
38
39 /**
40 * 滑动窗口限流
41 */
42 private static final String SLIDING_WINDOW_SCRIPT =
43 "local key = KEYS[1] " +
44 "local window = tonumber(ARGV[1]) " +
45 "local limit = tonumber(ARGV[2]) " +
46 "local now = tonumber(ARGV[3]) " +
47 "local clearBefore = now - window * 1000 " +
48 "redis.call('ZREMRANGEBYSCORE', key, 0, clearBefore) " +
49 "local current = redis.call('ZCARD', key) " +
50 "if current < limit then " +
51 " redis.call('ZADD', key, now, now) " +
52 " redis.call('EXPIRE', key, window) " +
53 " return {1, current + 1, limit - current - 1} " +
54 "else " +
55 " return {0, current, 0} " +
56 "end";
57
58 public RateLimitResult slidingWindowLimit(String key, int windowSeconds, int limit) {
59 long now = System.currentTimeMillis();
60 List<Long> result = redisTemplate.execute(
61 new DefaultRedisScript<>(SLIDING_WINDOW_SCRIPT, List.class),
62 Collections.singletonList("rate_limit:sliding:" + key),
63 String.valueOf(windowSeconds),
64 String.valueOf(limit),
65 String.valueOf(now)
66 );
67
68 return new RateLimitResult(
69 result.get(0) == 1,
70 result.get(1).intValue(),
71 result.get(2).intValue()
72 );
73 }
74
75 /**
76 * 令牌桶限流
77 */
78 private static final String TOKEN_BUCKET_SCRIPT =
79 "local key = KEYS[1] " +
80 "local capacity = tonumber(ARGV[1]) " +
81 "local tokens = tonumber(ARGV[2]) " +
82 "local interval = tonumber(ARGV[3]) " +
83 "local requested = tonumber(ARGV[4]) " +
84 "local now = tonumber(ARGV[5]) " +
85
86 "local bucket = redis.call('HMGET', key, 'tokens', 'last_refill') " +
87 "local current_tokens = tonumber(bucket[1]) or capacity " +
88 "local last_refill = tonumber(bucket[2]) or now " +
89
90 "local elapsed = now - last_refill " +
91 "local tokens_to_add = math.floor(elapsed / interval * tokens) " +
92 "current_tokens = math.min(capacity, current_tokens + tokens_to_add) " +
93
94 "if current_tokens >= requested then " +
95 " current_tokens = current_tokens - requested " +
96 " redis.call('HMSET', key, 'tokens', current_tokens, 'last_refill', now) " +
97 " redis.call('EXPIRE', key, 3600) " +
98 " return {1, current_tokens, requested} " +
99 "else " +
100 " redis.call('HMSET', key, 'tokens', current_tokens, 'last_refill', now) " +
101 " redis.call('EXPIRE', key, 3600) " +
102 " return {0, current_tokens, 0} " +
103 "end";
104
105 public RateLimitResult tokenBucketLimit(String key, int capacity, int tokensPerSecond, int requested) {
106 long now = System.currentTimeMillis();
107 List<Long> result = redisTemplate.execute(
108 new DefaultRedisScript<>(TOKEN_BUCKET_SCRIPT, List.class),
109 Collections.singletonList("rate_limit:token:" + key),
110 String.valueOf(capacity),
111 String.valueOf(tokensPerSecond),
112 String.valueOf(1000), // 1秒的毫秒数
113 String.valueOf(requested),
114 String.valueOf(now)
115 );
116
117 return new RateLimitResult(
118 result.get(0) == 1,
119 result.get(1).intValue(),
120 result.get(2).intValue()
121 );
122 }
123
124 /**
125 * 漏桶限流
126 */
127 private static final String LEAKY_BUCKET_SCRIPT =
128 "local key = KEYS[1] " +
129 "local capacity = tonumber(ARGV[1]) " +
130 "local leak_rate = tonumber(ARGV[2]) " +
131 "local requested = tonumber(ARGV[3]) " +
132 "local now = tonumber(ARGV[4]) " +
133
134 "local bucket = redis.call('HMGET', key, 'volume', 'last_leak') " +
135 "local current_volume = tonumber(bucket[1]) or 0 " +
136 "local last_leak = tonumber(bucket[2]) or now " +
137
138 "local elapsed = now - last_leak " +
139 "local leaked = math.floor(elapsed / 1000 * leak_rate) " +
140 "current_volume = math.max(0, current_volume - leaked) " +
141
142 "if current_volume + requested <= capacity then " +
143 " current_volume = current_volume + requested " +
144 " redis.call('HMSET', key, 'volume', current_volume, 'last_leak', now) " +
145 " redis.call('EXPIRE', key, 3600) " +
146 " return {1, current_volume, capacity - current_volume} " +
147 "else " +
148 " redis.call('HMSET', key, 'volume', current_volume, 'last_leak', now) " +
149 " redis.call('EXPIRE', key, 3600) " +
150 " return {0, current_volume, capacity - current_volume} " +
151 "end";
152
153 public RateLimitResult leakyBucketLimit(String key, int capacity, int leakRate, int requested) {
154 long now = System.currentTimeMillis();
155 List<Long> result = redisTemplate.execute(
156 new DefaultRedisScript<>(LEAKY_BUCKET_SCRIPT, List.class),
157 Collections.singletonList("rate_limit:leaky:" + key),
158 String.valueOf(capacity),
159 String.valueOf(leakRate),
160 String.valueOf(requested),
161 String.valueOf(now)
162 );
163
164 return new RateLimitResult(
165 result.get(0) == 1,
166 result.get(1).intValue(),
167 result.get(2).intValue()
168 );
169 }
170
171 // 限流结果类
172 public static class RateLimitResult {
173 private final boolean allowed;
174 private final int current;
175 private final int remaining;
176
177 public RateLimitResult(boolean allowed, int current, int remaining) {
178 this.allowed = allowed;
179 this.current = current;
180 this.remaining = remaining;
181 }
182
183 // getters
184 public boolean isAllowed() { return allowed; }
185 public int getCurrent() { return current; }
186 public int getRemaining() { return remaining; }
187 }
188}
189
190// 限流注解实现
191@Component
192@Aspect
193public class RateLimitAspect {
194
195 @Autowired
196 private RedisRateLimiter rateLimiter;
197
198 @Around("@annotation(rateLimit)")
199 public Object around(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
200 String key = generateKey(joinPoint, rateLimit);
201
202 RedisRateLimiter.RateLimitResult result;
203 switch (rateLimit.algorithm()) {
204 case FIXED_WINDOW:
205 result = rateLimiter.fixedWindowLimit(key, rateLimit.window(), rateLimit.limit());
206 break;
207 case SLIDING_WINDOW:
208 result = rateLimiter.slidingWindowLimit(key, rateLimit.window(), rateLimit.limit());
209 break;
210 case TOKEN_BUCKET:
211 result = rateLimiter.tokenBucketLimit(key, rateLimit.limit(), rateLimit.tokensPerSecond(), 1);
212 break;
213 case LEAKY_BUCKET:
214 result = rateLimiter.leakyBucketLimit(key, rateLimit.limit(), rateLimit.leakRate(), 1);
215 break;
216 default:
217 result = rateLimiter.fixedWindowLimit(key, rateLimit.window(), rateLimit.limit());
218 }
219
220 if (!result.isAllowed()) {
221 throw new RateLimitExceededException("请求频率过高,请稍后重试");
222 }
223
224 // 在响应头中添加限流信息
225 HttpServletResponse response = getCurrentResponse();
226 if (response != null) {
227 response.setHeader("X-RateLimit-Limit", String.valueOf(rateLimit.limit()));
228 response.setHeader("X-RateLimit-Remaining", String.valueOf(result.getRemaining()));
229 response.setHeader("X-RateLimit-Reset", String.valueOf(System.currentTimeMillis() + rateLimit.window() * 1000));
230 }
231
232 return joinPoint.proceed();
233 }
234
235 private String generateKey(ProceedingJoinPoint joinPoint, RateLimit rateLimit) {
236 String baseKey = rateLimit.key();
237 if (baseKey.isEmpty()) {
238 baseKey = joinPoint.getSignature().toShortString();
239 }
240
241 // 根据限流维度生成key
242 switch (rateLimit.dimension()) {
243 case IP:
244 return baseKey + ":" + getCurrentClientIp();
245 case USER:
246 return baseKey + ":" + getCurrentUserId();
247 case GLOBAL:
248 return baseKey;
249 default:
250 return baseKey;
251 }
252 }
253
254 private String getCurrentClientIp() {
255 HttpServletRequest request = getCurrentRequest();
256 if (request != null) {
257 String xForwardedFor = request.getHeader("X-Forwarded-For");
258 if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
259 return xForwardedFor.split(",")[0].trim();
260 }
261 return request.getRemoteAddr();
262 }
263 return "unknown";
264 }
265
266 private String getCurrentUserId() {
267 // 从安全上下文获取用户ID
268 return "user123"; // 简化实现
269 }
270
271 private HttpServletRequest getCurrentRequest() {
272 RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
273 if (requestAttributes instanceof ServletRequestAttributes) {
274 return ((ServletRequestAttributes) requestAttributes).getRequest();
275 }
276 return null;
277 }
278
279 private HttpServletResponse getCurrentResponse() {
280 RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
281 if (requestAttributes instanceof ServletRequestAttributes) {
282 return ((ServletRequestAttributes) requestAttributes).getResponse();
283 }
284 return null;
285 }
286}
287
288// 限流注解定义
289@Target(ElementType.METHOD)
290@Retention(RetentionPolicy.RUNTIME)
291public @interface RateLimit {
292 String key() default "";
293 int limit() default 100;
294 int window() default 60;
295 RateLimitAlgorithm algorithm() default RateLimitAlgorithm.FIXED_WINDOW;
296 RateLimitDimension dimension() default RateLimitDimension.IP;
297 int tokensPerSecond() default 10;
298 int leakRate() default 10;
299}
300
301// 限流算法枚举
302public enum RateLimitAlgorithm {
303 FIXED_WINDOW, // 固定窗口
304 SLIDING_WINDOW, // 滑动窗口
305 TOKEN_BUCKET, // 令牌桶
306 LEAKY_BUCKET // 漏桶
307}
308
309// 限流维度枚举
310public enum RateLimitDimension {
311 IP, // 按IP限流
312 USER, // 按用户限流
313 GLOBAL // 全局限流
314}
315
316// 限流异常
317public class RateLimitExceededException extends RuntimeException {
318 public RateLimitExceededException(String message) {
319 super(message);
320 }
321}
322
323// 使用示例
324@RestController
325public class ApiController {
326
327 @GetMapping("/api/data")
328 @RateLimit(key = "api:data", limit = 100, window = 60, algorithm = RateLimitAlgorithm.SLIDING_WINDOW)
329 public ResponseEntity<String> getData() {
330 return ResponseEntity.ok("data");
331 }
332
333 @PostMapping("/api/upload")
334 @RateLimit(key = "api:upload", limit = 10, window = 60, dimension = RateLimitDimension.USER)
335 public ResponseEntity<String> uploadFile() {
336 return ResponseEntity.ok("uploaded");
337 }
338
339 @GetMapping("/api/search")
340 @RateLimit(key = "api:search", limit = 1000, tokensPerSecond = 50, algorithm = RateLimitAlgorithm.TOKEN_BUCKET)
341 public ResponseEntity<String> search() {
342 return ResponseEntity.ok("search results");
343 }
344}

6.4 排行榜系统

基于Redis Sorted Set实现的排行榜系统,支持实时更新和多维度排序。

Redis排行榜系统
java
1@Service
2public class LeaderboardService {
3
4 @Autowired
5 private RedisTemplate<String, Object> redisTemplate;
6
7 private static final String LEADERBOARD_PREFIX = "leaderboard:";
8
9 /**
10 * 更新用户分数
11 */
12 public void updateScore(String leaderboardName, String userId, double score) {
13 String key = LEADERBOARD_PREFIX + leaderboardName;
14 redisTemplate.opsForZSet().add(key, userId, score);
15
16 // 设置过期时间(可选)
17 redisTemplate.expire(key, 7, TimeUnit.DAYS);
18 }
19
20 /**
21 * 增加用户分数
22 */
23 public Double incrementScore(String leaderboardName, String userId, double increment) {
24 String key = LEADERBOARD_PREFIX + leaderboardName;
25 return redisTemplate.opsForZSet().incrementScore(key, userId, increment);
26 }
27
28 /**
29 * 获取用户分数
30 */
31 public Double getUserScore(String leaderboardName, String userId) {
32 String key = LEADERBOARD_PREFIX + leaderboardName;
33 return redisTemplate.opsForZSet().score(key, userId);
34 }
35
36 /**
37 * 获取用户排名(从1开始)
38 */
39 public Long getUserRank(String leaderboardName, String userId) {
40 String key = LEADERBOARD_PREFIX + leaderboardName;
41 Long rank = redisTemplate.opsForZSet().reverseRank(key, userId);
42 return rank != null ? rank + 1 : null;
43 }
44
45 /**
46 * 获取排行榜前N名
47 */
48 public List<LeaderboardEntry> getTopN(String leaderboardName, int n) {
49 String key = LEADERBOARD_PREFIX + leaderboardName;
50 Set<ZSetOperations.TypedTuple<Object>> tuples =
51 redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, n - 1);
52
53 List<LeaderboardEntry> result = new ArrayList<>();
54 int rank = 1;
55 for (ZSetOperations.TypedTuple<Object> tuple : tuples) {
56 result.add(new LeaderboardEntry(
57 rank++,
58 (String) tuple.getValue(),
59 tuple.getScore()
60 ));
61 }
62
63 return result;
64 }
65
66 /**
67 * 获取指定排名范围的用户
68 */
69 public List<LeaderboardEntry> getRangeByRank(String leaderboardName, long start, long end) {
70 String key = LEADERBOARD_PREFIX + leaderboardName;
71 Set<ZSetOperations.TypedTuple<Object>> tuples =
72 redisTemplate.opsForZSet().reverseRangeWithScores(key, start - 1, end - 1);
73
74 List<LeaderboardEntry> result = new ArrayList<>();
75 long rank = start;
76 for (ZSetOperations.TypedTuple<Object> tuple : tuples) {
77 result.add(new LeaderboardEntry(
78 rank++,
79 (String) tuple.getValue(),
80 tuple.getScore()
81 ));
82 }
83
84 return result;
85 }
86
87 /**
88 * 获取指定分数范围的用户
89 */
90 public List<LeaderboardEntry> getRangeByScore(String leaderboardName, double minScore, double maxScore) {
91 String key = LEADERBOARD_PREFIX + leaderboardName;
92 Set<ZSetOperations.TypedTuple<Object>> tuples =
93 redisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, minScore, maxScore);
94
95 List<LeaderboardEntry> result = new ArrayList<>();
96 for (ZSetOperations.TypedTuple<Object> tuple : tuples) {
97 Long rank = redisTemplate.opsForZSet().reverseRank(key, tuple.getValue());
98 result.add(new LeaderboardEntry(
99 rank != null ? rank + 1 : 0,
100 (String) tuple.getValue(),
101 tuple.getScore()
102 ));
103 }
104
105 return result;
106 }
107
108 /**
109 * 获取用户周围的排名
110 */
111 public List<LeaderboardEntry> getUserNeighbors(String leaderboardName, String userId, int count) {
112 Long userRank = getUserRank(leaderboardName, userId);
113 if (userRank == null) {
114 return Collections.emptyList();
115 }
116
117 long start = Math.max(1, userRank - count / 2);
118 long end = start + count - 1;
119
120 return getRangeByRank(leaderboardName, start, end);
121 }
122
123 /**
124 * 删除用户
125 */
126 public void removeUser(String leaderboardName, String userId) {
127 String key = LEADERBOARD_PREFIX + leaderboardName;
128 redisTemplate.opsForZSet().remove(key, userId);
129 }
130
131 /**
132 * 获取排行榜总人数
133 */
134 public Long getTotalCount(String leaderboardName) {
135 String key = LEADERBOARD_PREFIX + leaderboardName;
136 return redisTemplate.opsForZSet().zCard(key);
137 }
138
139 /**
140 * 批量更新分数
141 */
142 public void batchUpdateScores(String leaderboardName, Map<String, Double> userScores) {
143 String key = LEADERBOARD_PREFIX + leaderboardName;
144
145 Set<ZSetOperations.TypedTuple<Object>> tuples = new HashSet<>();
146 for (Map.Entry<String, Double> entry : userScores.entrySet()) {
147 tuples.add(new DefaultTypedTuple<>(entry.getKey(), entry.getValue()));
148 }
149
150 redisTemplate.opsForZSet().add(key, tuples);
151 }
152
153 /**
154 * 多维度排行榜合并
155 */
156 public void mergeLeaderboards(String targetLeaderboard, List<String> sourceLeaderboards, List<Double> weights) {
157 String targetKey = LEADERBOARD_PREFIX + targetLeaderboard;
158
159 List<String> sourceKeys = sourceLeaderboards.stream()
160 .map(name -> LEADERBOARD_PREFIX + name)
161 .collect(Collectors.toList());
162
163 // 使用ZUNIONSTORE合并多个排行榜
164 ZSetOperations.Aggregate aggregate = ZSetOperations.Aggregate.SUM;
165 redisTemplate.opsForZSet().unionAndStore(sourceKeys.get(0), sourceKeys.subList(1, sourceKeys.size()), targetKey, aggregate, weights.toArray(new Double[0]));
166
167 // 设置过期时间
168 redisTemplate.expire(targetKey, 1, TimeUnit.HOURS);
169 }
170
171 // 排行榜条目类
172 public static class LeaderboardEntry {
173 private final long rank;
174 private final String userId;
175 private final double score;
176
177 public LeaderboardEntry(long rank, String userId, double score) {
178 this.rank = rank;
179 this.userId = userId;
180 this.score = score;
181 }
182
183 // getters
184 public long getRank() { return rank; }
185 public String getUserId() { return userId; }
186 public double getScore() { return score; }
187 }
188}
189
190// 排行榜管理器
191@Service
192public class LeaderboardManager {
193
194 @Autowired
195 private LeaderboardService leaderboardService;
196
197 /**
198 * 游戏排行榜示例
199 */
200 public void gameLeaderboardExample() {
201 String leaderboard = "game:global";
202
203 // 更新玩家分数
204 leaderboardService.updateScore(leaderboard, "player1", 1500);
205 leaderboardService.updateScore(leaderboard, "player2", 1200);
206 leaderboardService.updateScore(leaderboard, "player3", 1800);
207
208 // 获取前10名
209 List<LeaderboardService.LeaderboardEntry> top10 =
210 leaderboardService.getTopN(leaderboard, 10);
211
212 System.out.println("游戏排行榜前10名:");
213 for (LeaderboardService.LeaderboardEntry entry : top10) {
214 System.out.printf("第%d名: %s (分数: %.0f)\n",
215 entry.getRank(), entry.getUserId(), entry.getScore());
216 }
217 }
218
219 /**
220 * 实时活动排行榜
221 */
222 public void activityLeaderboardExample() {
223 String leaderboard = "activity:2025_spring";
224
225 // 用户完成任务,增加积分
226 leaderboardService.incrementScore(leaderboard, "user1", 100);
227 leaderboardService.incrementScore(leaderboard, "user2", 150);
228 leaderboardService.incrementScore(leaderboard, "user3", 80);
229
230 // 获取用户排名和周围用户
231 String userId = "user1";
232 Long userRank = leaderboardService.getUserRank(leaderboard, userId);
233 List<LeaderboardService.LeaderboardEntry> neighbors =
234 leaderboardService.getUserNeighbors(leaderboard, userId, 5);
235
236 System.out.println("用户" + userId + "当前排名: " + userRank);
237 System.out.println("周围用户排名:");
238 for (LeaderboardService.LeaderboardEntry entry : neighbors) {
239 System.out.printf("第%d名: %s (积分: %.0f)\n",
240 entry.getRank(), entry.getUserId(), entry.getScore());
241 }
242 }
243
244 /**
245 * 多维度排行榜示例
246 */
247 public void multiDimensionLeaderboardExample() {
248 // 创建不同维度的排行榜
249 String scoreBoard = "game:score";
250 String timeBoard = "game:time";
251 String comboBoard = "game:combo";
252
253 // 更新各维度数据
254 leaderboardService.updateScore(scoreBoard, "player1", 1500);
255 leaderboardService.updateScore(timeBoard, "player1", 120); // 用时120秒
256 leaderboardService.updateScore(comboBoard, "player1", 50); // 连击50次
257
258 // 合并排行榜(综合排名)
259 List<String> sourceBoards = Arrays.asList(scoreBoard, timeBoard, comboBoard);
260 List<Double> weights = Arrays.asList(0.5, -0.3, 0.2); // 分数权重0.5,时间权重-0.3(越少越好),连击权重0.2
261
262 leaderboardService.mergeLeaderboards("game:comprehensive", sourceBoards, weights);
263
264 // 获取综合排行榜
265 List<LeaderboardService.LeaderboardEntry> comprehensive =
266 leaderboardService.getTopN("game:comprehensive", 10);
267
268 System.out.println("综合排行榜:");
269 for (LeaderboardService.LeaderboardEntry entry : comprehensive) {
270 System.out.printf("第%d名: %s (综合分: %.2f)\n",
271 entry.getRank(), entry.getUserId(), entry.getScore());
272 }
273 }
274}

7. Redis面试题精选

7.1 基础概念题

Q1: Redis为什么这么快?

A: Redis高性能的原因包括:

  1. 内存操作:数据存储在内存中,避免磁盘I/O
  2. 单线程模型:避免线程切换和锁竞争开销
  3. 高效数据结构:针对不同场景优化的数据结构
  4. 非阻塞I/O:使用epoll等高效I/O多路复用
  5. 优化的网络协议:简单高效的RESP协议

Q2: Redis的数据类型有哪些?分别适用于什么场景?

A: Redis支持以下数据类型:

  • String:缓存、计数器、分布式锁
  • Hash:对象存储、购物车
  • List:消息队列、时间线、最新列表
  • Set:标签系统、好友关系、去重
  • Sorted Set:排行榜、优先级队列、范围查询
  • Stream:消息流、事件溯源

Q3: Redis持久化方式有哪些?各有什么优缺点?

A: Redis支持三种持久化方式:

方式优点缺点适用场景
RDB文件小、恢复快、性能影响小可能丢失数据、fork耗时备份、主从同步
AOF数据安全、可读性强文件大、恢复慢数据安全要求高
混合持久化兼顾性能和安全复杂度较高生产环境推荐

7.2 架构设计题

Q4: Redis如何实现高可用?

A: Redis高可用方案:

  1. 主从复制:数据备份和读写分离
  2. 哨兵模式:自动故障检测和转移
  3. 集群模式:数据分片和水平扩展
  4. 客户端容错:连接池、重试机制

Q5: Redis集群的数据分片原理是什么?

A: Redis集群使用一致性哈希的变种:

  • 16384个槽位(slot)
  • CRC16(key) % 16384 计算槽位
  • 每个主节点负责一部分槽位
  • 使用Hash Tag确保相关key在同一槽位

Q6: 如何解决Redis缓存穿透、击穿、雪崩问题?

A: 解决方案:

问题原因解决方案
缓存穿透查询不存在的数据布隆过滤器、缓存空值
缓存击穿热点key过期分布式锁、永不过期
缓存雪崩大量key同时过期过期时间随机化、多级缓存

7.3 性能优化题

Q7: 如何优化Redis性能?

A: 性能优化策略:

  1. 内存优化:合适的数据结构、内存淘汰策略
  2. 网络优化:连接池、Pipeline、批量操作
  3. 持久化优化:合理配置RDB和AOF参数
  4. 架构优化:读写分离、分片、多级缓存

Q8: Redis的内存淘汰策略有哪些?

A: Redis支持8种内存淘汰策略:

  • noeviction:不淘汰,写入报错
  • allkeys-lru/lfu:从所有key中淘汰
  • volatile-lru/lfu:从过期key中淘汰
  • allkeys-random:随机淘汰所有key
  • volatile-random:随机淘汰过期key
  • volatile-ttl:淘汰即将过期的key

7.4 实战应用题

Q9: 如何使用Redis实现分布式锁?需要注意什么问题?

A: 分布式锁实现要点:

  1. 原子性:使用SET NX EX命令
  2. 唯一性:使用UUID作为锁值
  3. 过期时间:防止死锁
  4. 安全释放:Lua脚本确保原子性
  5. 可重入性:使用Hash结构记录重入次数

Q10: 如何设计一个高性能的Redis缓存系统?

A: 设计要点:

  1. 缓存策略:Cache-Aside、Write-Through等
  2. 数据结构选择:根据场景选择合适的数据类型
  3. 过期策略:合理设置TTL,避免雪崩
  4. 监控告警:内存使用率、命中率、响应时间
  5. 容灾备份:主从复制、定期备份
Redis学习建议
  1. 理论基础:深入理解Redis的数据结构和实现原理
  2. 实践应用:多做项目实战,掌握常见应用场景
  3. 性能调优:学会分析性能瓶颈,掌握优化技巧
  4. 架构设计:了解高可用方案,能够设计分布式缓存架构
  5. 持续学习:关注Redis新版本特性,学习最佳实践

通过本章的深入学习,你应该已经全面掌握了Redis的核心概念、数据结构、持久化机制、高可用架构和性能优化技巧。Redis作为现代应用架构中的重要组件,在缓存、会话存储、消息队列、分布式锁等场景中发挥着关键作用。

在实际项目中,合理使用Redis不仅能显著提升系统性能,还能简化架构设计。希望这份详细的Redis指南能够帮助你在技术面试和实际工作中游刃有余!

评论