高并发系统设计
高并发系统设计是构建高性能、高可用系统的核心技术。通过合理的架构设计、缓存策略、异步处理和数据库优化,可以构建出能够处理大量并发请求的系统。
高并发 = 并发模型 + 缓存优化 + 异步处理 + 数据库优化 + 负载均衡
- 🧵 并发模型:高效处理多任务的线程模型设计
- 🚀 缓存优化:减少访问延迟,提高数据读取速度
- ⚡ 异步处理:避免阻塞,提高系统吞吐量
- 📊 数据库优化:分片、索引、读写分离等策略
- ⚖️ 负载均衡:合理分配请求,避免单点过载
1. 并发模型基础
1.1 并发与并行
并发和并行是处理多任务的不同方式:
| 概念 | 说明 | 特点 |
|---|---|---|
| 并发 (Concurrency) | 多个任务交替执行 | 单核CPU上的多任务处理 |
| 并行 (Parallelism) | 多个任务同时执行 | 多核CPU上的多任务处理 |
并发与并行的详细比较
并发 (Concurrency):
- 是一种逻辑概念,指多个任务可以在重叠的时间段内启动、运行和完成
- 一个处理器通过时间片切换来处理多个任务
- 强调任务的调度与切换
- 适用于IO密集型任务
并行 (Parallelism):
- 是一种物理概念,指多个任务在同一时刻同时运行
- 需要多个处理器或多核处理器
- 强调同时执行多个任务
- 适用于计算密集型任务
- 并发示例
- 并行示例
1@RestController2public class ConcurrentController {3 4 @Autowired5 private AsyncTaskExecutor taskExecutor;6 7 @GetMapping("/concurrent")8 public CompletableFuture<String> handleConcurrent() {9 return CompletableFuture.supplyAsync(() -> {10 // 模拟耗时操作11 try {12 Thread.sleep(1000);13 } catch (InterruptedException e) {14 Thread.currentThread().interrupt();15 }16 return "并发处理完成";17 }, taskExecutor);18 }19}1@Service2public class ParallelService {3 4 @Autowired5 private ThreadPoolTaskExecutor executor;6 7 public List<String> processParallel(List<String> items) {8 return items.parallelStream()9 .map(item -> {10 // 并行处理每个项目11 return processItem(item);12 })13 .collect(Collectors.toList());14 }15 16 private String processItem(String item) {17 // 处理单个项目的逻辑18 return "processed_" + item;19 }20}1.2 线程模型
线程池设计
线程池是管理线程生命周期、控制并发的重要工具。合理配置线程池参数对系统性能至关重要。
线程池参数配置指南
| 参数 | 说明 | 配置建议 |
|---|---|---|
| 核心线程数(corePoolSize) | 线程池维护的最小线程数 | IO密集型:CPU核心数 * 2 CPU密集型:CPU核心数 |
| 最大线程数(maxPoolSize) | 线程池允许的最大线程数 | IO密集型:核心线程数 * 3 CPU密集型:核心线程数 + 1 |
| 队列容量(queueCapacity) | 任务队列大小 | 根据内存和任务特性评估,常见100~1000 |
| 拒绝策略(rejectedExecutionHandler) | 队列满时的处理策略 | AbortPolicy:抛异常 CallerRunsPolicy:由调用者线程执行 DiscardPolicy:丢弃任务 DiscardOldestPolicy:丢弃最旧任务 |
- 线程池配置
- 线程池使用
- 线程池监控
1@Configuration2public class ThreadPoolConfig {3 4 @Bean("ioThreadPool")5 public ThreadPoolTaskExecutor ioThreadPool() {6 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();7 executor.setCorePoolSize(20);8 executor.setMaxPoolSize(100);9 executor.setQueueCapacity(500);10 executor.setThreadNamePrefix("IO-");11 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());12 executor.initialize();13 return executor;14 }15 16 @Bean("cpuThreadPool")17 public ThreadPoolTaskExecutor cpuThreadPool() {18 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();19 executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());20 executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);21 executor.setQueueCapacity(1000);22 executor.setThreadNamePrefix("CPU-");23 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());24 executor.initialize();25 return executor;26 }27}1@Service2public class ThreadPoolService {3 4 @Autowired5 @Qualifier("ioThreadPool")6 private ThreadPoolTaskExecutor ioExecutor;7 8 @Autowired9 @Qualifier("cpuThreadPool")10 private ThreadPoolTaskExecutor cpuExecutor;11 12 public CompletableFuture<String> handleIOBoundTask() {13 return CompletableFuture.supplyAsync(() -> {14 // IO密集型任务15 return performIOOperation();16 }, ioExecutor);17 }18 19 public CompletableFuture<String> handleCPUBoundTask() {20 return CompletableFuture.supplyAsync(() -> {21 // CPU密集型任务22 return performCPUOperation();23 }, cpuExecutor);24 }25}1@Component2public class ThreadPoolMonitor {3 4 @Autowired5 @Qualifier("ioThreadPool")6 private ThreadPoolTaskExecutor ioExecutor;7 8 @Autowired9 @Qualifier("cpuThreadPool")10 private ThreadPoolTaskExecutor cpuExecutor;11 12 @Scheduled(fixedRate = 30000) // 每30秒执行一次13 public void monitorThreadPools() {14 logThreadPoolStats("IO线程池", ioExecutor);15 logThreadPoolStats("CPU线程池", cpuExecutor);16 }17 18 private void logThreadPoolStats(String poolName, ThreadPoolTaskExecutor executor) {19 ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();20 21 int corePoolSize = threadPoolExecutor.getCorePoolSize();22 int poolSize = threadPoolExecutor.getPoolSize();23 int activeCount = threadPoolExecutor.getActiveCount();24 long taskCount = threadPoolExecutor.getTaskCount();25 long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();26 int queueSize = threadPoolExecutor.getQueue().size();27 28 log.info("{} 状态 - 核心线程数: {}, 当前线程数: {}, 活跃线程数: {}, " +29 "任务总数: {}, 已完成任务: {}, 队列大小: {}",30 poolName, corePoolSize, poolSize, activeCount,31 taskCount, completedTaskCount, queueSize);32 }33}- 避免无限增长:设置合理的最大线程数,防止资源耗尽
- 任务分类:根据任务类型(IO密集/CPU密集)使用不同线程池
- 动态调整:根据系统负载动态调整线程池参数
- 监控告警:监控线程池状态,及时发现问题
- 优雅关闭:应用关闭时等待线程池任务完成
异步编程模型
异步编程是高并发系统的重要技术,可以有效提高系统吞吐量和资源利用率。
- @Async注解
- CompletableFuture
- 响应式编程
1@Service2public class AsyncService {3 4 @Async("ioThreadPool")5 public CompletableFuture<String> asyncOperation1() {6 // 异步操作17 return CompletableFuture.completedFuture("操作1完成");8 }9 10 @Async("ioThreadPool")11 public CompletableFuture<String> asyncOperation2() {12 // 异步操作213 return CompletableFuture.completedFuture("操作2完成");14 }15 16 public CompletableFuture<List<String>> performAsyncOperations() {17 CompletableFuture<String> future1 = asyncOperation1();18 CompletableFuture<String> future2 = asyncOperation2();19 20 return CompletableFuture.allOf(future1, future2)21 .thenApply(v -> {22 List<String> results = new ArrayList<>();23 results.add(future1.join());24 results.add(future2.join());25 return results;26 });27 }28}1@Service2public class CompletableFutureService {3 4 @Autowired5 private ProductRepository productRepository;6 7 @Autowired8 private ReviewRepository reviewRepository;9 10 @Autowired11 private PriceRepository priceRepository;12 13 @Autowired14 private Executor executor;15 16 public CompletableFuture<ProductDetails> getProductDetails(Long productId) {17 CompletableFuture<Product> productFuture = CompletableFuture18 .supplyAsync(() -> productRepository.findById(productId)19 .orElseThrow(() -> new ProductNotFoundException(productId)), executor);20 21 CompletableFuture<List<Review>> reviewsFuture = CompletableFuture22 .supplyAsync(() -> reviewRepository.findByProductId(productId), executor);23 24 CompletableFuture<Price> priceFuture = CompletableFuture25 .supplyAsync(() -> priceRepository.findByProductId(productId), executor);26 27 return CompletableFuture.allOf(productFuture, reviewsFuture, priceFuture)28 .thenApply(v -> {29 Product product = productFuture.join();30 List<Review> reviews = reviewsFuture.join();31 Price price = priceFuture.join();32 33 return new ProductDetails(product, reviews, price);34 });35 }36}1@Service2public class ReactiveService {3 4 @Autowired5 private ReactiveProductRepository productRepository;6 7 @Autowired8 private ReactiveReviewRepository reviewRepository;9 10 public Mono<ProductDetails> getProductDetails(Long productId) {11 Mono<Product> productMono = productRepository.findById(productId);12 Flux<Review> reviewsFlux = reviewRepository.findByProductId(productId);13 14 return productMono.zipWith(reviewsFlux.collectList())15 .map(tuple -> {16 Product product = tuple.getT1();17 List<Review> reviews = tuple.getT2();18 return new ProductDetails(product, reviews);19 });20 }21}2. 高并发架构设计
2.1 分层架构
分层架构是构建高并发系统的基础,通过将系统分为不同的层次,可以实现更好的扩展性和可维护性。
高并发系统常见的架构层次:
- 负载均衡层:分发请求,防止单点过载
- 应用服务层:处理HTTP请求,权限验证,参数校验
- 业务服务层:实现核心业务逻辑
- 数据访问层:提供统一的数据访问接口
- 数据存储层:实际的数据存储,如数据库、缓存等
- 缓存层:横切各层,提供数据缓存
2.2 微服务架构
微服务架构将系统拆分为多个小型服务,每个服务负责特定业务功能,服务之间通过API通信。这种架构可以实现更好的水平扩展,提高系统整体的并发处理能力。
微服务架构优势与挑战
优势:
- 独立扩展:可以针对不同服务进行独立扩展
- 技术多样性:不同服务可以使用不同的技术栈
- 故障隔离:单个服务故障不会影响整个系统
- 团队自治:小团队负责特定服务,提高开发效率
- 迭代速度:小服务可以快速迭代更新
挑战:
- 分布式复杂性:需要处理网络延迟、分布式事务等
- 服务治理:需要管理服务发现、负载均衡等
- 一致性保证:跨服务事务难以保证强一致性
- 运维复杂度:需要管理多个服务实例
- 监控难度:需要跨服务链路追踪和监控
3. 缓存优化策略
缓存是高并发系统中提升性能的关键技术,合理使用缓存可以大幅减少对后端服务的访问压力。
3.1 多级缓存架构
多级缓存特点
| 缓存级别 | 说明 | 特点 | 适用场景 |
|---|---|---|---|
| L1 本地缓存 | 应用内存中的缓存 | 访问最快、容量有限、与应用同生命周期 | 高频访问、变化少的数据 |
| L2 分布式缓存 | Redis等分布式缓存系统 | 容量大、可靠性高、跨应用共享 | 需要跨实例共享的数据 |
| L3 数据库 | 持久化存储 | 容量最大、访问最慢、最终数据源 | 所有需要持久化的数据 |
- 多级缓存实现
- 缓存监控
1@Service2public class MultiLevelCacheService {3 4 // L1缓存:本地缓存5 private LoadingCache<String, Object> localCache = Caffeine.newBuilder()6 .maximumSize(1000)7 .expireAfterWrite(Duration.ofMinutes(5))8 .build(key -> getFromL2Cache(key));9 10 // L2缓存:Redis缓存11 @Autowired12 private RedisTemplate<String, Object> redisTemplate;13 14 // L3缓存:数据库15 @Autowired16 private UserRepository userRepository;17 18 public Object getData(String key) {19 try {20 // 从L1缓存获取21 return localCache.get(key);22 } catch (Exception e) {23 log.error("L1缓存获取失败", e);24 // 降级到L2缓存25 return getFromL2Cache(key);26 }27 }28 29 private Object getFromL2Cache(String key) {30 try {31 Object value = redisTemplate.opsForValue().get(key);32 if (value == null) {33 // L2缓存未命中,从L3获取34 value = getFromL3Cache(key);35 if (value != null) {36 // 写入L2缓存37 redisTemplate.opsForValue().set(key, value, Duration.ofMinutes(30));38 }39 }40 return value;41 } catch (Exception e) {42 log.error("L2缓存获取失败", e);43 // 降级到L3缓存44 return getFromL3Cache(key);45 }46 }47 48 private Object getFromL3Cache(String key) {49 // 从数据库获取数据50 if (key.startsWith("user:")) {51 Long userId = Long.parseLong(key.substring(5));52 return userRepository.findById(userId).orElse(null);53 }54 return null;55 }56}1@Component2public class CacheMetricsCollector {3 4 private final MeterRegistry registry;5 private final Map<String, Counter> hitCounters = new HashMap<>();6 private final Map<String, Counter> missCounters = new HashMap<>();7 private final Map<String, Timer> accessTimers = new HashMap<>();8 9 public CacheMetricsCollector(MeterRegistry registry) {10 this.registry = registry;11 12 // 初始化计数器13 initializeCounters("l1");14 initializeCounters("l2");15 initializeCounters("l3");16 }17 18 private void initializeCounters(String level) {19 hitCounters.put(level, Counter.builder("cache." + level + ".hits")20 .description(level + " cache hit count")21 .register(registry));22 23 missCounters.put(level, Counter.builder("cache." + level + ".misses")24 .description(level + " cache miss count")25 .register(registry));26 27 accessTimers.put(level, Timer.builder("cache." + level + ".access.time")28 .description(level + " cache access time")29 .register(registry));30 }31 32 public void recordHit(String level) {33 hitCounters.get(level).increment();34 }35 36 public void recordMiss(String level) {37 missCounters.get(level).increment();38 }39 40 public <T> T recordAccessTime(String level, Supplier<T> operation) {41 return accessTimers.get(level).record(operation);42 }43 44 // 获取命中率45 public double getHitRate(String level) {46 long hits = (long) hitCounters.get(level).count();47 long misses = (long) missCounters.get(level).count();48 long total = hits + misses;49 50 return total == 0 ? 0 : (double) hits / total;51 }52}3.2 缓存更新策略
缓存更新策略关系到系统数据一致性和性能,不同的场景需要选择不同的策略。
- 写穿策略
- 写回策略
- 失效策略
1@Transactional2 public void writeThrough(String key, Object value) {3 // 1. 更新数据库4 updateDatabase(key, value);5 6 // 2. 更新缓存7 redisTemplate.opsForValue().set(key, value, Duration.ofMinutes(30));8 localCache.invalidate(key);9 }1public void writeBack(String key, Object value) {2 // 1. 更新缓存3 redisTemplate.opsForValue().set(key, value, Duration.ofMinutes(30));4 localCache.invalidate(key);5 6 // 2. 异步更新数据库7 CompletableFuture.runAsync(() -> {8 updateDatabase(key, value);9 });10 }1@Transactional2 public void writeInvalidate(String key, Object value) {3 // 1. 更新数据库4 updateDatabase(key, value);5 6 // 2. 删除缓存7 redisTemplate.delete(key);8 localCache.invalidate(key);9 }- 一致性与性能平衡:强一致性会影响性能,需要在一致性和性能之间找平衡
- 数据特性考虑:高频读取的数据适合Read-Through,频繁写入的数据可能适合Write-Invalidate
- 失效风险:先删除缓存再更新数据库可能导致脏读
- 双写风险:先更新数据库再更新缓存可能因为并发导致不一致
- 延迟双删:更新数据库后删除缓存,延迟一段时间后再次删除缓存
3.3 缓存穿透防护
缓存穿透是指查询一个一定不存在的数据,导致每次请求都会穿透缓存到达数据库。
缓存穿透、击穿、雪崩的区别
缓存穿透(Cache Penetration):
- 查询不存在的数据,缓存未命中直接查询数据库
- 可以通过空值缓存、布隆过滤器等方案解决
缓存击穿(Cache Breakdown):
- 热点数据过期瞬间,大量请求直接查询数据库
- 可以通过互斥锁、热点数据永不过期等方案解决
缓存雪崩(Cache Avalanche):
- 大量缓存同时过期,导致请求全部转发到数据库
- 可以通过随机过期时间、多级缓存等方案解决
- 布隆过滤器
- 互斥锁防击穿
1@Component2public class BloomFilter {3 4 private BitSet bitSet;5 private int size;6 private int hashFunctions;7 8 public BloomFilter(int size, int hashFunctions) {9 this.bitSet = new BitSet(size);10 this.size = size;11 this.hashFunctions = hashFunctions;12 }13 14 public void add(String key) {15 for (int i = 0; i < hashFunctions; i++) {16 int hash = hash(key, i);17 bitSet.set(hash % size);18 }19 }20 21 public boolean mightContain(String key) {22 for (int i = 0; i < hashFunctions; i++) {23 int hash = hash(key, i);24 if (!bitSet.get(hash % size)) {25 return false;26 }27 }28 return true;29 }30 31 private int hash(String key, int seed) {32 int result = 0;33 for (int i = 0; i < key.length(); i++) {34 result = 31 * result + key.charAt(i) + seed;35 }36 return Math.abs(result);37 }38}3940// 使用布隆过滤器防止缓存穿透41@Service42public class CachePenetrationProtection {43 44 @Autowired45 private RedisTemplate<String, Object> redisTemplate;46 47 @Autowired48 private BloomFilter bloomFilter;49 50 public Object getDataWithProtection(String key) {51 // 1. 布隆过滤器检查52 if (!bloomFilter.mightContain(key)) {53 return null;54 }55 56 // 2. 从缓存获取57 Object value = redisTemplate.opsForValue().get(key);58 if (value != null) {59 return value;60 }61 62 // 3. 缓存未命中,从数据库获取63 value = getFromDatabase(key);64 if (value != null) {65 // 写入缓存66 redisTemplate.opsForValue().set(key, value, Duration.ofMinutes(30));67 } else {68 // 空值缓存,防止缓存穿透69 redisTemplate.opsForValue().set(key, "NULL", Duration.ofMinutes(5));70 }71 72 return value;73 }74}1@Service2public class CacheBreakdownProtection {3 4 @Autowired5 private RedisTemplate<String, Object> redisTemplate;6 7 @Autowired8 private StringRedisTemplate stringRedisTemplate;9 10 public Object getDataWithLock(String key) {11 // 1. 从缓存获取12 Object value = redisTemplate.opsForValue().get(key);13 if (value != null) {14 return value;15 }16 17 // 2. 获取互斥锁18 String lockKey = "lock:" + key;19 boolean locked = false;20 try {21 locked = stringRedisTemplate.opsForValue()22 .setIfAbsent(lockKey, "1", Duration.ofSeconds(10));23 24 if (locked) {25 // 3. 双重检查26 value = redisTemplate.opsForValue().get(key);27 if (value != null) {28 return value;29 }30 31 // 4. 从数据库加载32 value = getFromDatabase(key);33 34 // 5. 更新缓存35 if (value != null) {36 redisTemplate.opsForValue().set(key, value, Duration.ofMinutes(30));37 } else {38 redisTemplate.opsForValue().set(key, "NULL", Duration.ofMinutes(5));39 }40 } else {41 // 6. 等待一段时间后重试42 Thread.sleep(50);43 return getDataWithLock(key);44 }45 46 return value;47 } catch (Exception e) {48 log.error("获取数据失败", e);49 return null;50 } finally {51 // 7. 释放锁52 if (locked) {53 stringRedisTemplate.delete(lockKey);54 }55 }56 }57}4. 异步处理机制
异步处理是高并发系统提升吞吐量的重要手段,通过将耗时操作异步化,可以快速响应用户请求。
4.1 消息队列
消息队列可以解耦系统组件,平滑流量峰值,提高系统可靠性。
- 消息生产者
- 消息消费者
1@Service2public class MessageQueueService {3 4 @Autowired5 private RabbitTemplate rabbitTemplate;6 7 @Autowired8 private KafkaTemplate<String, String> kafkaTemplate;9 10 // 发送消息到RabbitMQ11 public void sendToRabbitMQ(String message) {12 rabbitTemplate.convertAndSend("exchange", "routing.key", message);13 }14 15 // 发送消息到Kafka16 public void sendToKafka(String topic, String message) {17 kafkaTemplate.send(topic, message);18 }19 20 // 异步处理订单21 public Long processOrderAsync(Order order) {22 // 1. 创建订单23 Order savedOrder = orderRepository.save(order);24 25 // 2. 发送异步消息26 OrderEvent event = new OrderEvent(savedOrder.getId(), "CREATED");27 rabbitTemplate.convertAndSend("order.exchange", "order.created", event);28 29 // 3. 立即返回订单ID30 return savedOrder.getId();31 }32}1@Component2public class OrderMessageConsumer {3 4 @RabbitListener(queues = "order.queue")5 public void handleOrderCreated(OrderEvent event) {6 // 异步处理订单创建后的业务逻辑7 switch (event.getType()) {8 case "CREATED":9 processOrderCreated(event.getOrderId());10 break;11 case "PAID":12 processOrderPaid(event.getOrderId());13 break;14 case "SHIPPED":15 processOrderShipped(event.getOrderId());16 break;17 }18 }19 20 private void processOrderCreated(Long orderId) {21 // 处理订单创建逻辑22 // 1. 发送确认邮件23 // 2. 更新库存24 // 3. 生成物流单25 }26}4.2 异步调用
异步调用可以避免请求线程被长时间阻塞,提高系统的吞吐量。
- 异步调用实现
- 超时控制
- 降级处理
1@Service2public class AsyncCallService {3 4 @Autowired5 private ThreadPoolTaskExecutor executor;6 7 // 异步调用外部服务8 public CompletableFuture<ApiResponse> callExternalServiceAsync(String url) {9 return CompletableFuture.supplyAsync(() -> {10 try {11 // 调用外部API12 RestTemplate restTemplate = new RestTemplate();13 return restTemplate.getForObject(url, ApiResponse.class);14 } catch (Exception e) {15 log.error("外部服务调用失败", e);16 return new ApiResponse("ERROR", e.getMessage());17 }18 }, executor);19 }20 21 // 批量异步处理22 public List<ApiResponse> batchProcessAsync(List<String> urls) {23 List<CompletableFuture<ApiResponse>> futures = urls.stream()24 .map(this::callExternalServiceAsync)25 .collect(Collectors.toList());26 27 // 等待所有异步调用完成28 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();29 30 // 收集结果31 return futures.stream()32 .map(CompletableFuture::join)33 .collect(Collectors.toList());34 }35}1public ApiResponse callWithTimeout(String url, long timeout) {2 try {3 return callExternalServiceAsync(url)4 .orTimeout(timeout, TimeUnit.MILLISECONDS)5 .exceptionally(ex -> {6 if (ex instanceof TimeoutException) {7 log.error("调用超时", ex);8 return new ApiResponse("TIMEOUT", "请求超时");9 } else {10 log.error("调用失败", ex);11 return new ApiResponse("ERROR", ex.getMessage());12 }13 })14 .get();15 } catch (Exception e) {16 log.error("获取异步结果失败", e);17 return new ApiResponse("ERROR", e.getMessage());18 }19 }1public CompletableFuture<ApiResponse> callWithFallback(String url) {2 return callExternalServiceAsync(url)3 .thenApply(response -> {4 // 处理成功响应5 if ("SUCCESS".equals(response.getStatus())) {6 return response;7 }8 // 对错误响应进行转换9 return enhanceErrorResponse(response);10 })11 .exceptionally(ex -> {12 log.error("调用失败,使用降级响应", ex);13 return getFallbackResponse(url);14 });15}1617private ApiResponse getFallbackResponse(String url) {18 // 提供降级响应19 return new ApiResponse("FALLBACK", "使用降级响应");20}5. 数据库优化
数据库往往是高并发系统的性能瓶颈,优化数据库访问对提升系统整体性能至关重要。
5.1 读写分离
读写分离的核心优势:
- 负载分担:写操作集中在主库,读操作分散到多个从库
- 水平扩展:可以通过增加从库提升读性能
- 高可用性:主库故障时可以提升从库为主库
- 读写并行:读写操作可以并行执行,提高吞吐量
- 读写分离配置
- 数据源路由
- AOP切面
1@Configuration2public class DataSourceConfig {3 4 @Bean5 @Primary6 @ConfigurationProperties("spring.datasource.master")7 public DataSource masterDataSource() {8 return DataSourceBuilder.create().build();9 }10 11 @Bean12 @ConfigurationProperties("spring.datasource.slave")13 public DataSource slaveDataSource() {14 return DataSourceBuilder.create().build();15 }16 17 @Bean18 public DataSource routingDataSource() {19 RoutingDataSource routingDataSource = new RoutingDataSource();20 Map<Object, Object> targetDataSources = new HashMap<>();21 targetDataSources.put("master", masterDataSource());22 targetDataSources.put("slave", slaveDataSource());23 routingDataSource.setTargetDataSources(targetDataSources);24 routingDataSource.setDefaultTargetDataSource(masterDataSource());25 return routingDataSource;26 }27}1// 数据源路由2public class RoutingDataSource extends AbstractRoutingDataSource {3 4 @Override5 protected Object determineCurrentLookupKey() {6 return DataSourceContextHolder.getDataSourceType();7 }8}910// 数据源上下文11public class DataSourceContextHolder {12 13 private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();14 15 public static void setDataSourceType(String dataSourceType) {16 contextHolder.set(dataSourceType);17 }18 19 public static String getDataSourceType() {20 return contextHolder.get();21 }22 23 public static void clearDataSourceType() {24 contextHolder.remove();25 }26}1// AOP切面2@Aspect3@Component4public class DataSourceAspect {5 6 @Around("@annotation(readOnly)")7 public Object around(ProceedingJoinPoint point, ReadOnly readOnly) throws Throwable {8 DataSourceContextHolder.setDataSourceType("slave");9 try {10 return point.proceed();11 } finally {12 DataSourceContextHolder.clearDataSourceType();13 }14 }15}1617// 只读注解18@Target(ElementType.METHOD)19@Retention(RetentionPolicy.RUNTIME)20public @interface ReadOnly {21}5.2 分库分表
分库分表是解决数据库单表数据量过大问题的有效方案,通过水平或垂直拆分,可以提升数据库的并发处理能力。
- 分片策略
- 分片服务
1@Component2public class ShardingStrategy {3 4 private List<DataSource> dataSources = Arrays.asList(5 dataSource1, dataSource2, dataSource3, dataSource46 );7 8 // 根据用户ID分库9 public DataSource getDataSourceByUserId(Long userId) {10 int dbIndex = (int) (userId % dataSources.size());11 return dataSources.get(dbIndex);12 }13 14 // 根据用户ID分表15 public String getTableNameByUserId(String baseTableName, Long userId) {16 int tableIndex = (int) (userId % 10);17 return baseTableName + "_" + tableIndex;18 }19 20 // 一致性哈希分片21 public DataSource getDataSourceByHash(String key) {22 int hash = Math.abs(key.hashCode());23 int index = hash % dataSources.size();24 return dataSources.get(index);25 }26}1@Service2public class ShardingService {3 4 @Autowired5 private ShardingStrategy shardingStrategy;6 7 public User getUserById(Long userId) {8 DataSource dataSource = shardingStrategy.getDataSourceByUserId(userId);9 String tableName = shardingStrategy.getTableNameByUserId("users", userId);10 11 // 使用指定的数据源和表名查询12 return queryUserFromDataSource(dataSource, tableName, userId);13 }14 15 private User queryUserFromDataSource(DataSource dataSource, String tableName, Long userId) {16 try (Connection conn = dataSource.getConnection()) {17 PreparedStatement stmt = conn.prepareStatement(18 "SELECT * FROM " + tableName + " WHERE id = ?"19 );20 stmt.setLong(1, userId);21 ResultSet rs = stmt.executeQuery();22 23 if (rs.next()) {24 User user = new User();25 user.setId(rs.getLong("id"));26 user.setName(rs.getString("name"));27 user.setEmail(rs.getString("email"));28 return user;29 }30 31 return null;32 } catch (SQLException e) {33 log.error("查询用户失败", e);34 throw new RuntimeException("查询用户失败", e);35 }36 }37}- 分片键选择:选择合适的分片键对性能至关重要
- 跨库查询:跨库查询会带来性能问题,应尽量避免
- 事务处理:跨库事务需要使用分布式事务处理
- 数据扩容:随着数据增长,可能需要重新分片
- 数据迁移:历史数据迁移需要在业务低峰期进行
6. 面试题精选
Q: 什么是高并发?如何设计高并发系统?
A: 高并发是指系统能够同时处理大量并发请求的能力。设计高并发系统的方法:
- 架构层面:采用分布式架构、微服务架构
- 缓存层面:多级缓存、缓存预热、缓存更新策略
- 异步处理:消息队列、异步调用、事件驱动
- 数据库优化:读写分离、分库分表、索引优化
- 负载均衡:应用层负载均衡、数据库负载均衡
Q: 如何解决缓存穿透问题?
A: 解决缓存穿透的方法:
- 布隆过滤器:快速判断key是否存在
- 空值缓存:将空结果也缓存,设置较短的过期时间
- 参数校验:对请求参数进行严格校验
- 限流保护:对异常请求进行限流
Q: 如何设计一个秒杀系统?
A: 秒杀系统设计要点:
- 前端优化:页面静态化、CDN加速、按钮控制
- 接口优化:限流、异步处理、缓存预热
- 库存控制:预扣库存、库存锁定、超时释放
- 订单处理:异步创建订单、消息队列、数据库优化
- 防刷机制:用户限购、IP限流、验证码
Q: 如何处理高并发下的数据一致性问题?
A: 数据一致性处理方法:
- 分布式事务:2PC、3PC、TCC、Saga
- 最终一致性:消息队列、补偿机制、幂等性
- 乐观锁:版本号、时间戳、CAS操作
- 悲观锁:数据库锁、分布式锁、Redis锁
- 理解并发模型:掌握线程模型、异步编程、并发控制
- 掌握缓存策略:学会多级缓存、缓存更新、缓存穿透防护
- 熟悉异步处理:了解消息队列、异步调用、事件驱动
- 学会数据库优化:掌握读写分离、分库分表、索引优化
- 了解负载均衡:学会负载均衡算法、健康检查、故障转移
通过本章的学习,你应该已经掌握了高并发系统设计的核心概念、架构模式和优化策略。高并发系统设计是构建高性能、高可用系统的重要技能,掌握这些技术可以帮助你设计出能够处理大量并发请求的系统。在实际项目中,合理运用这些技术可以大大提高系统的并发处理能力和用户体验。
参与讨论