Redis Caching Database Explained
Redis (Remote Dictionary Server) is an open-source, in-memory data-structure store that can serve as a database, a cache, and a message broker. It supports a rich set of data structures and offers high performance, high availability, and a broad feature set, making it an indispensable component of modern web application architectures.
Redis = high-performance in-memory storage + rich data structures + persistence mechanisms + high-availability architecture
- 🚀 Extreme performance: in-memory operation with a single-threaded command model; 100k+ QPS
- 🎯 Rich data structures: String, Hash, List, Set, Sorted Set, Stream, and more
- 💾 Persistence guarantees: RDB snapshots + AOF logs + hybrid persistence
- 🔄 High-availability architecture: master-replica replication + Sentinel + Cluster sharding
- 🛠️ Broad application scenarios: caching, sessions, leaderboards, distributed locks, message queues
1. Redis Architecture and Features
1.1 Core Features of Redis
As an in-memory database, Redis has the core features summarized in the list above.
1.2 Redis Application Scenarios Compared
| Scenario | Data structure | Typical use cases | Performance traits | Typical scale |
|---|---|---|---|---|
| Caching | String/Hash | user profiles, product details, page cache | very fast reads/writes, supports expiry | medium-to-large apps |
| Session storage | String/Hash | login state, shopping carts | high-concurrency reads/writes | all web apps |
| Counting/statistics | String/Hash | page views, likes, inventory | atomic operations, real-time | high-concurrency scenarios |
| Leaderboards | Sorted Set | game rankings, trending topics, rating systems | automatic ordering, range queries | real-time ranking needs |
| Message queues | List/Stream | task queues, event notifications | blocking operations, ordering guarantees | async processing |
| Distributed locks | String | mutual exclusion, duplicate-submit prevention | atomic operations, expiry mechanism | distributed systems |
| Geolocation | Geo | nearby people, delivery zones | geo computation, distance queries | LBS apps |
2. Redis Data Structures in Depth
2.1 String - The Most Basic Data Type
String is Redis's most basic data type. It can hold text, integers, or floating-point numbers, and it underpins several of the other structures.
- Basic operations
- Numeric operations
- String operations
```
# Basic set and get
SET user:1001 "John Doe"
GET user:1001                # returns "John Doe"
GETSET user:1001 "Jane Doe"  # set a new value, return the old one
DEL user:1001                # delete the key
EXISTS user:1001             # does the key exist?

# Batch operations - better throughput
MSET user:1001 "John" user:1002 "Jane" user:1003 "Bob"
MGET user:1001 user:1002 user:1003
# returns ["John", "Jane", "Bob"]

# Conditional set
SET lock:resource1 "owner1" NX EX 30  # set only if absent, 30s expiry
SET config:timeout "5000" XX          # set only if already present
```

```
# Counter operations
SET counter 0
INCR counter        # increment by 1, returns 1
INCRBY counter 5    # increment by 5, returns 6
DECR counter        # decrement by 1, returns 5
DECRBY counter 3    # decrement by 3, returns 2

# Floating-point operations
SET price 99.99
INCRBYFLOAT price 0.01   # returns "100"
INCRBYFLOAT price -10.5  # returns "89.5"

# Example: article view counter
INCR article:1001:views  # increment on each visit
GET article:1001:views   # total view count
```

```
# Append and length
SET message "Hello"
APPEND message " World"   # returns 11 (new length)
GET message               # returns "Hello World"
STRLEN message            # returns 11

# Substring operations
GETRANGE message 0 4       # returns "Hello"
SETRANGE message 6 "Redis" # overwrite part of the string
GET message                # returns "Hello Redis"

# Bit operations - useful for boolean flags
SETBIT user:1001:flags 0 1  # set bit 0 to 1
GETBIT user:1001:flags 0    # read bit 0
BITCOUNT user:1001:flags    # count bits set to 1
```

String in Practice
```java
@Service
public class RedisStringService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * Cache user info
     */
    public void cacheUserInfo(Long userId, User user) {
        String key = "user:info:" + userId;
        String userJson = JSON.toJSONString(user);

        // cache for 1 hour
        redisTemplate.opsForValue().set(key, userJson, 1, TimeUnit.HOURS);
    }

    /**
     * Fetch user info
     */
    public User getUserInfo(Long userId) {
        String key = "user:info:" + userId;
        String userJson = redisTemplate.opsForValue().get(key);

        return userJson != null ? JSON.parseObject(userJson, User.class) : null;
    }

    /**
     * Article view counter
     */
    public Long incrementArticleViews(Long articleId) {
        String key = "article:views:" + articleId;
        return redisTemplate.opsForValue().increment(key);
    }

    /**
     * Distributed lock
     */
    public boolean tryLock(String lockKey, String lockValue, long timeout) {
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, lockValue, timeout, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(result);
    }

    /**
     * Rate limiter - fixed-window counter
     * (the key is incremented and expires after the window elapses)
     */
    public boolean isAllowed(String key, int maxRequests, int windowSeconds) {
        String script =
            "local current = redis.call('incr', KEYS[1]) " +
            "if tonumber(current) == 1 then " +
            "  redis.call('expire', KEYS[1], ARGV[1]) " +
            "end " +
            "return tonumber(current) <= tonumber(ARGV[2])";

        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(key),
            String.valueOf(windowSeconds),
            String.valueOf(maxRequests)
        );

        return Long.valueOf(1).equals(result);
    }
}
```

2.2 Hash - The Best Fit for Object Storage
A Hash is a collection of field-value pairs, particularly well suited to storing objects; compared with serializing JSON into a String, it offers better performance and more flexibility for field-level access.
- Basic operations
- Advanced operations
- Application scenarios
```
# Single-field operations
HSET user:1001 name "John Doe"
HSET user:1001 age 25
HSET user:1001 email "john@example.com"

HGET user:1001 name      # returns "John Doe"
HGETALL user:1001        # all fields and values
HDEL user:1001 age       # delete the age field
HEXISTS user:1001 name   # does the field exist?

# Batch operations
HMSET user:1002 name "Jane" age 30 email "jane@example.com" city "Beijing"
HMGET user:1002 name age email   # fetch selected fields
```

```
# Numeric operations
HSET product:1001 price 99.99
HSET product:1001 stock 100
HINCRBY product:1001 stock -1         # decrement stock by 1
HINCRBYFLOAT product:1001 price 0.01  # raise price by 0.01

# Field metadata
HKEYS user:1001   # all field names
HVALS user:1001   # all field values
HLEN user:1001    # number of fields

# Conditional set
HSETNX user:1001 phone "1234567890"   # set only if the field is absent
```

```
# 1. User profile storage
HMSET user:1001 name "John Doe" age 25 email "john@example.com" phone "1234567890" created_at "2025-01-01" last_login "2025-08-11"

# 2. Shopping cart
HSET cart:user:1001 product:1001 2     # product 1001, quantity 2
HSET cart:user:1001 product:1002 1     # product 1002, quantity 1
HINCRBY cart:user:1001 product:1001 1  # bump quantity of product 1001
HGETALL cart:user:1001                 # fetch the whole cart

# 3. Product cache
HMSET product:1001 name "iPhone 15" price 5999 stock 100 category "electronics" brand "Apple"

# 4. Configuration management
HMSET config:app name "MyApp" version "1.0.0" port 8080 debug true max_connections 1000
```

Hash vs String Performance Comparison
| Dimension | Hash | String (JSON) | Analysis |
|---|---|---|---|
| Memory usage | lower | higher | Hash avoids JSON serialization overhead |
| Partial updates | supported | full rewrite required | Hash can update a single field |
| Query performance | fast per-field | requires full parse | Hash supports field-level operations |
| Data types | native numeric support | strings only | Hash supports numeric operations (HINCRBY) |
| Complex queries | limited | flexible | JSON supports nested structures |
2.3 List - The Natural Fit for Ordered Data
A List is a doubly linked list that supports pushing and popping elements at both ends, which makes it suitable for queues, stacks, and timelines.
- Basic operations
- Blocking operations
- Application scenarios
```
# Add elements
LPUSH mylist "item1"                      # push on the left
RPUSH mylist "item2" "item3"              # push several on the right
LINSERT mylist BEFORE "item2" "new_item"  # insert before a given element

# Read elements
LRANGE mylist 0 -1   # all elements
LINDEX mylist 0      # element at index 0
LLEN mylist          # list length

# Remove elements
LPOP mylist            # pop from the left
RPOP mylist            # pop from the right
LREM mylist 1 "item1"  # remove up to 1 occurrence of "item1"
LTRIM mylist 0 99      # keep only the given range
```

```
# Blocking pops - building a message queue
BLPOP queue:tasks 10      # block up to 10s for a left pop
BRPOP queue:tasks 0       # block indefinitely for a right pop
BRPOPLPUSH source dest 5  # pop from source and push onto dest

# Producer-consumer pattern
# producer
LPUSH queue:emails "email1@example.com"
LPUSH queue:emails "email2@example.com"

# consumer
BRPOP queue:emails 0      # block until an email task arrives
```

```
# 1. Message queue
LPUSH queue:notifications "user:1001:login"
LPUSH queue:notifications "order:2001:created"
BRPOP queue:notifications 0               # consumer fetches a notification

# 2. User activity timeline
LPUSH user:1001:timeline "post:3001"
LPUSH user:1001:timeline "post:3002"
LRANGE user:1001:timeline 0 9             # latest 10 entries

# 3. Recently visited pages
LPUSH user:1001:recent_pages "page:home"
LPUSH user:1001:recent_pages "page:product:1001"
LTRIM user:1001:recent_pages 0 19         # keep only the latest 20

# 4. Task queues
LPUSH tasks:high_priority "task:urgent:1001"
LPUSH tasks:normal "task:normal:2001"
BRPOP tasks:high_priority tasks:normal 0  # high-priority tasks are served first
```

2.4 Set - Deduplication and Set Operations
A Set is an unordered collection of unique strings; duplicates are not allowed, and intersection, union, and difference operations across sets are supported.
- Basic operations
- Set operations
- Application scenarios
```
# Add and remove
SADD myset "member1" "member2" "member3"
SREM myset "member1"       # remove a member
SMEMBERS myset             # all members
SCARD myset                # member count
SISMEMBER myset "member2"  # membership test

# Random operations
SRANDMEMBER myset 2        # fetch 2 random members
SPOP myset                 # pop and remove one random member
```

```
# Sample sets
SADD set1 "a" "b" "c" "d"
SADD set2 "b" "c" "e" "f"
SADD set3 "c" "d" "g" "h"

# Intersection
SINTER set1 set2              # returns ["b", "c"]
SINTERSTORE result set1 set2  # store the intersection in result

# Union
SUNION set1 set2              # returns ["a", "b", "c", "d", "e", "f"]
SUNIONSTORE result set1 set2 set3

# Difference
SDIFF set1 set2               # returns ["a", "d"] (in set1 but not in set2)
SDIFFSTORE result set1 set2
```

```
# 1. User tags
SADD user:1001:tags "java" "redis" "mysql" "spring"
SADD user:1002:tags "python" "redis" "mongodb" "django"
SINTER user:1001:tags user:1002:tags        # shared skill tags

# 2. Friend relations
SADD user:1001:friends "user:1002" "user:1003" "user:1004"
SADD user:1002:friends "user:1001" "user:1005" "user:1006"
SINTER user:1001:friends user:1002:friends  # mutual friends

# 3. Article likes
SADD article:1001:likes "user:1001" "user:1002" "user:1003"
SCARD article:1001:likes                    # like count
SISMEMBER article:1001:likes "user:1001"    # has this user liked it?

# 4. Online user tracking
SADD online:users "user:1001" "user:1002"
SCARD online:users                          # number of users online
SREM online:users "user:1001"               # user goes offline

# 5. Lottery draw
SADD lottery:participants "user:1001" "user:1002" "user:1003"
SPOP lottery:participants                   # draw a random winner
```

2.5 Sorted Set - Sorting and Ranking
A Sorted Set is an ordered collection of strings in which every member carries a score; members are kept sorted by score, making it ideal for leaderboards, priority queues, and similar use cases.
- Basic operations
- Advanced operations
- Application scenarios
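Before the commands, it helps to pin down the ordering rules. The sketch below is a deliberately tiny, illustrative in-memory model of a sorted set (Redis's real implementation uses a skiplist plus a hash table): members are ordered by score with ties broken lexicographically, and re-adding an existing member updates its score.

```java
import java.util.*;

// A toy model of ZADD / ZSCORE / ZREVRANGE semantics (illustrative only).
public class MiniZSet {
    private final Map<String, Double> scores = new HashMap<>();
    private final TreeSet<String> order = new TreeSet<>((a, b) -> {
        int byScore = Double.compare(scores.get(a), scores.get(b));
        return byScore != 0 ? byScore : a.compareTo(b);   // tie-break by member
    });

    public void zadd(double score, String member) {
        if (scores.containsKey(member)) order.remove(member);  // re-score = remove, re-insert
        scores.put(member, score);
        order.add(member);
    }

    public Double zscore(String member) { return scores.get(member); }

    // Highest scores first, like ZREVRANGE 0 (count-1)
    public List<String> zrevrange(int count) {
        List<String> out = new ArrayList<>();
        Iterator<String> it = order.descendingIterator();
        while (it.hasNext() && out.size() < count) out.add(it.next());
        return out;
    }
}
```

With members player1=1000, player2=1200, player3=800, `zrevrange(2)` yields player2 then player1 - exactly the leaderboard reads shown in the examples that follow.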
```
# Add members
ZADD leaderboard 1000 "player1"
ZADD leaderboard 1200 "player2" 800 "player3"

# Inspect
ZSCORE leaderboard "player1"    # score: 1000
ZRANK leaderboard "player1"     # ascending rank: 1
ZREVRANK leaderboard "player1"  # descending rank: 1
ZCARD leaderboard               # member count

# Range queries
ZRANGE leaderboard 0 -1             # all members, ascending
ZREVRANGE leaderboard 0 -1          # all members, descending
ZRANGE leaderboard 0 -1 WITHSCORES  # include scores
```

```
# Score-range queries
ZRANGEBYSCORE leaderboard 800 1200     # scores between 800 and 1200
ZREVRANGEBYSCORE leaderboard 1200 800  # same range, descending
ZCOUNT leaderboard 800 1200            # count members in range

# Removal
ZREM leaderboard "player1"           # remove a member
ZREMRANGEBYRANK leaderboard 0 2      # remove the 3 lowest-scored members (ranks 0-2)
ZREMRANGEBYSCORE leaderboard 0 500   # remove members scoring 0-500

# Score updates
ZINCRBY leaderboard 100 "player1"    # add 100 to the score
ZINCRBY leaderboard -50 "player2"    # subtract 50

# Set operations
ZUNIONSTORE result 2 set1 set2 WEIGHTS 1 2    # union, set2 weighted x2
ZINTERSTORE result 2 set1 set2 AGGREGATE MAX  # intersection, keep the max score
```

```
# 1. Game leaderboard
ZADD game:leaderboard 15000 "player:1001"
ZADD game:leaderboard 12000 "player:1002"
ZADD game:leaderboard 18000 "player:1003"
ZREVRANGE game:leaderboard 0 9 WITHSCORES  # top 10

# 2. Trending topics
ZADD trending:topics 1500 "Redis tutorial"
ZADD trending:topics 1200 "Java interview"
ZADD trending:topics 1800 "Spring Boot"
ZREVRANGE trending:topics 0 4              # top 5 topics

# 3. Timeline ordered by timestamp
ZADD user:1001:timeline 1640995200 "post:1001"
ZADD user:1001:timeline 1640995300 "post:1002"
ZREVRANGE user:1001:timeline 0 9           # latest 10 posts

# 4. Priority task queue
ZADD tasks:queue 1 "low_priority_task"
ZADD tasks:queue 5 "high_priority_task"
ZADD tasks:queue 3 "medium_priority_task"
ZREVRANGE tasks:queue 0 0                  # highest-priority task

# 5. Product sales ranking
ZADD products:sales 500 "product:1001"
ZADD products:sales 800 "product:1002"
ZINCRBY products:sales 1 "product:1001"    # one more sale
ZREVRANGE products:sales 0 9 WITHSCORES    # sales top 10
```

2.6 Stream - A Modern Message Queue Solution
Stream is a data type introduced in Redis 5.0, designed specifically for building message queues and event-stream processing systems, with advanced features such as consumer groups and message acknowledgement.
- Basic operations
- Consumer groups
- Application scenarios
```
# Add messages
XADD user_events * user_id 1001 action "login" timestamp 1640995200
XADD user_events * user_id 1002 action "logout" timestamp 1640995300
XADD user_events 1640995400000-0 user_id 1003 action "purchase" amount 99.99

# Read messages
XREAD COUNT 10 STREAMS user_events 0            # read 10 messages from the start
XREAD BLOCK 5000 STREAMS user_events $          # block 5s waiting for new messages
XREAD STREAMS user_events order_events 0-0 0-0  # read from several streams

# Range queries
XRANGE user_events - +                           # all messages
XRANGE user_events 1640995200000 1640995400000   # time-range query
XREVRANGE user_events + - COUNT 5                # latest 5 messages, reversed

# Stream info
XLEN user_events          # message count
XINFO STREAM user_events  # detailed stream info
```

```
# Create consumer groups
XGROUP CREATE user_events analytics_group $  # start from new messages
XGROUP CREATE user_events backup_group 0     # consume from the beginning

# Consumers read messages
XREADGROUP GROUP analytics_group consumer1 COUNT 1 STREAMS user_events >
XREADGROUP GROUP analytics_group consumer2 COUNT 5 STREAMS user_events >

# Acknowledge
XACK user_events analytics_group 1640995200000-0

# Pending messages
XPENDING user_events analytics_group                   # group's pending messages
XPENDING user_events analytics_group - + 10 consumer1  # one consumer's pending messages

# Claim ownership (recover messages from a failed consumer)
XCLAIM user_events analytics_group consumer2 3600000 1640995200000-0

# Delete / trim
XDEL user_events 1640995200000-0
XTRIM user_events MAXLEN 1000  # keep only the latest 1000 messages
```

```
# 1. User behavior event stream
XADD user_behavior * user_id 1001 page "home" action "view" timestamp 1640995200
XADD user_behavior * user_id 1001 page "product" action "click" product_id 2001
XADD user_behavior * user_id 1001 action "purchase" order_id 3001 amount 299.99

# Different consumer groups serve different jobs
XGROUP CREATE user_behavior analytics_group $       # analytics
XGROUP CREATE user_behavior recommendation_group $  # recommendations

# 2. Order status change stream
XADD order_events * order_id 1001 status "created" user_id 2001
XADD order_events * order_id 1001 status "paid" payment_id 3001
XADD order_events * order_id 1001 status "shipped" tracking_no "SF123456"

# 3. System log stream
XADD system_logs * level "ERROR" service "user-service" message "Database connection failed"
XADD system_logs * level "INFO" service "order-service" message "Order processed successfully"

# 4. Real-time notification stream
XADD notifications * user_id 1001 type "order_shipped" title "Your order has shipped"
XADD notifications * user_id 1001 type "friend_request" from_user 2001
```

Stream vs Traditional Message Queues
| Feature | Redis Stream | RabbitMQ | Kafka | Best for |
|---|---|---|---|---|
| Message persistence | yes | yes | yes | all scenarios |
| Consumer groups | yes | yes | yes | multi-consumer scenarios |
| Acknowledgement | yes | yes | yes | high-reliability requirements |
| Message replay | yes | limited | yes | reprocessing historical messages |
| Performance | very high | high | very high | high-concurrency scenarios |
| Operational complexity | low | medium | high | simple deployments |
| Ecosystem | Redis ecosystem | rich | rich | existing Redis infrastructure |
3. Redis Persistence Mechanisms
As an in-memory database, Redis provides several persistence mechanisms to keep data safe and recoverable.
3.1 RDB Persistence - Snapshots
RDB (Redis Database) persists data by taking point-in-time snapshots of the dataset and is Redis's default persistence mode.
- RDB configuration
- RDB process
- Pros and cons
```
# redis.conf
save 900 1     # snapshot if at least 1 key changed within 900s
save 300 10    # snapshot if at least 10 keys changed within 300s
save 60 10000  # snapshot if at least 10000 keys changed within 60s

# Other RDB settings
stop-writes-on-bgsave-error yes  # stop accepting writes if the RDB save fails
rdbcompression yes               # compress the RDB file
rdbchecksum yes                  # checksum the RDB file
dbfilename dump.rdb              # RDB file name
dir /var/lib/redis               # RDB directory

# Triggering RDB manually
SAVE      # synchronous save (blocks the server)
BGSAVE    # background save (non-blocking)
LASTSAVE  # timestamp of the last successful save
```

```
# 1. Trigger checks
#    - periodic checks against the save conditions
#    - manual SAVE / BGSAVE
#    - automatic save on shutdown

# 2. Fork a child process (BGSAVE)
#    - the main process forks a child
#    - the child inherits a snapshot of memory (copy-on-write)
#    - the main process keeps serving clients

# 3. Write the RDB file
#    - the child writes the dataset to a temporary RDB file
#    - compact, highly compressed binary format
#    - contains database selectors, key-value pairs, expiry times, etc.

# 4. Atomic replacement
#    - once complete, the temp file atomically replaces the old RDB file
#    - guarantees file integrity

# RDB file layout (simplified)
#   "REDIS0009"        magic string + RDB version
#   aux fields         redis-ver, redis-bits, ctime, used-mem, ...
#   database sections  key-value pairs with optional expiry metadata
#   0xFF               end-of-file opcode
#   8-byte checksum
```

```
# ✅ RDB strengths
# 1. Compact: a highly compressed binary file, good for backup and transfer
# 2. Fast recovery: loads straight into memory, quick restarts
# 3. Low runtime impact: a forked child process does the work
# 4. Good for disaster recovery: easy to ship to remote storage

# ❌ RDB weaknesses
# 1. Data-loss window: writes between two snapshots can be lost
# 2. Fork cost: forking can take long with large datasets
# 3. Not real-time: cannot provide second-level durability
# 4. Compatibility: RDB formats can differ between Redis versions

# When to use it
# - workloads that tolerate some data loss
# - periodic backup requirements
# - full sync for master-replica replication
# - fast restart recovery
```

3.2 AOF Persistence - An Operation Log
AOF (Append Only File) persists data by logging every write command, which provides stronger durability.
- AOF configuration
- AOF rewrite
- AOF format
```
# redis.conf
appendonly yes                   # enable AOF
appendfilename "appendonly.aof"  # AOF file name
appendfsync everysec             # fsync policy

# fsync policy options
# appendfsync always    # fsync after every write (safest, slowest)
# appendfsync everysec  # fsync once per second (balances safety and performance)
# appendfsync no        # let the OS decide (fastest, least safe)

# AOF rewrite settings
auto-aof-rewrite-percentage 100    # rewrite when the AOF grows 100% past the last rewrite
auto-aof-rewrite-min-size 64mb     # only consider rewriting past 64MB
aof-load-truncated yes             # load a truncated AOF on startup
aof-rewrite-incremental-fsync yes  # fsync incrementally during rewrite

# Trigger a rewrite manually
BGREWRITEAOF  # rewrite the AOF in the background
```

```
# AOF contents before a rewrite:
SET counter 1
INCR counter   # counter = 2
INCR counter   # counter = 3
INCR counter   # counter = 4
DEL counter
SET counter 100
EXPIRE counter 3600

# AOF contents after the rewrite:
SET counter 100
EXPIRE counter 3600

# Rewrite process
# 1. Fork a child process
#    - the main process keeps serving clients
#    - the child writes a fresh AOF from the current in-memory state
#
# 2. Rewrite buffer
#    - the main process appends commands issued during the rewrite to a buffer
#    - ensures nothing written during the rewrite is lost
#
# 3. Swap files
#    - when the child finishes, the buffer is appended to the new AOF
#    - the new file atomically replaces the old one
#
# 4. Cleanup
#    - clear the rewrite buffer
#    - update the AOF file descriptor
```

```
# AOF files use the Redis protocol (RESP) format
# Example command: SET mykey myvalue

*3       # array of 3 elements
$3       # first element is 3 bytes
SET      # command name
$5       # second element is 5 bytes
mykey    # key
$7       # third element is 7 bytes
myvalue  # value

# A more complex example: HMSET user:1001 name "John" age 25
*6         # array of 6 elements
$5
HMSET
$9
user:1001
$4
name
$4
John
$3
age
$2
25

# AOF file characteristics
# - plain text, human readable
# - strictly follows the Redis protocol
# - can be inspected and repaired by hand
# - usually larger than an RDB file
```

3.3 Hybrid Persistence - The Recommended Approach
Hybrid persistence, introduced in Redis 4.0, combines the strengths of RDB and AOF and is the currently recommended option.
- Hybrid persistence configuration
- Persistence schemes compared
- Best practices
```
# redis.conf
appendonly yes            # enable AOF
aof-use-rdb-preamble yes  # enable hybrid persistence

# How it works
# 1. During an AOF rewrite, the current dataset is written to the head of the AOF in RDB format
# 2. Subsequent write operations are appended in AOF format
# 3. On recovery, the RDB preamble is loaded first, then the AOF tail is replayed

# File layout
# +-------+-------+-------+
# | RDB   | AOF   | AOF   |
# | base  | delta | delta |
# | data  |   1   |   2   |
# +-------+-------+-------+
```

```
# 📊 Performance
# RDB:    ⭐⭐⭐⭐⭐ (fastest)
# AOF:    ⭐⭐⭐ (moderate)
# Hybrid: ⭐⭐⭐⭐ (fast)

# 🛡️ Data safety
# RDB:    ⭐⭐ (can lose more data)
# AOF:    ⭐⭐⭐⭐⭐ (safest)
# Hybrid: ⭐⭐⭐⭐ (safe)

# 💾 File size
# RDB:    ⭐⭐⭐⭐⭐ (smallest)
# AOF:    ⭐⭐ (larger)
# Hybrid: ⭐⭐⭐⭐ (moderate)

# ⚡ Recovery speed
# RDB:    ⭐⭐⭐⭐⭐ (fastest)
# AOF:    ⭐⭐ (slower)
# Hybrid: ⭐⭐⭐⭐ (fast)
```

```
# 🎯 Recommended configurations

# Option 1: performance first (some data loss tolerable)
save 900 1
save 300 10
save 60 10000
appendonly no

# Option 2: durability first (no data loss tolerable)
appendonly yes
appendfsync everysec
aof-use-rdb-preamble yes

# Option 3: balanced (recommended)
save 900 1
appendonly yes
appendfsync everysec
aof-use-rdb-preamble yes
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

# 🔧 Operational best practices
# 1. Regularly ship RDB files to remote storage
# 2. Watch AOF file size and trigger rewrites in time
# 3. Rehearse the recovery process to confirm backups work
# 4. Pick the persistence strategy that matches the workload
# 5. Disable persistence on replicas to offload the master

# 📈 Performance tips
# 1. Put RDB and AOF files on separate disks
# 2. Use SSDs for better I/O
# 3. Tune the AOF rewrite thresholds sensibly
# 4. Run BGSAVE / BGREWRITEAOF during off-peak hours
# 5. Monitor fork latency to avoid blocking the main process
```

4. Redis High-Availability Architecture
A highly available Redis architecture is key to keeping the service stable. The three main options are master-replica replication, Sentinel mode, and Cluster mode.
4.1 Master-Replica Replication - The Basis of Read/Write Splitting
Replication is the foundation of Redis high availability, providing read/write splitting and data redundancy through data synchronization.
- Master/replica configuration
- Synchronization process
- Replication tuning
```
# Master config (redis-master.conf)
bind 0.0.0.0
port 6379
daemonize yes
pidfile /var/run/redis_6379.pid
logfile /var/log/redis_6379.log
dir /var/lib/redis

# Security
requirepass master_password
masterauth master_password

# Replica config (redis-slave.conf)
bind 0.0.0.0
port 6380
daemonize yes
pidfile /var/run/redis_6380.pid
logfile /var/log/redis_6380.log
dir /var/lib/redis

# Replication settings
replicaof 192.168.1.100 6379  # point at the master
masterauth master_password    # master password
replica-read-only yes         # replica is read-only
replica-serve-stale-data yes  # keep serving while disconnected

# Changing roles at runtime
REPLICAOF 192.168.1.100 6379  # become a replica
REPLICAOF NO ONE              # stop replicating; promote to master
```

```
# 1️⃣ Connection setup
# replica -> master: PING
# master -> replica: PONG
# replica -> master: AUTH <password>
# master -> replica: OK

# 2️⃣ Data synchronization
# replica -> master: PSYNC <runid> <offset>
#
# Case A: full resync
#   master -> replica: FULLRESYNC <runid> <offset>
#   master runs BGSAVE to produce an RDB file
#   master ships the RDB file to the replica
#   replica flushes its dataset and loads the RDB
#   master sends the write commands buffered during the sync
#
# Case B: partial resync
#   master -> replica: CONTINUE
#   master sends the missing portion of the replication backlog

# 3️⃣ Command propagation
# on each write, the master:
#   1. executes the command
#   2. forwards it to every replica
#   3. replicas execute it to stay consistent

# Replication commands
INFO replication  # replication status
ROLE              # node role
REPLICAOF NO ONE  # stop replicating
```

```
# Replication performance
repl-diskless-sync no        # diskless replication on/off
repl-diskless-sync-delay 5   # diskless sync delay (seconds)
repl-ping-replica-period 10  # replica ping interval
repl-timeout 60              # replication timeout

# Replication backlog
repl-backlog-size 1mb        # backlog buffer size
repl-backlog-ttl 3600        # how long to keep the backlog

# Replica settings
replica-priority 100              # priority used by Sentinel during promotion
replica-announce-ip 192.168.1.101 # announced IP
replica-announce-port 6380        # announced port

# Minimum-replicas safety (reduces data loss)
min-replicas-to-write 1  # require at least 1 connected replica to accept writes
min-replicas-max-lag 10  # replicas must lag at most 10 seconds

# Monitoring replication
# on the master
redis-cli -p 6379 INFO replication
# connected_slaves:2
# slave0:ip=192.168.1.101,port=6380,state=online,offset=1234,lag=0
# slave1:ip=192.168.1.102,port=6380,state=online,offset=1234,lag=1

# on a replica
redis-cli -p 6380 INFO replication
# role:slave
# master_host:192.168.1.100
# master_port:6379
# master_link_status:up
# master_last_io_seconds_ago:0
```

4.2 Sentinel Mode - Automatic Failover
Redis Sentinel is Redis's high-availability solution, providing monitoring, notification, automatic failover, and configuration discovery.
- Sentinel configuration
- Failover process
- Client connection
- Monitoring and operations
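The ODOWN and leader-election rules covered in this section ultimately boil down to two counting checks. Here is a plain, illustrative Java sketch of that arithmetic (the real Sentinel state machine involves epochs, vote tracking, and timeouts on top of this):

```java
public class SentinelQuorum {

    // A master becomes objectively down (ODOWN) once at least `quorum`
    // sentinels, including this one, report it subjectively down (SDOWN).
    static boolean isObjectivelyDown(int sdownVotes, int quorum) {
        return sdownVotes >= quorum;
    }

    // The sentinel that leads the failover must win the votes of a majority
    // of ALL known sentinels, regardless of the configured quorum.
    static boolean canLeadFailover(int votes, int totalSentinels, int quorum) {
        int majority = totalSentinels / 2 + 1;
        return votes >= Math.max(quorum, majority);
    }
}
```

This is why a quorum of 2 with 3 sentinels works well: two sentinels can declare ODOWN, and the elected leader still needs 2 of 3 votes, which is a true majority.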
```
# sentinel.conf
port 26379                           # sentinel port
daemonize yes                        # run in the background
pidfile /var/run/redis-sentinel.pid  # PID file
logfile /var/log/redis-sentinel.log  # log file
dir /var/lib/redis                   # working directory

# Monitor the master
sentinel monitor mymaster 192.168.1.100 6379 2
# mymaster:           master name
# 192.168.1.100 6379: master address and port
# 2:                  sentinels needed to declare the master down (quorum)

# Authentication
sentinel auth-pass mymaster master_password

# Failover settings
sentinel down-after-milliseconds mymaster 5000  # declare down after 5s of silence
sentinel failover-timeout mymaster 15000        # failover timeout
sentinel parallel-syncs mymaster 1              # replicas resynced in parallel

# Notification scripts
sentinel notification-script mymaster /opt/scripts/notify.sh
sentinel client-reconfig-script mymaster /opt/scripts/reconfig.sh

# Start a sentinel
redis-sentinel /etc/redis/sentinel.conf
# or
redis-server /etc/redis/sentinel.conf --sentinel
```

```
# 1️⃣ Subjectively down (SDOWN)
# a single sentinel gets no reply from the master for down-after-milliseconds
# and marks it subjectively down

# 2️⃣ Objectively down (ODOWN)
# the sentinel asks its peers for their view of the master
# once at least `quorum` sentinels consider it down,
# the master is marked objectively down

# 3️⃣ Leader election
# sentinels run a Raft-style election
# the winner is responsible for carrying out the failover
# and must receive votes from more than half of the sentinels

# 4️⃣ Choosing the new master
# the leader picks among the replicas, in priority order:
#   a) lowest replica-priority
#   b) largest replication offset (freshest data)
#   c) smallest run_id

# 5️⃣ Executing the failover
# send REPLICAOF NO ONE to the chosen replica
# send REPLICAOF <new_master_ip> <new_master_port> to the other replicas
# update the master address in the sentinel configuration
# publish the master change so clients can react

# Failover-related commands
SENTINEL masters                           # all monitored masters
SENTINEL slaves mymaster                   # replicas of a master
SENTINEL sentinels mymaster                # sentinels watching a master
SENTINEL get-master-addr-by-name mymaster  # current master address
SENTINEL failover mymaster                 # trigger a manual failover
SENTINEL reset mymaster                    # reset state for a master
```

```java
@Configuration
public class RedisConfig {

    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // Sentinel configuration
        RedisSentinelConfiguration sentinelConfig =
            new RedisSentinelConfiguration()
                .master("mymaster")                // master name
                .sentinel("192.168.1.100", 26379)  // sentinel 1
                .sentinel("192.168.1.101", 26379)  // sentinel 2
                .sentinel("192.168.1.102", 26379); // sentinel 3

        // Password
        sentinelConfig.setPassword("master_password");

        // Connection pool
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(50);
        poolConfig.setMaxIdle(10);
        poolConfig.setMinIdle(5);
        poolConfig.setMaxWaitMillis(3000);

        LettucePoolingClientConfiguration clientConfig =
            LettucePoolingClientConfiguration.builder()
                .poolConfig(poolConfig)
                .build();

        return new LettuceConnectionFactory(sentinelConfig, clientConfig);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(
            LettuceConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);

        // Serializers
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

        return template;
    }
}

// Handling failover on the client side
@Component
public class RedisFailoverHandler implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(RedisFailoverHandler.class);

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        String msg = new String(message.getBody());

        if ("+switch-master".equals(channel)) {
            logger.info("Master switch notification: {}", msg);
            handleMasterSwitch(msg);
        }
    }

    private void handleMasterSwitch(String message) {
        // Payload format: mymaster 192.168.1.100 6379 192.168.1.101 6380
        String[] parts = message.split(" ");
        String masterName = parts[0];
        String oldMasterHost = parts[1];
        int oldMasterPort = Integer.parseInt(parts[2]);
        String newMasterHost = parts[3];
        int newMasterPort = Integer.parseInt(parts[4]);

        logger.info("Master moved from {}:{} to {}:{}",
            oldMasterHost, oldMasterPort, newMasterHost, newMasterPort);

        // Update application config or reinitialize the pool here;
        // most Redis clients handle this switch automatically.
    }
}
```

```
# Sentinel status
redis-cli -p 26379 SENTINEL masters
# name=mymaster,status=ok,address=192.168.1.100:6379,slaves=2,sentinels=3

redis-cli -p 26379 SENTINEL slaves mymaster
# lists every replica

redis-cli -p 26379 SENTINEL sentinels mymaster
# lists every sentinel

# Sentinel log analysis
tail -f /var/log/redis-sentinel.log
# +monitor master mymaster 192.168.1.100 6379 quorum 2
# +slave slave 192.168.1.101:6380 192.168.1.101 6380 @ mymaster 192.168.1.100 6379
# +sdown master mymaster 192.168.1.100 6379
# +odown master mymaster 192.168.1.100 6379 #quorum 2/2
# +new-epoch 1
# +try-failover master mymaster 192.168.1.100 6379
# +vote-for-leader 192.168.1.100:26379 1
# +elected-leader master mymaster 192.168.1.100 6379
# +failover-state-select-slave master mymaster 192.168.1.100 6379
# +selected-slave slave 192.168.1.101:6380 192.168.1.101 6380 @ mymaster 192.168.1.100 6379
# +failover-state-send-slaveof-noone slave 192.168.1.101:6380 192.168.1.101 6380 @ mymaster 192.168.1.100 6379
# +failover-state-wait-promotion slave 192.168.1.101:6380 192.168.1.101 6380 @ mymaster 192.168.1.100 6379
# +promoted-slave slave 192.168.1.101:6380 192.168.1.101 6380 @ mymaster 192.168.1.100 6379
# +failover-state-reconf-slaves master mymaster 192.168.1.100 6379
# +slave-reconf-sent slave 192.168.1.102:6380 192.168.1.102 6380 @ mymaster 192.168.1.100 6379
# +slave-reconf-inprog slave 192.168.1.102:6380 192.168.1.102 6380 @ mymaster 192.168.1.100 6379
# +slave-reconf-done slave 192.168.1.102:6380 192.168.1.102 6380 @ mymaster 192.168.1.100 6379
# +failover-end master mymaster 192.168.1.100 6379
# +switch-master mymaster 192.168.1.100 6379 192.168.1.101 6380

# Metrics to watch
# 1. sentinel response time
# 2. failover duration
# 3. master-replica lag
# 4. network latency between sentinels
# 5. false-positive rate (flapping failovers)

# Operational best practices
# 1. deploy an odd number of sentinels (at least 3)
# 2. place sentinels on different physical machines
# 3. set down-after-milliseconds sensibly (avoid false positives from network jitter)
# 4. watch the sentinel logs to catch problems early
# 5. rehearse failover regularly
# 6. use a client that supports sentinel-aware automatic switching
```

4.3 Cluster Mode - Horizontal Scaling
Redis Cluster is Redis's distributed solution, supporting data sharding, automatic failover, and horizontal scaling.
- Cluster setup
- Slot assignment
- Cluster operations
- Client code
```
# 1. Node config (redis-7000.conf)
port 7000
cluster-enabled yes                  # enable cluster mode
cluster-config-file nodes-7000.conf  # cluster state file
cluster-node-timeout 5000            # node timeout (ms)
cluster-announce-ip 192.168.1.100    # announced IP
cluster-announce-port 7000           # announced port
cluster-announce-bus-port 17000      # cluster bus port

# Persistence
appendonly yes
appendfilename "appendonly-7000.aof"

# Misc
daemonize yes
pidfile /var/run/redis_7000.pid
logfile /var/log/redis_7000.log
dir /var/lib/redis/7000

# 2. Start every node
redis-server /etc/redis/redis-7000.conf
redis-server /etc/redis/redis-7001.conf
redis-server /etc/redis/redis-7002.conf
redis-server /etc/redis/redis-7003.conf
redis-server /etc/redis/redis-7004.conf
redis-server /etc/redis/redis-7005.conf

# 3. Create the cluster
redis-cli --cluster create \
  192.168.1.100:7000 192.168.1.100:7001 192.168.1.100:7002 \
  192.168.1.100:7003 192.168.1.100:7004 192.168.1.100:7005 \
  --cluster-replicas 1

# 4. Verify
redis-cli -c -p 7000 cluster nodes
redis-cli -c -p 7000 cluster info
```

```
# Slot basics
# - the cluster has 16384 slots (0-16383)
# - each master owns a subset of the slots
# - a key's slot is HASH_SLOT = CRC16(key) mod 16384

# Example slot assignment
# node 1: 0-5460      (5461 slots)
# node 2: 5461-10922  (5462 slots)
# node 3: 10923-16383 (5461 slots)

# Inspect slot assignment
redis-cli -p 7000 cluster slots
# 0-5460      192.168.1.100:7000 192.168.1.100:7003
# 5461-10922  192.168.1.100:7001 192.168.1.100:7004
# 10923-16383 192.168.1.100:7002 192.168.1.100:7005

# Compute a key's slot
redis-cli -p 7000 cluster keyslot "user:1001"
# (integer) 9189

# Count keys in a slot
redis-cli -p 7000 cluster countkeysinslot 9189
# (integer) 1

# List keys in a slot
redis-cli -p 7000 cluster getkeysinslot 9189 10
# 1) "user:1001"

# Hash tags - keep related keys in the same slot
SET user:{1001}:profile "John Doe"
SET user:{1001}:settings "theme:dark"
SET user:{1001}:preferences "lang:en"
# these keys all land in the same slot because they share {1001}

# Slot migration (online scale out / scale in)
redis-cli --cluster reshard 192.168.1.100:7000
# interactively move slots to another node
```

```
# Cluster info
redis-cli -c -p 7000 cluster info
# cluster_state:ok
# cluster_slots_assigned:16384
# cluster_slots_ok:16384
# cluster_slots_pfail:0
# cluster_slots_fail:0
# cluster_known_nodes:6
# cluster_size:3

redis-cli -c -p 7000 cluster nodes
# lists every node: ID, role, state, slots, ...

# Data access (client redirection)
redis-cli -c -p 7000
127.0.0.1:7000> SET user:1001 "John"
-> Redirected to slot [9189] located at 192.168.1.100:7001
OK
192.168.1.100:7001> GET user:1001
"John"

# Multi-key limitation
# cross-slot multi-key commands are rejected
MSET user:1001 "John" user:1002 "Jane"  # may fail
# (error) CROSSSLOT Keys in request don't hash to the same slot

# Hash tags fix this
MSET user:{group1}:1001 "John" user:{group1}:1002 "Jane"  # works

# Node management
redis-cli --cluster add-node 192.168.1.100:7006 192.168.1.100:7000
# add a new node to the cluster

redis-cli --cluster del-node 192.168.1.100:7000 <node-id>
# remove a node from the cluster

redis-cli --cluster rebalance 192.168.1.100:7000
# rebalance slot assignment

# Failover
redis-cli -p 7003 cluster failover
# manually promote a replica to master

redis-cli -p 7003 cluster failover force
# force a failover even if the master is healthy
```

```java
@Configuration
public class RedisClusterConfig {

    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // Cluster nodes
        List<RedisNode> nodes = Arrays.asList(
            new RedisNode("192.168.1.100", 7000),
            new RedisNode("192.168.1.100", 7001),
            new RedisNode("192.168.1.100", 7002),
            new RedisNode("192.168.1.100", 7003),
            new RedisNode("192.168.1.100", 7004),
            new RedisNode("192.168.1.100", 7005)
        );

        RedisClusterConfiguration clusterConfig =
            new RedisClusterConfiguration();
        clusterConfig.setClusterNodes(nodes);
        clusterConfig.setMaxRedirects(3);  // max MOVED/ASK redirects

        // Connection pool
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(100);
        poolConfig.setMaxIdle(20);
        poolConfig.setMinIdle(10);
        poolConfig.setMaxWaitMillis(3000);

        LettucePoolingClientConfiguration clientConfig =
            LettucePoolingClientConfiguration.builder()
                .poolConfig(poolConfig)
                .build();

        return new LettuceConnectionFactory(clusterConfig, clientConfig);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(
            LettuceConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);

        // Serializers
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

        return template;
    }
}

// Cluster-aware service
@Service
public class RedisClusterService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Batch write - hash tags keep all keys in the same slot
     */
    public void batchOperationWithHashTag(String groupId, Map<String, Object> data) {
        Map<String, Object> hashTaggedData = new HashMap<>();

        for (Map.Entry<String, Object> entry : data.entrySet()) {
            String key = String.format("data:{%s}:%s", groupId, entry.getKey());
            hashTaggedData.put(key, entry.getValue());
        }

        redisTemplate.opsForValue().multiSet(hashTaggedData);
    }

    /**
     * Sharded counter - spreads load across the cluster
     */
    public Long distributedIncrement(String counterName, String shardKey) {
        String key = String.format("counter:%s:{%s}", counterName, shardKey);
        return redisTemplate.opsForValue().increment(key);
    }

    /**
     * Cluster status (simplified placeholder)
     */
    public Map<String, Object> getClusterInfo() {
        return redisTemplate.execute((RedisCallback<Map<String, Object>>) connection -> {
            if (connection instanceof JedisClusterConnection) {
                JedisClusterConnection clusterConnection = (JedisClusterConnection) connection;
                // a real implementation would query CLUSTER INFO here
                Map<String, Object> info = new HashMap<>();
                info.put("cluster_state", "ok");
                info.put("cluster_size", 3);
                return info;
            }
            return Collections.emptyMap();
        });
    }
}
```

4.4 Choosing Between the High-Availability Options
| Scheme | Consistency | Availability | Scalability | Complexity | Best for |
|---|---|---|---|---|---|
| Master-replica | eventual | medium | read scaling | low | read-heavy workloads, simple architectures |
| Sentinel | eventual | high | read scaling | medium | high-availability requirements, automatic failover |
| Cluster | eventual | high | read & write scaling | high | large datasets, high concurrency, horizontal scaling |
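The Cluster key routing behind this comparison is easy to reproduce locally: a key maps to slot CRC16(key) mod 16384, with the hash-tag rule applied first. A minimal sketch using CRC16-CCITT (XMODEM), the variant Redis Cluster specifies:

```java
import java.nio.charset.StandardCharsets;

public class HashSlot {
    private static final int SLOTS = 16384;

    // CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0,
    // no bit reflection - the checksum Redis Cluster uses for key routing.
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    // Hash-tag rule: if the key contains a non-empty {...} section,
    // only that section is hashed, so tagged keys share a slot.
    static int slot(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOTS;
    }
}
```

This makes the CROSSSLOT behavior from section 4.3 concrete: `user:{1001}:profile` and `user:{1001}:settings` both hash only `1001`, so MSET across them succeeds, while untagged keys usually land in different slots.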
5. Redis Performance Tuning in Practice
Redis performance tuning is a systemic effort spanning memory, networking, persistence, and data-structure choices.
5.1 Memory Optimization
Memory is Redis's most precious resource; a sound memory strategy directly determines system performance.
- Memory configuration
- Data-structure optimization
- Key design
- Memory monitoring
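As a reference point for the allkeys-lru eviction policy discussed in this section: an exact LRU cache takes only a few lines of Java with `LinkedHashMap`. Note the contrast with Redis itself, which deliberately does not keep an exact recency list; it approximates LRU by sampling `maxmemory-samples` keys per eviction to save memory and CPU.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Exact LRU for illustration only - Redis approximates this via sampling.
public class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public LruCache(int maxEntries) {
        super(16, 0.75f, true);  // accessOrder = true: reads refresh recency
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;  // evict the least-recently-used entry
    }
}
```

With capacity 2, inserting a and b, reading a, then inserting c evicts b - reads count as "use", which is exactly the property that makes LRU a good fit for cache workloads.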
```
# Base memory settings
maxmemory 8gb                 # hard memory limit
maxmemory-policy allkeys-lru  # eviction policy

# Eviction policies
# noeviction:      evict nothing; writes error out when memory is full (default)
# allkeys-lru:     LRU across all keys
# volatile-lru:    LRU across keys that have a TTL
# allkeys-random:  random eviction across all keys
# volatile-random: random eviction across keys that have a TTL
# volatile-ttl:    evict the keys closest to expiry
# allkeys-lfu:     LFU across all keys (Redis 4.0+)
# volatile-lfu:    LFU across keys that have a TTL (Redis 4.0+)

# LRU tuning
maxmemory-samples 5  # LRU sample size; larger = more accurate but slower

# Memory monitoring
redis-cli INFO memory
# used_memory:8589934592      # bytes in use
# used_memory_human:8.00G
# used_memory_rss:9663676416  # memory as seen by the OS
# used_memory_peak:8589934592
# used_memory_peak_human:8.00G
# maxmemory:8589934592
# maxmemory_human:8.00G
# maxmemory_policy:allkeys-lru
```

```
# 1. String vs Hash
# Not recommended: one String per attribute
SET user:1001:name "John"               # ~50 bytes
SET user:1001:age "25"                  # ~45 bytes
SET user:1001:email "john@example.com"  # ~65 bytes
# total: ~160 bytes

# Recommended: one Hash per user
HMSET user:1001 name "John" age "25" email "john@example.com"
# total: ~80 bytes - roughly 50% savings

# 2. Small-object encodings
# Hash
hash-max-ziplist-entries 512  # use ziplist below 512 fields
hash-max-ziplist-value 64     # and field values under 64 bytes

# List
list-max-ziplist-size -2      # max 8KB per node
list-compress-depth 0         # do not compress head/tail nodes

# Set
set-max-intset-entries 512    # intset up to 512 integer elements

# Sorted Set
zset-max-ziplist-entries 128  # ziplist below 128 elements
zset-max-ziplist-value 64     # and element values under 64 bytes

# 3. Memory defragmentation (Redis 4.0+)
activedefrag yes                   # enable active defrag
active-defrag-ignore-bytes 100mb   # skip below 100MB of fragmentation
active-defrag-threshold-lower 10   # skip below 10% fragmentation ratio
active-defrag-threshold-upper 100  # defrag aggressively above 100%
active-defrag-cycle-min 5          # minimum CPU effort 5%
active-defrag-cycle-max 75         # maximum CPU effort 75%
```

```
# 1. Key naming convention
# recommended format: business:object:id:attribute
user:profile:1001:basic     # basic user info
user:profile:1001:settings  # user settings
order:detail:2001:items     # order items
cache:product:3001:info     # product cache

# 2. Avoid overly long key names
# not recommended
SET "this_is_a_very_long_key_name_that_wastes_memory_and_affects_performance" "value"

# recommended
SET "user:1001:profile" "value"

# 3. Use sensible expiry times
SET cache:user:1001 "data" EX 3600  # 1 hour
SET session:abc123 "data" EX 1800   # 30 minutes
SET temp:data:xyz "data" EX 300     # 5 minutes

# 4. Batch instead of chatty operations
# not recommended: one round trip per attribute
SET user:1001:name "John"
SET user:1001:age "25"
SET user:1001:email "john@example.com"

# recommended: batch
MSET user:1001:name "John" user:1001:age "25" user:1001:email "john@example.com"

# or use a Hash
HMSET user:1001 name "John" age "25" email "john@example.com"

# 5. Split big keys
# not recommended: one giant Hash
HMSET big_hash field1 value1 field2 value2 ... field10000 value10000

# recommended: several small Hashes
HMSET hash:1 field1 value1 field2 value2 ... field100 value100
HMSET hash:2 field101 value101 field102 value102 ... field200 value200
```

```java
@Component
public class RedisMemoryMonitor {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final Logger logger = LoggerFactory.getLogger(RedisMemoryMonitor.class);

    /**
     * Fetch memory stats
     */
    public Map<String, Object> getMemoryInfo() {
        return redisTemplate.execute((RedisCallback<Map<String, Object>>) connection -> {
            Properties info = connection.info("memory");
            Map<String, Object> memoryInfo = new HashMap<>();

            memoryInfo.put("used_memory", info.getProperty("used_memory"));
            memoryInfo.put("used_memory_human", info.getProperty("used_memory_human"));
            memoryInfo.put("used_memory_rss", info.getProperty("used_memory_rss"));
            memoryInfo.put("used_memory_peak", info.getProperty("used_memory_peak"));
            memoryInfo.put("maxmemory", info.getProperty("maxmemory"));
            memoryInfo.put("maxmemory_policy", info.getProperty("maxmemory_policy"));

            return memoryInfo;
        });
    }

    /**
     * Memory usage ratio (percent)
     */
    public double getMemoryUsageRatio() {
        Map<String, Object> memoryInfo = getMemoryInfo();
        long usedMemory = Long.parseLong((String) memoryInfo.get("used_memory"));
        long maxMemory = Long.parseLong((String) memoryInfo.get("maxmemory"));

        if (maxMemory == 0) {
            return 0.0;
        }

        return (double) usedMemory / maxMemory * 100;
    }

    /**
     * Memory alerting
     */
    @Scheduled(fixedRate = 60000)  // check once per minute
    public void checkMemoryUsage() {
        double usageRatio = getMemoryUsageRatio();

        if (usageRatio > 80) {
            logger.warn("Redis memory usage too high: {}%", String.format("%.2f", usageRatio));
            sendAlert("Redis memory alert", "current usage: " + String.format("%.2f", usageRatio) + "%");
        }
    }

    /**
     * Find big keys
     */
    public List<String> findBigKeys() {
        return redisTemplate.execute((RedisCallback<List<String>>) connection -> {
            List<String> bigKeys = new ArrayList<>();

            // iterate all keys with SCAN
            ScanOptions options = ScanOptions.scanOptions().count(100).build();
            Cursor<byte[]> cursor = connection.scan(options);

            while (cursor.hasNext()) {
                byte[] keyBytes = cursor.next();
                String key = new String(keyBytes);

                // check the key's memory footprint
                Long memoryUsage = connection.memoryUsage(keyBytes);
                if (memoryUsage != null && memoryUsage > 1024 * 1024) {  // over 1MB
                    bigKeys.add(key + " (" + formatBytes(memoryUsage) + ")");
                }
            }

            return bigKeys;
        });
    }

    private String formatBytes(long bytes) {
        if (bytes < 1024) return bytes + " B";
        if (bytes < 1024 * 1024) return String.format("%.2f KB", bytes / 1024.0);
        if (bytes < 1024 * 1024 * 1024) return String.format("%.2f MB", bytes / (1024.0 * 1024));
        return String.format("%.2f GB", bytes / (1024.0 * 1024 * 1024));
    }

    private void sendAlert(String title, String message) {
        // hook up email/SMS/DingTalk notifications here
        logger.error("ALERT: {} - {}", title, message);
    }
}
```

5.2 Network and Connection Optimization
Network and connection settings directly affect Redis's response time and concurrency.
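A recurring lever in this section is bounding how much work a single round trip carries: oversized batches inflate client output buffers, while tiny ones waste round trips. The chunking used by the pipeline code later in this section reduces to a small, order-preserving split (a generic sketch):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {

    /** Split items into chunks of at most batchSize, preserving order. */
    public static <T> List<List<T>> chunks(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            // copy the sublist so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(i, Math.min(i + batchSize, items.size()))));
        }
        return batches;
    }
}
```

Each chunk is then sent as one pipelined round trip, keeping per-request buffer usage predictable.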
- Network configuration

```
# Basic network configuration
bind 0.0.0.0          # listen on all interfaces
port 6379             # listening port
tcp-backlog 511       # TCP listen backlog
timeout 0             # client idle timeout (0 = never time out)

# TCP tuning
tcp-keepalive 300     # TCP keepalive interval (seconds)
# 300s is a good default for detecting dead connections promptly

# Client connections
maxclients 10000      # maximum client connections
# default 10000; tune to server capacity and memory

# Output buffer limits
client-output-buffer-limit normal 0 0 0            # regular clients: unlimited
client-output-buffer-limit replica 256mb 64mb 60   # replica clients
client-output-buffer-limit pubsub 32mb 8mb 60      # pub/sub clients

# Monitoring
redis-cli INFO clients
# connected_clients:100                # currently connected clients
# client_recent_max_input_buffer:2    # recent max input buffer
# client_recent_max_output_buffer:0   # recent max output buffer
# blocked_clients:0                   # clients blocked on a command

redis-cli INFO stats
# total_connections_received:1000     # total connections accepted
# total_commands_processed:50000      # total commands processed
# instantaneous_ops_per_sec:100       # current ops/sec
# instantaneous_input_kbps:10.5       # current input rate (KB/s)
# instantaneous_output_kbps:15.2      # current output rate (KB/s)
```

- Connection pool optimization

```java
@Configuration
public class RedisConnectionPoolConfig {

    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // standalone configuration
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setHostName("192.168.1.100");
        config.setPort(6379);
        config.setPassword("your_password");
        config.setDatabase(0);

        // pool configuration
        GenericObjectPoolConfig<Object> poolConfig = new GenericObjectPoolConfig<>();

        // pool sizing
        poolConfig.setMaxTotal(200);   // max connections
        poolConfig.setMaxIdle(50);     // max idle connections
        poolConfig.setMinIdle(10);     // min idle connections

        // borrow behaviour
        poolConfig.setMaxWaitMillis(3000);        // max wait (ms)
        poolConfig.setBlockWhenExhausted(true);   // block when pool exhausted

        // validation
        poolConfig.setTestOnBorrow(true);                   // validate on borrow
        poolConfig.setTestOnReturn(false);                  // validate on return
        poolConfig.setTestWhileIdle(true);                  // validate idle connections
        poolConfig.setTimeBetweenEvictionRunsMillis(30000); // idle-check interval (ms)
        poolConfig.setMinEvictableIdleTimeMillis(60000);    // min idle time before eviction (ms)
        poolConfig.setNumTestsPerEvictionRun(3);            // connections checked per run

        // Lettuce client configuration
        LettucePoolingClientConfiguration clientConfig =
            LettucePoolingClientConfiguration.builder()
                .poolConfig(poolConfig)
                .commandTimeout(Duration.ofSeconds(5))     // command timeout
                .shutdownTimeout(Duration.ofSeconds(10))   // shutdown timeout
                .build();

        return new LettuceConnectionFactory(config, clientConfig);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(
            LettuceConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);

        // serialization
        Jackson2JsonRedisSerializer<Object> serializer =
            new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
            ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(objectMapper);

        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(serializer);
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(serializer);

        template.afterPropertiesSet();
        return template;
    }
}

// connection monitoring
@Component
public class RedisConnectionPoolMonitor {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final Logger logger = LoggerFactory.getLogger(RedisConnectionPoolMonitor.class);

    private static final int MAX_CLIENTS = 10000; // keep in sync with maxclients

    @Scheduled(fixedRate = 60000) // once per minute
    public void monitorConnectionPool() {
        try {
            // Lettuce does not expose the commons-pool instance through the
            // connection factory API, so monitor connection usage server-side
            // via INFO clients instead.
            Properties info = redisTemplate.execute(
                (RedisCallback<Properties>) connection -> connection.info("clients"));

            if (info != null) {
                int connectedClients = Integer.parseInt(info.getProperty("connected_clients", "0"));
                int blockedClients = Integer.parseInt(info.getProperty("blocked_clients", "0"));

                logger.info("Redis connections - connected: {}, blocked: {}",
                    connectedClients, blockedClients);

                // connection usage alerting
                double usageRatio = (double) connectedClients / MAX_CLIENTS * 100;
                if (usageRatio > 80) {
                    logger.warn("Redis connection usage is high: {}%", String.format("%.2f", usageRatio));
                }
            }
        } catch (Exception e) {
            logger.error("Failed to monitor Redis connections", e);
        }
    }
}
```

- Pipeline optimization

```java
@Service
public class RedisPipelineService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Batch SET via pipeline.
     */
    public void batchSetWithPipeline(Map<String, Object> data) {
        redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            for (Map.Entry<String, Object> entry : data.entrySet()) {
                connection.set(
                    entry.getKey().getBytes(),
                    serialize(entry.getValue())
                );
            }
            return null;
        });
    }

    /**
     * Batch GET via pipeline.
     */
    public List<Object> batchGetWithPipeline(List<String> keys) {
        return redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            for (String key : keys) {
                connection.get(key.getBytes());
            }
            return null;
        });
    }

    /**
     * Performance comparison.
     */
    public void performanceComparison() {
        Map<String, Object> testData = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            testData.put("test:key:" + i, "value" + i);
        }

        // one round trip per command
        long start1 = System.currentTimeMillis();
        for (Map.Entry<String, Object> entry : testData.entrySet()) {
            redisTemplate.opsForValue().set(entry.getKey(), entry.getValue());
        }
        long time1 = System.currentTimeMillis() - start1;

        // pipelined
        long start2 = System.currentTimeMillis();
        batchSetWithPipeline(testData);
        long time2 = System.currentTimeMillis() - start2;

        System.out.println("Plain SET: " + time1 + "ms");
        System.out.println("Pipelined: " + time2 + "ms");
        System.out.println("Speed-up: " + (time1 / (double) time2) + "x");
    }

    /**
     * Smart batching - split large batches automatically.
     */
    public void smartBatchOperation(Map<String, Object> data, int batchSize) {
        List<Map.Entry<String, Object>> entries = new ArrayList<>(data.entrySet());

        for (int i = 0; i < entries.size(); i += batchSize) {
            int endIndex = Math.min(i + batchSize, entries.size());
            List<Map.Entry<String, Object>> batch = entries.subList(i, endIndex);

            redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
                for (Map.Entry<String, Object> entry : batch) {
                    connection.set(
                        entry.getKey().getBytes(),
                        serialize(entry.getValue())
                    );
                }
                return null;
            });
        }
    }

    private byte[] serialize(Object obj) {
        // serialization hook
        return obj.toString().getBytes();
    }
}

// Lua-script optimization
@Service
public class RedisLuaScriptService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // atomically adjust stock, refusing to go negative
    private static final String STOCK_SCRIPT =
        "local key = KEYS[1] " +
        "local delta = tonumber(ARGV[1]) " +
        "local current = redis.call('GET', key) " +
        "if current == false then " +
        "  current = 0 " +
        "else " +
        "  current = tonumber(current) " +
        "end " +
        "local newValue = current + delta " +
        "if newValue >= 0 then " +
        "  redis.call('SET', key, newValue) " +
        "  return newValue " +
        "else " +
        "  return -1 " +
        "end";

    /**
     * Atomic stock adjustment.
     */
    public Long atomicStockOperation(String productId, int delta) {
        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
        script.setScriptText(STOCK_SCRIPT);
        script.setResultType(Long.class);

        return redisTemplate.execute(script,
            Collections.singletonList("stock:" + productId),
            String.valueOf(delta));
    }

    // distributed fixed-window rate limit
    private static final String RATE_LIMIT_SCRIPT =
        "local key = KEYS[1] " +
        "local window = tonumber(ARGV[1]) " +
        "local limit = tonumber(ARGV[2]) " +
        "local current = redis.call('INCR', key) " +
        "if current == 1 then " +
        "  redis.call('EXPIRE', key, window) " +
        "end " +
        "if current <= limit then " +
        "  return 1 " +
        "else " +
        "  return 0 " +
        "end";

    /**
     * Distributed rate-limit check.
     */
    public boolean rateLimitCheck(String key, int windowSeconds, int limit) {
        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
        script.setScriptText(RATE_LIMIT_SCRIPT);
        script.setResultType(Long.class);

        Long result = redisTemplate.execute(script,
            Collections.singletonList(key),
            String.valueOf(windowSeconds),
            String.valueOf(limit));

        return Long.valueOf(1).equals(result);
    }
}
```

5.3 Persistence Performance Optimization
Persistence settings trade write performance against data safety; balance them against business requirements.
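The `auto-aof-rewrite-percentage` and `auto-aof-rewrite-min-size` options covered below combine a growth threshold with a size floor. The trigger condition is easy to express on its own (a sketch of the documented behaviour, not Redis's actual source):

```java
public class AofRewritePolicy {

    /**
     * BGREWRITEAOF is scheduled once the AOF has grown by at least
     * growthPercent over its size after the last rewrite, and the file
     * is at least minSizeBytes.
     */
    public static boolean shouldRewrite(long currentSize, long baseSize,
                                        int growthPercent, long minSizeBytes) {
        if (currentSize < minSizeBytes) {
            return false; // below auto-aof-rewrite-min-size: never rewrite
        }
        long growth = currentSize - baseSize;
        // growth/baseSize >= growthPercent/100, in integer arithmetic
        return baseSize > 0 && growth * 100 >= (long) growthPercent * baseSize;
    }
}
```

With the defaults (`100`, `64mb`), a 128MB AOF over a 64MB base triggers a rewrite, while a 32MB file never does regardless of growth.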
- RDB optimization

```
# Tune RDB save points to the workload
save 900 1       # after 15 min if at least 1 key changed
save 300 10      # after 5 min if at least 10 keys changed
save 60 10000    # after 1 min if at least 10000 keys changed

# Write-heavy workloads
save 3600 1      # after 1 hour if at least 1 key changed
save 1800 100    # after 30 min if at least 100 keys changed
save 300 10000   # after 5 min if at least 10000 keys changed

# Write-light workloads
save 300 1       # after 5 min if at least 1 key changed
save 60 10       # after 1 min if at least 10 keys changed
save 10 1000     # after 10 s if at least 1000 keys changed

# RDB performance settings
stop-writes-on-bgsave-error yes   # stop writes if BGSAVE fails
rdbcompression yes                # trade CPU for smaller dumps
rdbchecksum yes                   # integrity check
rdb-save-incremental-fsync yes    # incremental fsync to reduce IO stalls

# Monitoring
redis-cli LASTSAVE                # time of last successful save
redis-cli INFO persistence
# rdb_changes_since_last_save:1000   # changes since last save
# rdb_bgsave_in_progress:0           # BGSAVE currently running?
# rdb_last_save_time:1640995200      # last save timestamp
# rdb_last_bgsave_status:ok          # last BGSAVE status
# rdb_last_bgsave_time_sec:2         # last BGSAVE duration
```

- AOF optimization

```
# Basic AOF configuration
appendonly yes                     # enable AOF
appendfilename "appendonly.aof"    # AOF file name
appendfsync everysec               # fsync every second (recommended)

# fsync strategies
# appendfsync always   - fsync on every write: safest, slowest
# appendfsync everysec - fsync once a second: balanced (recommended)
# appendfsync no       - let the OS decide: fastest, risks data loss

# AOF rewrite settings
auto-aof-rewrite-percentage 100    # rewrite after 100% growth
auto-aof-rewrite-min-size 64mb     # ...but only above 64MB
aof-rewrite-incremental-fsync yes  # incremental fsync during rewrite
aof-load-truncated yes             # load a truncated AOF on startup

# Hybrid persistence (recommended)
aof-use-rdb-preamble yes           # RDB-format preamble on AOF rewrite

# Monitoring
redis-cli INFO persistence
# aof_enabled:1                    # AOF enabled?
# aof_rewrite_in_progress:0        # rewrite currently running?
# aof_rewrite_scheduled:0          # rewrite scheduled?
# aof_last_rewrite_time_sec:3      # last rewrite duration
# aof_current_rewrite_time_sec:-1  # current rewrite duration
# aof_last_bgrewrite_status:ok     # last rewrite status
# aof_current_size:1024000         # current AOF size
# aof_base_size:512000             # AOF size after last rewrite
```

- Persistence monitoring

```java
@Component
public class RedisPersistenceMonitor {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final Logger logger = LoggerFactory.getLogger(RedisPersistenceMonitor.class);

    /**
     * Periodic persistence health check.
     */
    @Scheduled(fixedRate = 300000) // every 5 minutes
    public void monitorPersistence() {
        Map<String, Object> persistenceInfo = getPersistenceInfo();

        // RDB health
        checkRdbStatus(persistenceInfo);

        // AOF health
        checkAofStatus(persistenceInfo);

        // in-flight persistence operations
        checkPersistencePerformance(persistenceInfo);
    }

    private Map<String, Object> getPersistenceInfo() {
        return redisTemplate.execute((RedisCallback<Map<String, Object>>) connection -> {
            Properties info = connection.info("persistence");
            Map<String, Object> result = new HashMap<>();

            for (String key : info.stringPropertyNames()) {
                result.put(key, info.getProperty(key));
            }

            return result;
        });
    }

    private void checkRdbStatus(Map<String, Object> info) {
        String rdbStatus = (String) info.get("rdb_last_bgsave_status");
        if (!"ok".equals(rdbStatus)) {
            logger.error("RDB persistence in bad state: {}", rdbStatus);
            sendAlert("RDB persistence alert", "RDB status: " + rdbStatus);
        }

        // BGSAVE duration
        String rdbTimeStr = (String) info.get("rdb_last_bgsave_time_sec");
        if (rdbTimeStr != null && !"-1".equals(rdbTimeStr)) {
            int rdbTime = Integer.parseInt(rdbTimeStr);
            if (rdbTime > 60) { // longer than 60 seconds
                logger.warn("RDB save took too long: {}s", rdbTime);
            }
        }
    }

    private void checkAofStatus(Map<String, Object> info) {
        String aofEnabled = (String) info.get("aof_enabled");
        if ("1".equals(aofEnabled)) {
            String aofStatus = (String) info.get("aof_last_bgrewrite_status");
            if (!"ok".equals(aofStatus)) {
                logger.error("AOF rewrite in bad state: {}", aofStatus);
                sendAlert("AOF persistence alert", "AOF rewrite status: " + aofStatus);
            }

            // AOF file growth
            String currentSizeStr = (String) info.get("aof_current_size");
            String baseSizeStr = (String) info.get("aof_base_size");

            if (currentSizeStr != null && baseSizeStr != null) {
                long currentSize = Long.parseLong(currentSizeStr);
                long baseSize = Long.parseLong(baseSizeStr);

                if (baseSize > 0) {
                    double growthRatio = (double) (currentSize - baseSize) / baseSize * 100;
                    if (growthRatio > 200) { // grown more than 200%
                        logger.warn("AOF file growing too fast: {}%", String.format("%.2f", growthRatio));
                    }
                }
            }
        }
    }

    private void checkPersistencePerformance(Map<String, Object> info) {
        // is a persistence operation currently running?
        String rdbInProgress = (String) info.get("rdb_bgsave_in_progress");
        String aofInProgress = (String) info.get("aof_rewrite_in_progress");

        if ("1".equals(rdbInProgress)) {
            logger.info("RDB save in progress");
        }

        if ("1".equals(aofInProgress)) {
            logger.info("AOF rewrite in progress");

            String rewriteTimeStr = (String) info.get("aof_current_rewrite_time_sec");
            if (rewriteTimeStr != null && !"-1".equals(rewriteTimeStr)) {
                int rewriteTime = Integer.parseInt(rewriteTimeStr);
                if (rewriteTime > 300) { // longer than 5 minutes
                    logger.warn("AOF rewrite taking too long: {}s", rewriteTime);
                }
            }
        }
    }

    /**
     * Trigger a persistence operation manually.
     */
    public void triggerPersistence(String type) {
        redisTemplate.execute((RedisCallback<Object>) connection -> {
            switch (type.toLowerCase()) {
                case "rdb":
                    connection.bgSave();
                    logger.info("Triggered RDB save");
                    break;
                case "aof":
                    connection.bgReWriteAof();
                    logger.info("Triggered AOF rewrite");
                    break;
                default:
                    logger.warn("Unknown persistence type: {}", type);
            }
            return null;
        });
    }

    /**
     * Persistence statistics.
     */
    public Map<String, Object> getPersistenceStats() {
        Map<String, Object> info = getPersistenceInfo();
        Map<String, Object> stats = new HashMap<>();

        // RDB
        stats.put("rdb_last_save_time", info.get("rdb_last_save_time"));
        stats.put("rdb_changes_since_last_save", info.get("rdb_changes_since_last_save"));
        stats.put("rdb_last_bgsave_time_sec", info.get("rdb_last_bgsave_time_sec"));

        // AOF
        if ("1".equals(info.get("aof_enabled"))) {
            stats.put("aof_current_size", info.get("aof_current_size"));
            stats.put("aof_base_size", info.get("aof_base_size"));
            stats.put("aof_last_rewrite_time_sec", info.get("aof_last_rewrite_time_sec"));
        }

        return stats;
    }

    private void sendAlert(String title, String message) {
        // alerting hook
        logger.error("ALERT: {} - {}", title, message);
    }
}
```

6. Redis in Practice: Application Scenarios
Redis is used widely in real projects; the following are some typical, hands-on cases.
6.1 Cache System Design
Caching is Redis's most common use case; a well-designed caching strategy can dramatically improve system performance.
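Besides the penetration (穿透) and breakdown (击穿) protections shown below, a common companion technique is randomizing TTLs so that keys written together do not all expire together (cache avalanche). A minimal sketch, where the jitter bound is an assumption to tune per workload:

```java
import java.util.concurrent.ThreadLocalRandom;

public class TtlJitter {

    /** Base TTL plus up to jitterSeconds of random slack. */
    public static long jitteredTtlSeconds(long baseSeconds, long jitterSeconds) {
        return baseSeconds + ThreadLocalRandom.current().nextLong(jitterSeconds + 1);
    }
}
```

Used at write time, e.g. `set(key, value, TtlJitter.jitteredTtlSeconds(3600, 300), TimeUnit.SECONDS)`, so a batch of hot keys expires spread over a 5-minute window instead of in one burst.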
- Caching patterns

```java
@Service
public class CacheService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private UserRepository userRepository;

    /**
     * Cache-Aside pattern: the application manages the cache directly.
     */
    public User getUserCacheAside(Long userId) {
        String cacheKey = "user:cache:" + userId;

        // 1. try the cache
        User user = (User) redisTemplate.opsForValue().get(cacheKey);
        if (user != null) {
            return user;
        }

        // 2. cache miss: hit the database
        user = userRepository.findById(userId).orElse(null);
        if (user != null) {
            // 3. populate the cache with a TTL
            redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);
        }

        return user;
    }

    /**
     * Write-Through pattern: writes update cache and database together.
     */
    public void updateUserWriteThrough(User user) {
        String cacheKey = "user:cache:" + user.getId();

        // 1. update the database
        userRepository.save(user);

        // 2. update the cache
        redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);
    }

    /**
     * Write-Behind pattern: update the cache first, flush to the database asynchronously.
     */
    public void updateUserWriteBehind(User user) {
        String cacheKey = "user:cache:" + user.getId();

        // 1. update the cache immediately
        redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);

        // 2. update the database asynchronously
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(100); // simulated delay
                userRepository.save(user);
            } catch (Exception e) {
                // on failure, drop the cache entry to avoid serving stale data
                redisTemplate.delete(cacheKey);
                throw new RuntimeException("Database update failed", e);
            }
        });
    }

    /**
     * Cache warm-up.
     */
    @PostConstruct
    public void warmUpCache() {
        // preload hot data
        List<User> hotUsers = userRepository.findHotUsers();
        for (User user : hotUsers) {
            String cacheKey = "user:cache:" + user.getId();
            redisTemplate.opsForValue().set(cacheKey, user, 2, TimeUnit.HOURS);
        }
    }

    /**
     * Cache-penetration protection with a Bloom filter and a null sentinel.
     */
    public User getUserWithBloomFilter(Long userId) {
        // 1. Bloom filter: definitely-absent IDs never reach the cache or DB
        if (!bloomFilterContains("user:bloom", userId.toString())) {
            return null;
        }

        // 2. try the cache, checking the null sentinel before casting
        String cacheKey = "user:cache:" + userId;
        Object cached = redisTemplate.opsForValue().get(cacheKey);
        if ("NULL".equals(cached)) {
            return null; // cached empty marker
        }
        if (cached != null) {
            return (User) cached;
        }

        // 3. hit the database
        User user = userRepository.findById(userId).orElse(null);
        if (user != null) {
            redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);
        } else {
            // cache an empty marker briefly to absorb penetration
            redisTemplate.opsForValue().set(cacheKey, "NULL", 5, TimeUnit.MINUTES);
        }

        return user;
    }

    /**
     * Cache-breakdown protection with a distributed lock.
     */
    public User getUserWithLock(Long userId) {
        String cacheKey = "user:cache:" + userId;
        String lockKey = "user:lock:" + userId;

        // 1. try the cache
        User user = (User) redisTemplate.opsForValue().get(cacheKey);
        if (user != null) {
            return user;
        }

        // 2. acquire the distributed lock
        String lockValue = UUID.randomUUID().toString();
        Boolean lockAcquired = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);

        if (Boolean.TRUE.equals(lockAcquired)) {
            try {
                // 3. double-check after acquiring the lock
                user = (User) redisTemplate.opsForValue().get(cacheKey);
                if (user != null) {
                    return user;
                }

                // 4. hit the database
                user = userRepository.findById(userId).orElse(null);
                if (user != null) {
                    redisTemplate.opsForValue().set(cacheKey, user, 1, TimeUnit.HOURS);
                }

                return user;
            } finally {
                // 5. release the lock
                releaseLock(lockKey, lockValue);
            }
        } else {
            // wait for the lock holder to populate the cache
            try {
                Thread.sleep(100);
                return getUserWithLock(userId); // retry recursively (bound this in production)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    private boolean bloomFilterContains(String filterKey, String value) {
        // simplified placeholder; use Redisson's Bloom filter in real projects
        return true; // assume present
    }

    private void releaseLock(String lockKey, String lockValue) {
        String script =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "return redis.call('del', KEYS[1]) " +
            "else return 0 end";

        redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(lockKey),
            lockValue
        );
    }
}
```

- Caching strategies

```java
@Service
public class MultiLevelCacheService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // local cache
    private final Cache<String, Object> localCache = Caffeine.newBuilder()
        .maximumSize(1000)
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .build();

    /**
     * Multi-level lookup:
     * L1 local cache -> L2 Redis -> L3 database
     */
    public Object getWithMultiLevelCache(String key, Supplier<Object> dataLoader) {
        // L1: local cache
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }

        // L2: Redis
        value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            localCache.put(key, value);
            return value;
        }

        // L3: database
        value = dataLoader.get();
        if (value != null) {
            // populate Redis
            redisTemplate.opsForValue().set(key, value, 1, TimeUnit.HOURS);
            // populate the local cache
            localCache.put(key, value);
        }

        return value;
    }

    /**
     * Update strategy.
     */
    public void updateCache(String key, Object value) {
        // 1. drop the local entry
        localCache.invalidate(key);

        // 2. update Redis
        redisTemplate.opsForValue().set(key, value, 1, TimeUnit.HOURS);

        // 3. tell other nodes to drop their local entries
        redisTemplate.convertAndSend("cache:invalidate", key);
    }

    /**
     * Invalidation handler.
     * Note: must be wired to a Redis MessageListener subscribed to
     * "cache:invalidate"; the pub/sub wiring is omitted here.
     */
    @EventListener
    public void handleCacheInvalidate(String key) {
        localCache.invalidate(key);
    }
}

// annotation-driven caching
@Component
@Aspect
public class CacheAspect {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Around("@annotation(cacheable)")
    public Object cache(ProceedingJoinPoint joinPoint, Cacheable cacheable) throws Throwable {
        String key = generateKey(joinPoint, cacheable.key());

        // try the cache
        Object cachedValue = redisTemplate.opsForValue().get(key);
        if (cachedValue != null) {
            return cachedValue;
        }

        // run the method
        Object result = joinPoint.proceed();

        // populate the cache
        if (result != null) {
            redisTemplate.opsForValue().set(key, result,
                cacheable.expire(), TimeUnit.SECONDS);
        }

        return result;
    }

    private String generateKey(ProceedingJoinPoint joinPoint, String keyExpression) {
        // simplified key generation
        return joinPoint.getSignature().getName() + ":" +
            Arrays.toString(joinPoint.getArgs());
    }
}

// custom annotation (note: shadows Spring's own @Cacheable)
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Cacheable {
    String key() default "";
    int expire() default 3600;
}
```

6.2 Distributed Lock Implementation
Distributed locks are a key building block of distributed systems; Redis offers a simple, efficient implementation.
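The Redis lock below rests on two invariants: acquisition is atomic (`SET ... NX`) and release succeeds only for the owner (the GET/DEL Lua script). The same semantics can be sketched in-process with a map, which makes the ownership check easy to see in isolation (a teaching sketch, not a substitute for the Redis version, which adds expiry and cross-process visibility):

```java
import java.util.concurrent.ConcurrentHashMap;

public class OwnershipLock {

    private final ConcurrentHashMap<String, String> locks = new ConcurrentHashMap<>();

    /** SET key value NX: succeeds only if the key is absent. */
    public boolean tryLock(String key, String owner) {
        return locks.putIfAbsent(key, owner) == null;
    }

    /** Delete only when the stored owner matches (the Lua GET/DEL check). */
    public boolean release(String key, String owner) {
        return locks.remove(key, owner); // atomic compare-and-remove
    }
}
```

The compare-and-remove in `release` is exactly why the Redis version needs Lua: a plain GET followed by DEL would let one client delete a lock that expired and was re-acquired by another in between.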
- Distributed lock

```java
@Component
public class RedisDistributedLock {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String LOCK_PREFIX = "distributed:lock:";
    private static final String UNLOCK_SCRIPT =
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "return redis.call('del', KEYS[1]) " +
        "else return 0 end";

    /**
     * Try to acquire the lock.
     */
    public boolean tryLock(String lockKey, String lockValue, long expireTime, TimeUnit timeUnit) {
        String key = LOCK_PREFIX + lockKey;
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(key, lockValue, expireTime, timeUnit);
        return Boolean.TRUE.equals(result);
    }

    /**
     * Release the lock (owner check via Lua).
     */
    public boolean releaseLock(String lockKey, String lockValue) {
        String key = LOCK_PREFIX + lockKey;
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class),
            Collections.singletonList(key),
            lockValue
        );
        return Long.valueOf(1).equals(result);
    }

    // reentrant lock: owner + hold count stored in a Hash
    private static final String REENTRANT_LOCK_SCRIPT =
        "local key = KEYS[1] " +
        "local value = ARGV[1] " +
        "local expire = ARGV[2] " +
        "local current = redis.call('HGET', key, 'owner') " +
        "if current == false then " +
        "  redis.call('HSET', key, 'owner', value) " +
        "  redis.call('HSET', key, 'count', 1) " +
        "  redis.call('EXPIRE', key, expire) " +
        "  return 1 " +
        "elseif current == value then " +
        "  local count = redis.call('HINCRBY', key, 'count', 1) " +
        "  redis.call('EXPIRE', key, expire) " +
        "  return 1 " +
        "else " +
        "  return 0 " +
        "end";

    private static final String REENTRANT_UNLOCK_SCRIPT =
        "local key = KEYS[1] " +
        "local value = ARGV[1] " +
        "local current = redis.call('HGET', key, 'owner') " +
        "if current == value then " +
        "  local count = redis.call('HINCRBY', key, 'count', -1) " +
        "  if count == 0 then " +
        "    redis.call('DEL', key) " +
        "    return 1 " +
        "  else " +
        "    return 1 " +
        "  end " +
        "else " +
        "  return 0 " +
        "end";

    /**
     * Reentrant lock - acquire.
     */
    public boolean tryReentrantLock(String lockKey, String lockValue, long expireTime) {
        String key = LOCK_PREFIX + lockKey;
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(REENTRANT_LOCK_SCRIPT, Long.class),
            Collections.singletonList(key),
            lockValue,
            String.valueOf(expireTime)
        );
        return Long.valueOf(1).equals(result);
    }

    /**
     * Reentrant lock - release.
     */
    public boolean releaseReentrantLock(String lockKey, String lockValue) {
        String key = LOCK_PREFIX + lockKey;
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(REENTRANT_UNLOCK_SCRIPT, Long.class),
            Collections.singletonList(key),
            lockValue
        );
        return Long.valueOf(1).equals(result);
    }
}

// annotation-driven locking
@Component
@Aspect
public class DistributedLockAspect {

    @Autowired
    private RedisDistributedLock distributedLock;

    @Around("@annotation(lockAnnotation)")
    public Object around(ProceedingJoinPoint joinPoint, DistributedLock lockAnnotation) throws Throwable {
        String lockKey = generateLockKey(joinPoint, lockAnnotation.key());
        String lockValue = UUID.randomUUID().toString();

        boolean acquired = distributedLock.tryLock(
            lockKey,
            lockValue,
            lockAnnotation.expireTime(),
            lockAnnotation.timeUnit()
        );

        if (!acquired) {
            if (lockAnnotation.waitTime() > 0) {
                // wait for the lock
                return waitAndRetry(joinPoint, lockAnnotation, lockKey, lockValue);
            } else {
                throw new RuntimeException("Failed to acquire distributed lock: " + lockKey);
            }
        }

        try {
            return joinPoint.proceed();
        } finally {
            distributedLock.releaseLock(lockKey, lockValue);
        }
    }

    private Object waitAndRetry(ProceedingJoinPoint joinPoint, DistributedLock lockAnnotation,
                                String lockKey, String lockValue) throws Throwable {
        long waitTime = lockAnnotation.waitTime();
        long startTime = System.currentTimeMillis();

        while (System.currentTimeMillis() - startTime < waitTime) {
            boolean acquired = distributedLock.tryLock(
                lockKey,
                lockValue,
                lockAnnotation.expireTime(),
                lockAnnotation.timeUnit()
            );

            if (acquired) {
                try {
                    return joinPoint.proceed();
                } finally {
                    distributedLock.releaseLock(lockKey, lockValue);
                }
            }

            try {
                Thread.sleep(100); // retry after 100ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for lock", e);
            }
        }

        throw new RuntimeException("Timed out waiting for distributed lock: " + lockKey);
    }

    private String generateLockKey(ProceedingJoinPoint joinPoint, String keyExpression) {
        if (keyExpression.isEmpty()) {
            return joinPoint.getSignature().toShortString();
        }

        // simplified SpEL-like substitution
        Object[] args = joinPoint.getArgs();
        return keyExpression.replace("#args[0]", String.valueOf(args[0]));
    }
}

// lock annotation
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {
    String key() default "";
    long expireTime() default 30;
    TimeUnit timeUnit() default TimeUnit.SECONDS;
    long waitTime() default 0;
}

// usage
@Service
public class OrderService {

    @DistributedLock(key = "order:create:#args[0]", expireTime = 10, waitTime = 5000)
    public void createOrder(String userId, OrderRequest request) {
        // order-creation logic
        // a given user may only create one order at a time
    }

    @DistributedLock(key = "inventory:reduce:#args[0]", expireTime = 30)
    public void reduceInventory(String productId, int quantity) {
        // stock-reduction logic
        // stock updates for one product must be serialized
    }
}
```

6.3 Rate Limiter Implementation
Rate limiting is an important safeguard for system stability; Redis supports several rate-limiting algorithms.
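The token-bucket script below keeps two fields per key, the token count and the last refill time; the refill arithmetic is worth seeing in isolation. A local sketch of the same formula, assuming millisecond timestamps and integer (floor) refill as in the Lua version:

```java
public class TokenBucketMath {

    /**
     * Tokens available now: previous count plus the elapsed-time refill,
     * capped at capacity. ratePerSecond tokens accrue per second.
     */
    public static long refill(long previousTokens, long capacity,
                              long ratePerSecond, long elapsedMillis) {
        long added = elapsedMillis * ratePerSecond / 1000; // floor, like math.floor in Lua
        return Math.min(capacity, previousTokens + added);
    }
}
```

A request for `n` tokens is then allowed iff `refill(...) >= n`, after which `n` is subtracted; the cap is what lets a bucket absorb short bursts up to `capacity` while enforcing `ratePerSecond` on average.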
- 限流算法
- 限流算法对比
```java
@Component
public class RedisRateLimiter {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * Fixed-window rate limiting
     */
    private static final String FIXED_WINDOW_SCRIPT =
            "local key = KEYS[1] " +
            "local window = tonumber(ARGV[1]) " +
            "local limit = tonumber(ARGV[2]) " +
            "local current = redis.call('INCR', key) " +
            "if current == 1 then " +
            "  redis.call('EXPIRE', key, window) " +
            "end " +
            "if current <= limit then " +
            "  return {1, current, limit - current} " +
            "else " +
            "  return {0, current, 0} " +
            "end";

    public RateLimitResult fixedWindowLimit(String key, int windowSeconds, int limit) {
        List<Long> result = redisTemplate.execute(
                new DefaultRedisScript<>(FIXED_WINDOW_SCRIPT, List.class),
                Collections.singletonList("rate_limit:fixed:" + key),
                String.valueOf(windowSeconds),
                String.valueOf(limit)
        );

        return new RateLimitResult(
                result.get(0) == 1,
                result.get(1).intValue(),
                result.get(2).intValue()
        );
    }

    /**
     * Sliding-window rate limiting
     */
    private static final String SLIDING_WINDOW_SCRIPT =
            "local key = KEYS[1] " +
            "local window = tonumber(ARGV[1]) " +
            "local limit = tonumber(ARGV[2]) " +
            "local now = tonumber(ARGV[3]) " +
            "local clearBefore = now - window * 1000 " +
            "redis.call('ZREMRANGEBYSCORE', key, 0, clearBefore) " +
            "local current = redis.call('ZCARD', key) " +
            "if current < limit then " +
            "  redis.call('ZADD', key, now, now) " +
            "  redis.call('EXPIRE', key, window) " +
            "  return {1, current + 1, limit - current - 1} " +
            "else " +
            "  return {0, current, 0} " +
            "end";

    public RateLimitResult slidingWindowLimit(String key, int windowSeconds, int limit) {
        long now = System.currentTimeMillis();
        List<Long> result = redisTemplate.execute(
                new DefaultRedisScript<>(SLIDING_WINDOW_SCRIPT, List.class),
                Collections.singletonList("rate_limit:sliding:" + key),
                String.valueOf(windowSeconds),
                String.valueOf(limit),
                String.valueOf(now)
        );

        return new RateLimitResult(
                result.get(0) == 1,
                result.get(1).intValue(),
                result.get(2).intValue()
        );
    }

    /**
     * Token-bucket rate limiting
     */
    private static final String TOKEN_BUCKET_SCRIPT =
            "local key = KEYS[1] " +
            "local capacity = tonumber(ARGV[1]) " +
            "local tokens = tonumber(ARGV[2]) " +
            "local interval = tonumber(ARGV[3]) " +
            "local requested = tonumber(ARGV[4]) " +
            "local now = tonumber(ARGV[5]) " +

            "local bucket = redis.call('HMGET', key, 'tokens', 'last_refill') " +
            "local current_tokens = tonumber(bucket[1]) or capacity " +
            "local last_refill = tonumber(bucket[2]) or now " +

            "local elapsed = now - last_refill " +
            "local tokens_to_add = math.floor(elapsed / interval * tokens) " +
            "current_tokens = math.min(capacity, current_tokens + tokens_to_add) " +

            "if current_tokens >= requested then " +
            "  current_tokens = current_tokens - requested " +
            "  redis.call('HMSET', key, 'tokens', current_tokens, 'last_refill', now) " +
            "  redis.call('EXPIRE', key, 3600) " +
            "  return {1, current_tokens, requested} " +
            "else " +
            "  redis.call('HMSET', key, 'tokens', current_tokens, 'last_refill', now) " +
            "  redis.call('EXPIRE', key, 3600) " +
            "  return {0, current_tokens, 0} " +
            "end";

    public RateLimitResult tokenBucketLimit(String key, int capacity, int tokensPerSecond, int requested) {
        long now = System.currentTimeMillis();
        List<Long> result = redisTemplate.execute(
                new DefaultRedisScript<>(TOKEN_BUCKET_SCRIPT, List.class),
                Collections.singletonList("rate_limit:token:" + key),
                String.valueOf(capacity),
                String.valueOf(tokensPerSecond),
                String.valueOf(1000), // refill interval: milliseconds per second
                String.valueOf(requested),
                String.valueOf(now)
        );

        return new RateLimitResult(
                result.get(0) == 1,
                result.get(1).intValue(),
                result.get(2).intValue()
        );
    }

    /**
     * Leaky-bucket rate limiting
     */
    private static final String LEAKY_BUCKET_SCRIPT =
            "local key = KEYS[1] " +
            "local capacity = tonumber(ARGV[1]) " +
            "local leak_rate = tonumber(ARGV[2]) " +
            "local requested = tonumber(ARGV[3]) " +
            "local now = tonumber(ARGV[4]) " +

            "local bucket = redis.call('HMGET', key, 'volume', 'last_leak') " +
            "local current_volume = tonumber(bucket[1]) or 0 " +
            "local last_leak = tonumber(bucket[2]) or now " +

            "local elapsed = now - last_leak " +
            "local leaked = math.floor(elapsed / 1000 * leak_rate) " +
            "current_volume = math.max(0, current_volume - leaked) " +

            "if current_volume + requested <= capacity then " +
            "  current_volume = current_volume + requested " +
            "  redis.call('HMSET', key, 'volume', current_volume, 'last_leak', now) " +
            "  redis.call('EXPIRE', key, 3600) " +
            "  return {1, current_volume, capacity - current_volume} " +
            "else " +
            "  redis.call('HMSET', key, 'volume', current_volume, 'last_leak', now) " +
            "  redis.call('EXPIRE', key, 3600) " +
            "  return {0, current_volume, capacity - current_volume} " +
            "end";

    public RateLimitResult leakyBucketLimit(String key, int capacity, int leakRate, int requested) {
        long now = System.currentTimeMillis();
        List<Long> result = redisTemplate.execute(
                new DefaultRedisScript<>(LEAKY_BUCKET_SCRIPT, List.class),
                Collections.singletonList("rate_limit:leaky:" + key),
                String.valueOf(capacity),
                String.valueOf(leakRate),
                String.valueOf(requested),
                String.valueOf(now)
        );

        return new RateLimitResult(
                result.get(0) == 1,
                result.get(1).intValue(),
                result.get(2).intValue()
        );
    }

    // Rate-limit result holder
    public static class RateLimitResult {
        private final boolean allowed;
        private final int current;
        private final int remaining;

        public RateLimitResult(boolean allowed, int current, int remaining) {
            this.allowed = allowed;
            this.current = current;
            this.remaining = remaining;
        }

        public boolean isAllowed() { return allowed; }
        public int getCurrent() { return current; }
        public int getRemaining() { return remaining; }
    }
}

// Annotation-driven rate limiting via AOP
@Component
@Aspect
public class RateLimitAspect {

    @Autowired
    private RedisRateLimiter rateLimiter;

    @Around("@annotation(rateLimit)")
    public Object around(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
        String key = generateKey(joinPoint, rateLimit);

        RedisRateLimiter.RateLimitResult result;
        switch (rateLimit.algorithm()) {
            case FIXED_WINDOW:
                result = rateLimiter.fixedWindowLimit(key, rateLimit.window(), rateLimit.limit());
                break;
            case SLIDING_WINDOW:
                result = rateLimiter.slidingWindowLimit(key, rateLimit.window(), rateLimit.limit());
                break;
            case TOKEN_BUCKET:
                result = rateLimiter.tokenBucketLimit(key, rateLimit.limit(), rateLimit.tokensPerSecond(), 1);
                break;
            case LEAKY_BUCKET:
                result = rateLimiter.leakyBucketLimit(key, rateLimit.limit(), rateLimit.leakRate(), 1);
                break;
            default:
                result = rateLimiter.fixedWindowLimit(key, rateLimit.window(), rateLimit.limit());
        }

        if (!result.isAllowed()) {
            throw new RateLimitExceededException("Too many requests, please retry later");
        }

        // Expose rate-limit state in the response headers
        HttpServletResponse response = getCurrentResponse();
        if (response != null) {
            response.setHeader("X-RateLimit-Limit", String.valueOf(rateLimit.limit()));
            response.setHeader("X-RateLimit-Remaining", String.valueOf(result.getRemaining()));
            response.setHeader("X-RateLimit-Reset", String.valueOf(System.currentTimeMillis() + rateLimit.window() * 1000));
        }

        return joinPoint.proceed();
    }

    private String generateKey(ProceedingJoinPoint joinPoint, RateLimit rateLimit) {
        String baseKey = rateLimit.key();
        if (baseKey.isEmpty()) {
            baseKey = joinPoint.getSignature().toShortString();
        }

        // Build the key for the configured limiting dimension
        switch (rateLimit.dimension()) {
            case IP:
                return baseKey + ":" + getCurrentClientIp();
            case USER:
                return baseKey + ":" + getCurrentUserId();
            case GLOBAL:
            default:
                return baseKey;
        }
    }

    private String getCurrentClientIp() {
        HttpServletRequest request = getCurrentRequest();
        if (request != null) {
            String xForwardedFor = request.getHeader("X-Forwarded-For");
            if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
                return xForwardedFor.split(",")[0].trim();
            }
            return request.getRemoteAddr();
        }
        return "unknown";
    }

    private String getCurrentUserId() {
        // Resolve the user ID from the security context
        return "user123"; // simplified
    }

    private HttpServletRequest getCurrentRequest() {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        if (requestAttributes instanceof ServletRequestAttributes) {
            return ((ServletRequestAttributes) requestAttributes).getRequest();
        }
        return null;
    }

    private HttpServletResponse getCurrentResponse() {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        if (requestAttributes instanceof ServletRequestAttributes) {
            return ((ServletRequestAttributes) requestAttributes).getResponse();
        }
        return null;
    }
}

// Rate-limit annotation
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
    String key() default "";
    int limit() default 100;
    int window() default 60;
    RateLimitAlgorithm algorithm() default RateLimitAlgorithm.FIXED_WINDOW;
    RateLimitDimension dimension() default RateLimitDimension.IP;
    int tokensPerSecond() default 10;
    int leakRate() default 10;
}

// Rate-limit algorithm enum
public enum RateLimitAlgorithm {
    FIXED_WINDOW,   // fixed window
    SLIDING_WINDOW, // sliding window
    TOKEN_BUCKET,   // token bucket
    LEAKY_BUCKET    // leaky bucket
}

// Rate-limit dimension enum
public enum RateLimitDimension {
    IP,     // per client IP
    USER,   // per user
    GLOBAL  // global
}

// Rate-limit exception
public class RateLimitExceededException extends RuntimeException {
    public RateLimitExceededException(String message) {
        super(message);
    }
}

// Usage examples
@RestController
public class ApiController {

    @GetMapping("/api/data")
    @RateLimit(key = "api:data", limit = 100, window = 60, algorithm = RateLimitAlgorithm.SLIDING_WINDOW)
    public ResponseEntity<String> getData() {
        return ResponseEntity.ok("data");
    }

    @PostMapping("/api/upload")
    @RateLimit(key = "api:upload", limit = 10, window = 60, dimension = RateLimitDimension.USER)
    public ResponseEntity<String> uploadFile() {
        return ResponseEntity.ok("uploaded");
    }

    @GetMapping("/api/search")
    @RateLimit(key = "api:search", limit = 1000, tokensPerSecond = 50, algorithm = RateLimitAlgorithm.TOKEN_BUCKET)
    public ResponseEntity<String> search() {
        return ResponseEntity.ok("search results");
    }
}
```

```java
@Component
public class RateLimitPerformanceTest {

    @Autowired
    private RedisRateLimiter rateLimiter;

    /**
     * Compare the throughput of the four rate-limiting algorithms
     */
    public void performanceTest() {
        int testCount = 10000;
        String testKey = "performance_test";

        // Fixed window
        long start1 = System.currentTimeMillis();
        for (int i = 0; i < testCount; i++) {
            rateLimiter.fixedWindowLimit(testKey + "_fixed", 60, 1000);
        }
        long time1 = System.currentTimeMillis() - start1;

        // Sliding window
        long start2 = System.currentTimeMillis();
        for (int i = 0; i < testCount; i++) {
            rateLimiter.slidingWindowLimit(testKey + "_sliding", 60, 1000);
        }
        long time2 = System.currentTimeMillis() - start2;

        // Token bucket
        long start3 = System.currentTimeMillis();
        for (int i = 0; i < testCount; i++) {
            rateLimiter.tokenBucketLimit(testKey + "_token", 1000, 100, 1);
        }
        long time3 = System.currentTimeMillis() - start3;

        // Leaky bucket
        long start4 = System.currentTimeMillis();
        for (int i = 0; i < testCount; i++) {
            rateLimiter.leakyBucketLimit(testKey + "_leaky", 1000, 100, 1);
        }
        long time4 = System.currentTimeMillis() - start4;

        System.out.println("Rate limiter benchmark (" + testCount + " requests):");
        System.out.println("Fixed window:   " + time1 + "ms");
        System.out.println("Sliding window: " + time2 + "ms");
        System.out.println("Token bucket:   " + time3 + "ms");
        System.out.println("Leaky bucket:   " + time4 + "ms");
    }
}
```

| Algorithm | Pros | Cons | Best fit | Performance |
|---|---|---|---|---|
| Fixed window | Simplest, fastest | Burst at window boundaries | Coarse-grained limiting | ⭐⭐⭐⭐⭐ |
| Sliding window | Smooth, precise control | Higher memory usage | Precise limiting needs | ⭐⭐⭐ |
| Token bucket | Allows burst traffic | More complex to implement | Handling bursty requests | ⭐⭐⭐⭐ |
| Leaky bucket | Traffic shaping, smooth output | No bursts allowed | Smoothing output flow | ⭐⭐⭐⭐ |
6.4 Leaderboard System
A leaderboard built on Redis Sorted Sets, supporting real-time updates and multi-dimension rankings.
- Leaderboard implementation
- Leaderboard optimizations
```java
@Service
public class LeaderboardService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String LEADERBOARD_PREFIX = "leaderboard:";

    /**
     * Set a user's score
     */
    public void updateScore(String leaderboardName, String userId, double score) {
        String key = LEADERBOARD_PREFIX + leaderboardName;
        redisTemplate.opsForZSet().add(key, userId, score);

        // Optional expiry
        redisTemplate.expire(key, 7, TimeUnit.DAYS);
    }

    /**
     * Increment a user's score
     */
    public Double incrementScore(String leaderboardName, String userId, double increment) {
        String key = LEADERBOARD_PREFIX + leaderboardName;
        return redisTemplate.opsForZSet().incrementScore(key, userId, increment);
    }

    /**
     * Get a user's score
     */
    public Double getUserScore(String leaderboardName, String userId) {
        String key = LEADERBOARD_PREFIX + leaderboardName;
        return redisTemplate.opsForZSet().score(key, userId);
    }

    /**
     * Get a user's rank (1-based)
     */
    public Long getUserRank(String leaderboardName, String userId) {
        String key = LEADERBOARD_PREFIX + leaderboardName;
        Long rank = redisTemplate.opsForZSet().reverseRank(key, userId);
        return rank != null ? rank + 1 : null;
    }

    /**
     * Top N entries
     */
    public List<LeaderboardEntry> getTopN(String leaderboardName, int n) {
        String key = LEADERBOARD_PREFIX + leaderboardName;
        Set<ZSetOperations.TypedTuple<Object>> tuples =
                redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, n - 1);

        List<LeaderboardEntry> result = new ArrayList<>();
        int rank = 1;
        for (ZSetOperations.TypedTuple<Object> tuple : tuples) {
            result.add(new LeaderboardEntry(
                    rank++,
                    (String) tuple.getValue(),
                    tuple.getScore()
            ));
        }

        return result;
    }

    /**
     * Entries in a rank range
     */
    public List<LeaderboardEntry> getRangeByRank(String leaderboardName, long start, long end) {
        String key = LEADERBOARD_PREFIX + leaderboardName;
        Set<ZSetOperations.TypedTuple<Object>> tuples =
                redisTemplate.opsForZSet().reverseRangeWithScores(key, start - 1, end - 1);

        List<LeaderboardEntry> result = new ArrayList<>();
        long rank = start;
        for (ZSetOperations.TypedTuple<Object> tuple : tuples) {
            result.add(new LeaderboardEntry(
                    rank++,
                    (String) tuple.getValue(),
                    tuple.getScore()
            ));
        }

        return result;
    }

    /**
     * Entries in a score range
     */
    public List<LeaderboardEntry> getRangeByScore(String leaderboardName, double minScore, double maxScore) {
        String key = LEADERBOARD_PREFIX + leaderboardName;
        Set<ZSetOperations.TypedTuple<Object>> tuples =
                redisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, minScore, maxScore);

        List<LeaderboardEntry> result = new ArrayList<>();
        for (ZSetOperations.TypedTuple<Object> tuple : tuples) {
            Long rank = redisTemplate.opsForZSet().reverseRank(key, tuple.getValue());
            result.add(new LeaderboardEntry(
                    rank != null ? rank + 1 : 0,
                    (String) tuple.getValue(),
                    tuple.getScore()
            ));
        }

        return result;
    }

    /**
     * Entries around a given user
     */
    public List<LeaderboardEntry> getUserNeighbors(String leaderboardName, String userId, int count) {
        Long userRank = getUserRank(leaderboardName, userId);
        if (userRank == null) {
            return Collections.emptyList();
        }

        long start = Math.max(1, userRank - count / 2);
        long end = start + count - 1;

        return getRangeByRank(leaderboardName, start, end);
    }

    /**
     * Remove a user
     */
    public void removeUser(String leaderboardName, String userId) {
        String key = LEADERBOARD_PREFIX + leaderboardName;
        redisTemplate.opsForZSet().remove(key, userId);
    }

    /**
     * Total number of entries
     */
    public Long getTotalCount(String leaderboardName) {
        String key = LEADERBOARD_PREFIX + leaderboardName;
        return redisTemplate.opsForZSet().zCard(key);
    }

    /**
     * Batch score update
     */
    public void batchUpdateScores(String leaderboardName, Map<String, Double> userScores) {
        String key = LEADERBOARD_PREFIX + leaderboardName;

        Set<ZSetOperations.TypedTuple<Object>> tuples = new HashSet<>();
        for (Map.Entry<String, Double> entry : userScores.entrySet()) {
            tuples.add(new DefaultTypedTuple<>(entry.getKey(), entry.getValue()));
        }

        redisTemplate.opsForZSet().add(key, tuples);
    }

    /**
     * Merge several leaderboards into one weighted board
     */
    public void mergeLeaderboards(String targetLeaderboard, List<String> sourceLeaderboards, List<Double> weights) {
        String targetKey = LEADERBOARD_PREFIX + targetLeaderboard;

        List<String> sourceKeys = sourceLeaderboards.stream()
                .map(name -> LEADERBOARD_PREFIX + name)
                .collect(Collectors.toList());

        // Merge with ZUNIONSTORE; Spring Data expects a Weights object, not Double[]
        ZSetOperations.Aggregate aggregate = ZSetOperations.Aggregate.SUM;
        RedisZSetCommands.Weights zWeights =
                RedisZSetCommands.Weights.of(weights.stream().mapToDouble(Double::doubleValue).toArray());
        redisTemplate.opsForZSet().unionAndStore(
                sourceKeys.get(0), sourceKeys.subList(1, sourceKeys.size()), targetKey, aggregate, zWeights);

        // Optional expiry
        redisTemplate.expire(targetKey, 1, TimeUnit.HOURS);
    }

    // Leaderboard entry holder
    public static class LeaderboardEntry {
        private final long rank;
        private final String userId;
        private final double score;

        public LeaderboardEntry(long rank, String userId, double score) {
            this.rank = rank;
            this.userId = userId;
            this.score = score;
        }

        public long getRank() { return rank; }
        public String getUserId() { return userId; }
        public double getScore() { return score; }
    }
}

// Leaderboard usage examples
@Service
public class LeaderboardManager {

    @Autowired
    private LeaderboardService leaderboardService;

    /**
     * Game leaderboard example
     */
    public void gameLeaderboardExample() {
        String leaderboard = "game:global";

        // Update player scores
        leaderboardService.updateScore(leaderboard, "player1", 1500);
        leaderboardService.updateScore(leaderboard, "player2", 1200);
        leaderboardService.updateScore(leaderboard, "player3", 1800);

        // Fetch the top 10
        List<LeaderboardService.LeaderboardEntry> top10 =
                leaderboardService.getTopN(leaderboard, 10);

        System.out.println("Game leaderboard top 10:");
        for (LeaderboardService.LeaderboardEntry entry : top10) {
            System.out.printf("#%d: %s (score: %.0f)%n",
                    entry.getRank(), entry.getUserId(), entry.getScore());
        }
    }

    /**
     * Real-time activity leaderboard
     */
    public void activityLeaderboardExample() {
        String leaderboard = "activity:2025_spring";

        // Users earn points for completing tasks
        leaderboardService.incrementScore(leaderboard, "user1", 100);
        leaderboardService.incrementScore(leaderboard, "user2", 150);
        leaderboardService.incrementScore(leaderboard, "user3", 80);

        // Fetch a user's rank and the users around them
        String userId = "user1";
        Long userRank = leaderboardService.getUserRank(leaderboard, userId);
        List<LeaderboardService.LeaderboardEntry> neighbors =
                leaderboardService.getUserNeighbors(leaderboard, userId, 5);

        System.out.println("Current rank of " + userId + ": " + userRank);
        System.out.println("Nearby users:");
        for (LeaderboardService.LeaderboardEntry entry : neighbors) {
            System.out.printf("#%d: %s (points: %.0f)%n",
                    entry.getRank(), entry.getUserId(), entry.getScore());
        }
    }

    /**
     * Multi-dimension leaderboard example
     */
    public void multiDimensionLeaderboardExample() {
        // One board per dimension
        String scoreBoard = "game:score";
        String timeBoard = "game:time";
        String comboBoard = "game:combo";

        // Update each dimension
        leaderboardService.updateScore(scoreBoard, "player1", 1500);
        leaderboardService.updateScore(timeBoard, "player1", 120); // 120 seconds
        leaderboardService.updateScore(comboBoard, "player1", 50); // 50-hit combo

        // Merge into a composite ranking
        List<String> sourceBoards = Arrays.asList(scoreBoard, timeBoard, comboBoard);
        // score weight 0.5, time weight -0.3 (lower is better), combo weight 0.2
        List<Double> weights = Arrays.asList(0.5, -0.3, 0.2);

        leaderboardService.mergeLeaderboards("game:comprehensive", sourceBoards, weights);

        // Fetch the composite board
        List<LeaderboardService.LeaderboardEntry> comprehensive =
                leaderboardService.getTopN("game:comprehensive", 10);

        System.out.println("Composite leaderboard:");
        for (LeaderboardService.LeaderboardEntry entry : comprehensive) {
            System.out.printf("#%d: %s (composite: %.2f)%n",
                    entry.getRank(), entry.getUserId(), entry.getScore());
        }
    }
}
```

```java
@Service
public class OptimizedLeaderboardService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String LEADERBOARD_PREFIX = "leaderboard:";

    /**
     * Paged leaderboard - avoids transferring the whole board at once
     */
    public PageResult<LeaderboardService.LeaderboardEntry> getLeaderboardPage(String leaderboardName, int page, int size) {
        String key = LEADERBOARD_PREFIX + leaderboardName;

        long total = redisTemplate.opsForZSet().zCard(key);
        long start = (page - 1) * size;
        long end = start + size - 1;

        Set<ZSetOperations.TypedTuple<Object>> tuples =
                redisTemplate.opsForZSet().reverseRangeWithScores(key, start, end);

        List<LeaderboardService.LeaderboardEntry> entries = new ArrayList<>();
        long rank = start + 1;
        for (ZSetOperations.TypedTuple<Object> tuple : tuples) {
            entries.add(new LeaderboardService.LeaderboardEntry(
                    rank++,
                    (String) tuple.getValue(),
                    tuple.getScore()
            ));
        }

        return new PageResult<>(entries, page, size, total);
    }

    /**
     * Cache user ranks locally to avoid repeated ZREVRANK calls
     */
    private final Cache<String, Long> rankCache = Caffeine.newBuilder()
            .maximumSize(10000)
            .expireAfterWrite(5, TimeUnit.MINUTES)
            .build();

    public Long getCachedUserRank(String leaderboardName, String userId) {
        String cacheKey = leaderboardName + ":" + userId;

        return rankCache.get(cacheKey, key -> {
            String redisKey = LEADERBOARD_PREFIX + leaderboardName;
            Long rank = redisTemplate.opsForZSet().reverseRank(redisKey, userId);
            return rank != null ? rank + 1 : null;
        });
    }

    /**
     * Batch rank lookup via a pipeline
     */
    public Map<String, Long> batchGetUserRanks(String leaderboardName, List<String> userIds) {
        String key = LEADERBOARD_PREFIX + leaderboardName;
        Map<String, Long> result = new HashMap<>();

        // One round trip for all lookups
        List<Object> ranks = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            for (String userId : userIds) {
                connection.zRevRank(key.getBytes(), userId.getBytes());
            }
            return null;
        });

        for (int i = 0; i < userIds.size(); i++) {
            Long rank = (Long) ranks.get(i);
            result.put(userIds.get(i), rank != null ? rank + 1 : null);
        }

        return result;
    }

    /**
     * Periodic leaderboard snapshot
     */
    @Scheduled(cron = "0 0 * * * ?") // hourly
    public void createLeaderboardSnapshot() {
        String sourceKey = LEADERBOARD_PREFIX + "game:realtime";
        String snapshotKey = LEADERBOARD_PREFIX + "game:snapshot:" +
                LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));

        // Copy the board by unioning it with nothing
        redisTemplate.opsForZSet().unionAndStore(sourceKey, Collections.emptyList(), snapshotKey);

        // Keep snapshots for 7 days
        redisTemplate.expire(snapshotKey, 7, TimeUnit.DAYS);
    }

    /**
     * Trim the board to the top N entries
     */
    public void cleanupLeaderboard(String leaderboardName, int keepTopN) {
        String key = LEADERBOARD_PREFIX + leaderboardName;

        // Remove everything below rank N
        redisTemplate.opsForZSet().removeRange(key, 0, -keepTopN - 1);
    }

    /**
     * Leaderboard statistics
     */
    public LeaderboardStats getLeaderboardStats(String leaderboardName) {
        String key = LEADERBOARD_PREFIX + leaderboardName;

        Long totalUsers = redisTemplate.opsForZSet().zCard(key);

        // Highest and lowest scores
        Set<ZSetOperations.TypedTuple<Object>> highest =
                redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, 0);
        Set<ZSetOperations.TypedTuple<Object>> lowest =
                redisTemplate.opsForZSet().rangeWithScores(key, 0, 0);

        Double maxScore = highest.isEmpty() ? 0.0 : highest.iterator().next().getScore();
        Double minScore = lowest.isEmpty() ? 0.0 : lowest.iterator().next().getScore();

        // Midpoint of max and min, not a true mean (simplified)
        Double avgScore = (maxScore + minScore) / 2;

        return new LeaderboardStats(totalUsers, maxScore, minScore, avgScore);
    }

    // Page result holder
    public static class PageResult<T> {
        private final List<T> data;
        private final int page;
        private final int size;
        private final long total;
        private final int totalPages;

        public PageResult(List<T> data, int page, int size, long total) {
            this.data = data;
            this.page = page;
            this.size = size;
            this.total = total;
            this.totalPages = (int) Math.ceil((double) total / size);
        }

        public List<T> getData() { return data; }
        public int getPage() { return page; }
        public int getSize() { return size; }
        public long getTotal() { return total; }
        public int getTotalPages() { return totalPages; }
    }

    // Stats holder
    public static class LeaderboardStats {
        private final long totalUsers;
        private final double maxScore;
        private final double minScore;
        private final double avgScore;

        public LeaderboardStats(long totalUsers, double maxScore, double minScore, double avgScore) {
            this.totalUsers = totalUsers;
            this.maxScore = maxScore;
            this.minScore = minScore;
            this.avgScore = avgScore;
        }

        public long getTotalUsers() { return totalUsers; }
        public double getMaxScore() { return maxScore; }
        public double getMinScore() { return minScore; }
        public double getAvgScore() { return avgScore; }
    }
}
```

7. Selected Redis Interview Questions
7.1 Basic Concepts
Q1: Why is Redis so fast?
A: Redis owes its performance to:
- In-memory operations: data lives in RAM, avoiding disk I/O
- Single-threaded model: no thread-switching or lock-contention overhead
- Efficient data structures: each structure is optimized for its use case
- Non-blocking I/O: I/O multiplexing via epoll and similar mechanisms
- Lean network protocol: the simple, efficient RESP protocol
Q2: What data types does Redis support, and when is each used?
A: Redis supports the following types:
- String: caching, counters, distributed locks
- Hash: object storage, shopping carts
- List: message queues, timelines, latest-N lists
- Set: tagging, friend relations, deduplication
- Sorted Set: leaderboards, priority queues, range queries
- Stream: message streams, event sourcing
Q3: What persistence options does Redis offer, and what are their trade-offs?
A: Redis supports three persistence modes:
| Mode | Pros | Cons | Best fit |
|---|---|---|---|
| RDB | Small files, fast recovery, low runtime overhead | Possible data loss, fork cost | Backups, replication |
| AOF | Durable, human-readable log | Larger files, slower recovery | Strict durability requirements |
| Hybrid | Balances speed and safety | More complex | Recommended for production |
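The trade-offs in the table map onto a handful of redis.conf settings. A minimal illustrative fragment (the thresholds are examples, not recommendations):

```conf
# RDB: snapshot if at least 100 keys changed within 300 seconds
save 300 100

# AOF: append-only log, fsync once per second (balances safety and speed)
appendonly yes
appendfsync everysec

# Hybrid persistence (Redis 4.0+): RDB preamble inside the rewritten AOF
aof-use-rdb-preamble yes
```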
7.2 Architecture and Design
Q4: How does Redis achieve high availability?
A: The main high-availability options:
- Master-replica replication: data redundancy and read/write splitting
- Sentinel: automatic failure detection and failover
- Cluster: data sharding and horizontal scaling
- Client-side resilience: connection pools, retry mechanisms
Q5: How does Redis Cluster shard data?
A: Redis Cluster shards by hash slots (related to, but simpler than, classic consistent hashing):
- 16384 slots in total
- slot = CRC16(key) % 16384
- Each master node owns a subset of the slots
- Hash tags ({...}) pin related keys to the same slot
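As a self-contained sketch, the slot computation (including the hash-tag rule) fits in a few lines of Java. The CRC16 below is the XMODEM variant the Redis Cluster spec names, written here from the published algorithm rather than taken from any Redis client:

```java
public class ClusterSlot {

    private static final int SLOTS = 16384;

    /** CRC16-CCITT (XMODEM), the variant Redis Cluster uses for key hashing. */
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** Hash-tag rule: if the key contains a non-empty {...} section, only that part is hashed. */
    static int slot(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) { // non-empty tag
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(java.nio.charset.StandardCharsets.UTF_8)) % SLOTS;
    }

    public static void main(String[] args) {
        // Keys sharing a hash tag land in the same slot, keeping multi-key ops valid
        System.out.println(slot("{user:1001}:profile") == slot("{user:1001}:cart")); // true
        System.out.println(slot("plain-key") >= 0 && slot("plain-key") < SLOTS);     // true
    }
}
```

The check value `crc16("123456789") == 0x31C3` from the cluster spec is a quick way to confirm the CRC variant is right.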
Q6: How do you handle cache penetration, breakdown, and avalanche?
A: Common solutions:
| Problem | Cause | Fix |
|---|---|---|
| Cache penetration | Queries for keys that never exist | Bloom filter, cache empty values |
| Cache breakdown | A hot key expires | Distributed lock, never-expire with async refresh |
| Cache avalanche | Many keys expire at once | Randomized TTLs, multi-level caching |
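Two of these fixes are short enough to sketch. A minimal, self-contained illustration, assuming a plain map as a stand-in for Redis (class and method names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

public class CacheGuards {

    /** Avalanche guard: spread expirations by adding random jitter to a base TTL. */
    static long ttlWithJitter(long baseSeconds, double jitterRatio) {
        long jitter = (long) (baseSeconds * jitterRatio * ThreadLocalRandom.current().nextDouble());
        return baseSeconds + jitter;
    }

    // Stand-in for Redis; Optional.empty() is the cached "not found" marker
    private final Map<String, Optional<String>> cache = new HashMap<>();

    /** Penetration guard: cache an explicit empty marker for keys that miss the database. */
    String getUser(String id, Map<String, String> database) {
        Optional<String> cached = cache.get(id);
        if (cached != null) {
            return cached.orElse(null); // hit, possibly a cached "not found"
        }
        String value = database.get(id);
        cache.put(id, Optional.ofNullable(value)); // real code would give the empty marker a short TTL
        return value;
    }
}
```

With Redis the same shape applies: `SET key "" EX 60` for the empty marker, and `EXPIRE key ttlWithJitter(base, 0.2)` when warming many keys together.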
7.3 Performance Tuning
Q7: How do you optimize Redis performance?
A: Optimization strategies:
- Memory: choose suitable data structures and eviction policies
- Network: connection pooling, pipelining, batch commands
- Persistence: tune RDB and AOF parameters sensibly
- Architecture: read/write splitting, sharding, multi-level caching
Q8: What eviction policies does Redis support?
A: Redis offers 8 eviction policies:
- noeviction: evict nothing; writes fail once memory is full
- allkeys-lru/lfu: evict from all keys by LRU/LFU
- volatile-lru/lfu: evict only keys that have a TTL set, by LRU/LFU
- allkeys-random: evict random keys from the whole keyspace
- volatile-random: evict random keys among those with a TTL
- volatile-ttl: evict the keys closest to expiring
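The policy is set alongside the memory cap in redis.conf; a minimal illustrative fragment (the 2gb limit is an example, not a recommendation):

```conf
# Cap memory and pick an eviction policy; allkeys-lru suits a pure cache,
# while volatile-* variants only evict keys that carry a TTL
maxmemory 2gb
maxmemory-policy allkeys-lru
```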
7.4 Practical Scenarios
Q9: How do you implement a distributed lock with Redis, and what pitfalls matter?
A: Key points:
- Atomic acquisition: use SET key value NX EX ttl
- Unique ownership: store a UUID as the lock value
- Expiry: a TTL prevents deadlock if the holder crashes
- Safe release: a Lua script checks ownership and deletes atomically
- Reentrancy: a Hash can track per-owner re-entry counts
Q10: How would you design a high-performance Redis caching layer?
A: Design points:
- Caching strategy: Cache-Aside, Write-Through, etc.
- Data structure choice: pick types that match the access patterns
- Expiry policy: set sensible TTLs and avoid synchronized expiration
- Monitoring and alerting: memory usage, hit rate, response time
- Disaster recovery: replication and regular backups
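The first four points can be sketched as follows. The Lua script mirrors the common release pattern; the in-memory map only stands in for Redis (which would see `SET key token NX EX ttl`) so the owner-check logic is shown self-contained, and the class name is illustrative:

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SimpleLock {

    /** Release script run against real Redis: delete only if we still own the lock. */
    static final String RELEASE_SCRIPT =
            "if redis.call('GET', KEYS[1]) == ARGV[1] then " +
            "  return redis.call('DEL', KEYS[1]) " +
            "else " +
            "  return 0 " +
            "end";

    // Stand-in for Redis state; a real implementation issues SET key token NX EX ttl
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    /** Try to acquire: returns the owner token on success, null if already held (the NX part). */
    public String tryLock(String key) {
        String token = UUID.randomUUID().toString(); // unique value prevents releasing someone else's lock
        return store.putIfAbsent(key, token) == null ? token : null;
    }

    /** Release only when the stored token matches ours, mirroring RELEASE_SCRIPT. */
    public boolean unlock(String key, String token) {
        return store.remove(key, token);
    }
}
```

The compare-and-delete in `unlock` is the crucial bit: a plain DEL could release a lock that expired and was re-acquired by someone else, which is exactly why the real release must go through the Lua script in one atomic step.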
Some closing advice for mastering Redis:
- Theory: understand Redis's data structures and how they are implemented
- Practice: build projects that exercise the common application scenarios
- Tuning: learn to diagnose bottlenecks and apply optimization techniques
- Architecture: know the high-availability options and how to design a distributed cache
- Keep learning: follow new Redis releases and evolving best practices
Having worked through this chapter, you should now have a solid grasp of Redis's core concepts, data structures, persistence mechanisms, high-availability architecture, and performance tuning techniques. As a key component of modern application architectures, Redis plays a central role in caching, session storage, message queues, distributed locking, and more.
Used well in real projects, Redis not only raises system performance significantly but also simplifies architectural design. Hopefully this guide serves you well both in technical interviews and in day-to-day work!