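Distributed Transaction Solutions Explained
Distributed transactions are a core challenge in distributed systems: they keep data consistent across multiple services and databases. This chapter covers the theory behind distributed transactions, the main technical approaches, and best practices.
Why distributed transactions matter
- Data consistency: operations that span services leave the data in a consistent state
- Business integrity: complex business flows complete atomically
- System reliability: the system tolerates partial failures better
- User experience: users are not confused by inconsistent data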
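1. Distributed transaction fundamentals
1.1 Challenges of distributed transactions
The main challenges are:
| Challenge | Description | Impact |
|---|---|---|
| Unreliable network | Latency, packet loss, network partitions | Message delivery is unreliable |
| Node failure | Service crashes, database failures | Transaction state becomes inconsistent |
| Clock skew | Node clocks drift apart | Ordering decisions go wrong |
| Concurrency control | Many transactions run concurrently | Data races and deadlocks |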
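1.2 Consistency models
2. Two-phase commit (2PC)
2.1 How 2PC works
Two-phase commit is the foundational protocol for distributed transactions. A coordinator first asks every participant to prepare and vote, then broadcasts a single commit-or-abort decision, which makes the transaction atomic across participants.
Protocol flow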
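As a minimal sketch of that flow, the coordinator's decision rule can be written as a pure function. The names `Vote`, `Decision` and `TwoPhaseDecision` are illustrative assumptions, not part of this chapter's implementation; a vote that never arrives is modelled as `TIMEOUT`.

```java
import java.util.List;

/** Vote a participant returns in the prepare phase (hypothetical names). */
enum Vote { YES, NO, TIMEOUT }

/** Global outcome the coordinator broadcasts in the second phase. */
enum Decision { COMMIT, ABORT }

final class TwoPhaseDecision {
    private TwoPhaseDecision() {}

    /** Commit only when every participant voted YES; any NO or missing vote aborts. */
    static Decision decide(List<Vote> votes) {
        if (votes.isEmpty()) {
            return Decision.ABORT; // no votes collected, so nothing may be committed
        }
        for (Vote vote : votes) {
            if (vote != Vote.YES) {
                return Decision.ABORT;
            }
        }
        return Decision.COMMIT;
    }
}
```

Treating a timeout like a NO vote keeps the rule safe: the coordinator never commits unless it has positive confirmation from everyone.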
2.2 2PC implementation example
2PC coordinator implementation
```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

// TransactionParticipant, TransactionContext and TransactionLog are application
// classes from this chapter's example, not library types.
@Service
public class TwoPhaseCommitCoordinator {

    private static final Logger log = LoggerFactory.getLogger(TwoPhaseCommitCoordinator.class);
    private static final long PHASE_TIMEOUT_SECONDS = 30;

    private final List<TransactionParticipant> participants;
    private final TransactionLog transactionLog;

    public TwoPhaseCommitCoordinator(List<TransactionParticipant> participants) {
        this.participants = participants;
        this.transactionLog = new TransactionLog();
    }

    /**
     * Runs one transaction through both phases.
     */
    public boolean executeTransaction(TransactionContext context) {
        String transactionId = generateTransactionId();
        boolean commitDecided = false;

        try {
            // Record the start of the transaction
            transactionLog.logTransactionStart(transactionId, context);

            // Phase 1: prepare
            if (!preparePhase(transactionId, context)) {
                // Prepare failed, so roll back
                abortPhase(transactionId, context);
                return false;
            }

            // Phase 2: commit. Once the decision is commit it must not be turned into an abort.
            commitDecided = true;
            if (!commitPhase(transactionId, context)) {
                // Commit failed part-way, manual intervention is required
                log.error("Transaction commit failed: {}", transactionId);
                return false;
            }

            // Record completion
            transactionLog.logTransactionComplete(transactionId);
            return true;

        } catch (Exception e) {
            log.error("Transaction execution failed: {}", transactionId, e);
            // Roll back only if no commit decision has been taken yet
            if (!commitDecided) {
                abortPhase(transactionId, context);
            }
            return false;
        }
    }

    /**
     * Prepare phase: ask every participant whether it can commit.
     */
    private boolean preparePhase(String transactionId, TransactionContext context) {
        return askAll("Prepare", transactionId, p -> p.prepare(transactionId, context));
    }

    /**
     * Commit phase: tell every participant to commit.
     */
    private boolean commitPhase(String transactionId, TransactionContext context) {
        return askAll("Commit", transactionId, p -> p.commit(transactionId, context));
    }

    /**
     * Calls all participants in parallel and returns true only if every one
     * answered true within the timeout.
     */
    private boolean askAll(String phase, String transactionId, Predicate<TransactionParticipant> call) {
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        for (TransactionParticipant participant : participants) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return call.test(participant);
                } catch (Exception e) {
                    log.error("{} failed on participant {}", phase, participant.getId(), e);
                    return false;
                }
            }));
        }

        try {
            // Wait for every participant, at most 30 seconds
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .get(PHASE_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            for (CompletableFuture<Boolean> future : futures) {
                if (!future.join()) {
                    return false; // at least one participant refused or failed
                }
            }
            return true; // every participant agreed

        } catch (Exception e) {
            log.error("{} phase failed: {}", phase, transactionId, e);
            return false;
        }
    }

    /**
     * Abort phase: tell every participant to roll back (fire and forget).
     */
    private void abortPhase(String transactionId, TransactionContext context) {
        for (TransactionParticipant participant : participants) {
            CompletableFuture.runAsync(() -> {
                try {
                    participant.abort(transactionId, context);
                } catch (Exception e) {
                    log.error("Participant abort failed: {}", participant.getId(), e);
                }
            });
        }

        // Record the rollback
        transactionLog.logTransactionAbort(transactionId);
    }

    private String generateTransactionId() {
        return UUID.randomUUID().toString();
    }
}
```
2.3 Participant implementation
2PC participant implementation
```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Created explicitly, one instance per data source, rather than as a Spring bean:
// the constructor takes a plain String id that Spring cannot autowire.
public class TransactionParticipant {

    private static final Logger log = LoggerFactory.getLogger(TransactionParticipant.class);

    private final String participantId;
    private final DataSource dataSource;
    private final TransactionLog transactionLog;

    // Local transactions that voted yes and are waiting for the coordinator's decision.
    // The connection must stay open: closing it after prepare would discard the work.
    // This map is lost if the process crashes; durable prepare needs XA or an equivalent.
    private final Map<String, Connection> prepared = new ConcurrentHashMap<>();

    public TransactionParticipant(String participantId, DataSource dataSource) {
        this.participantId = participantId;
        this.dataSource = dataSource;
        this.transactionLog = new TransactionLog();
    }

    public String getId() {
        return participantId;
    }

    /**
     * Prepare phase: do the work inside an open local transaction and vote.
     */
    public boolean prepare(String transactionId, TransactionContext context) {
        Connection connection = null;
        try {
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);
            transactionLog.logPrepare(transactionId, participantId, context);

            if (executePreCommit(connection, context)) {
                prepared.put(transactionId, connection);
                transactionLog.logPrepareSuccess(transactionId, participantId);
                return true;
            }
            transactionLog.logPrepareFailure(transactionId, participantId);
            rollbackAndClose(connection);
            return false;

        } catch (Exception e) {
            log.error("Prepare failed: {}", transactionId, e);
            transactionLog.logPrepareFailure(transactionId, participantId, e);
            prepared.remove(transactionId);
            rollbackAndClose(connection);
            return false;
        }
    }

    /**
     * Commit phase: commit the local transaction opened in prepare.
     */
    public boolean commit(String transactionId, TransactionContext context) {
        Connection connection = prepared.remove(transactionId);
        if (connection == null || !transactionLog.isPrepared(transactionId, participantId)) {
            log.error("Transaction not prepared: {}", transactionId);
            rollbackAndClose(connection);
            return false;
        }
        try {
            if (executeCommit(connection, context)) {
                connection.commit();
                transactionLog.logCommitSuccess(transactionId, participantId);
                closeQuietly(connection);
                return true;
            }
            transactionLog.logCommitFailure(transactionId, participantId);
            rollbackAndClose(connection);
            return false;

        } catch (Exception e) {
            log.error("Commit failed: {}", transactionId, e);
            transactionLog.logCommitFailure(transactionId, participantId, e);
            rollbackAndClose(connection);
            return false;
        }
    }

    /**
     * Abort phase: discard the local transaction opened in prepare.
     */
    public boolean abort(String transactionId, TransactionContext context) {
        Connection connection = prepared.remove(transactionId);
        if (connection == null) {
            // Nothing was prepared here (or prepare already rolled back): nothing to undo
            transactionLog.logAbortSuccess(transactionId, participantId);
            return true;
        }
        try {
            connection.rollback();
            boolean success = executeAbort(connection, context);
            if (success) {
                transactionLog.logAbortSuccess(transactionId, participantId);
            } else {
                transactionLog.logAbortFailure(transactionId, participantId);
            }
            return success;

        } catch (Exception e) {
            log.error("Abort failed: {}", transactionId, e);
            transactionLog.logAbortFailure(transactionId, participantId, e);
            return false;
        } finally {
            closeQuietly(connection);
        }
    }

    /** Pre-commit logic, for example checking availability and reserving resources. */
    private boolean executePreCommit(Connection connection, TransactionContext context) {
        return true;
    }

    /** Commit logic, for example final database updates or sending messages. */
    private boolean executeCommit(Connection connection, TransactionContext context) {
        return true;
    }

    /** Abort logic, for example releasing resources held outside the database. */
    private boolean executeAbort(Connection connection, TransactionContext context) {
        return true;
    }

    private void rollbackAndClose(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.rollback();
        } catch (SQLException e) {
            log.error("Failed to rollback", e);
        }
        closeQuietly(connection);
    }

    private void closeQuietly(Connection connection) {
        try {
            connection.close();
        } catch (SQLException e) {
            log.error("Failed to close connection", e);
        }
    }
}
```
2.4 Pros and cons of 2PC
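Advantages
- Strong consistency: either every participant commits or every participant rolls back
- Simple: the protocol is small and easy to understand
- Mature theory: it rests on a well-studied theoretical foundation
Disadvantages
- Performance:
  - The coordinator is a single bottleneck
  - Locks are held from prepare until the decision arrives, which hurts throughput
  - Two rounds of communication add overhead
- Availability:
  - The coordinator is a single point of failure
  - One failed participant fails the whole transaction
  - A network partition can block participants
- Complex recovery:
  - Recovering a crashed coordinator is complicated
  - Timeouts and a coordinator re-election strategy are required
  - Inconsistent state is hard to repair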
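To make the recovery point more concrete, here is a hedged sketch of what a restarted coordinator might do with its log for one transaction. The record and action names are assumptions made for illustration; the rule shown (abort when no commit decision was durably logged) is the common "presumed abort" convention, not something this chapter's `TransactionLog` is known to implement.

```java
import java.util.List;

/** Records a coordinator may have written for one transaction (hypothetical names). */
enum LogRecord { STARTED, DECIDED_COMMIT, DECIDED_ABORT, COMPLETED }

/** What the restarted coordinator should do for that transaction. */
enum RecoveryAction { RESEND_COMMIT, RESEND_ABORT, NOTHING }

final class CoordinatorRecovery {
    private CoordinatorRecovery() {}

    static RecoveryAction recover(List<LogRecord> records) {
        if (records.contains(LogRecord.COMPLETED)) {
            return RecoveryAction.NOTHING;       // every participant already acknowledged
        }
        if (records.contains(LogRecord.DECIDED_COMMIT)) {
            return RecoveryAction.RESEND_COMMIT; // decision is durable, so finish the commit
        }
        // No commit decision survived the crash: abort is the only safe outcome
        return RecoveryAction.RESEND_ABORT;
    }
}
```

This only works if the commit decision is written to stable storage before the first commit message is sent; participants must also accept repeated commit or abort messages.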
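When to use 2PC
2PC fits scenarios that demand strict consistency and keep transactions small, such as financial transactions and inventory management.
3. Three-phase commit (3PC)
3.1 How 3PC works
Three-phase commit refines 2PC: it splits the protocol into CanCommit, PreCommit and DoCommit phases and adds timeouts on the participant side, which shortens the time participants spend blocked.
Protocol flow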
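The participant side of that flow can be sketched as a small state machine. The class and state names below are assumptions for illustration; the timeout rule is the standard 3PC one: before PreCommit a silent coordinator means abort, after PreCommit it means commit.

```java
/** States of one participant for one transaction (hypothetical names). */
enum ParticipantState { INITIAL, READY, PRE_COMMITTED, COMMITTED, ABORTED }

final class ThreePhaseParticipant {
    private ParticipantState state = ParticipantState.INITIAL;

    ParticipantState state() {
        return state;
    }

    /** Phase 1 (CanCommit): vote yes and wait for PreCommit. */
    void onCanCommit() {
        if (state == ParticipantState.INITIAL) {
            state = ParticipantState.READY;
        }
    }

    /** Phase 2 (PreCommit): everyone voted yes, so lock resources and acknowledge. */
    void onPreCommit() {
        if (state == ParticipantState.READY) {
            state = ParticipantState.PRE_COMMITTED;
        }
    }

    /** Phase 3 (DoCommit): make the change permanent. */
    void onDoCommit() {
        if (state == ParticipantState.PRE_COMMITTED) {
            state = ParticipantState.COMMITTED;
        }
    }

    /** The coordinator may abort at any point before the commit is applied. */
    void onAbort() {
        if (state != ParticipantState.COMMITTED) {
            state = ParticipantState.ABORTED;
        }
    }

    /** Coordinator went silent: the default action depends on how far the protocol got. */
    void onTimeout() {
        if (state == ParticipantState.PRE_COMMITTED) {
            state = ParticipantState.COMMITTED; // all participants had voted yes
        } else if (state == ParticipantState.INITIAL || state == ParticipantState.READY) {
            state = ParticipantState.ABORTED;   // no global agreement is known yet
        }
    }
}
```

The commit-on-timeout default is also where 3PC can go wrong: if a partition hides an abort from a participant in `PRE_COMMITTED`, it commits while others abort.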
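3.2 Advantages of 3PC
Improvements over 2PC
- Less blocking: a participant that times out acts on its own instead of holding locks while it waits for a coordinator that may never answer
- Fewer wasted locks: the CanCommit round only checks feasibility, so a transaction that cannot succeed is rejected before any resource is locked
- Better fault tolerance: both coordinator and participants have timeouts, although a network partition can still leave participants inconsistent
Remaining problems
- Complex to implement: the extra phase adds states and failure cases
- Network overhead: one more round of communication
- Hard to put into practice: it is rarely used in real systems
Current status of 3PC
Modern distributed systems rarely use 3PC directly. They mostly adopt compensation-based eventual consistency, such as TCC and Saga.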
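4. TCC (Try-Confirm-Cancel)
4.1 How TCC works
TCC (Try-Confirm-Cancel) is a compensation-based distributed transaction pattern that splits a transaction into three phases:
- Try: check the business preconditions and reserve the resources
- Confirm: execute the operation using the reserved resources
- Cancel: abandon the operation and release the reserved resources
4.2 TCC interface design
TCC interface definition
```java
public interface TccService {

    /**
     * Try phase: reserve resources.
     * Named tryReserve because `try` is a reserved word in Java.
     * @param context transaction context
     * @return whether the reservation succeeded
     */
    boolean tryReserve(TransactionContext context);

    /**
     * Confirm phase: execute using the reserved resources.
     * @param context transaction context
     * @return whether the confirmation succeeded
     */
    boolean confirm(TransactionContext context);

    /**
     * Cancel phase: release the reserved resources.
     * @param context transaction context
     * @return whether the cancellation succeeded
     */
    boolean cancel(TransactionContext context);
}
```
4.3 Account service TCC implementation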
Account service TCC implementation
```java
import java.math.BigDecimal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

// In each phase the account update and the log write should share one local
// database transaction. Empty rollback and hanging are covered in section 4.6.
@Service
public class AccountTccService implements TccService {

    private static final Logger log = LoggerFactory.getLogger(AccountTccService.class);

    private final AccountRepository accountRepository;
    private final TransactionLog transactionLog;

    public AccountTccService(AccountRepository accountRepository, TransactionLog transactionLog) {
        this.accountRepository = accountRepository;
        this.transactionLog = transactionLog;
    }

    // `try` is a reserved word in Java, so the Try phase is named tryReserve
    @Override
    public boolean tryReserve(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String accountId = context.getAccountId();
        BigDecimal amount = context.getAmount();

        try {
            // 1. Check that the account exists
            Account account = accountRepository.findById(accountId);
            if (account == null) {
                log.error("Account not found: {}", accountId);
                return false;
            }

            // 2. Check that the balance is sufficient
            if (account.getBalance().compareTo(amount) < 0) {
                log.error("Insufficient balance: {}", accountId);
                return false;
            }

            // 3. Freeze the funds (reserve the resource)
            account.setFrozenAmount(account.getFrozenAmount().add(amount));
            account.setBalance(account.getBalance().subtract(amount));
            accountRepository.save(account);

            // 4. Record the Try
            transactionLog.logTry(transactionId, accountId, amount);

            return true;

        } catch (Exception e) {
            log.error("Try failed for account: {}", accountId, e);
            return false;
        }
    }

    @Override
    public boolean confirm(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String accountId = context.getAccountId();
        BigDecimal amount = context.getAmount();

        try {
            // 1. Check that Try ran
            if (!transactionLog.hasTried(transactionId, accountId)) {
                log.error("Transaction not tried: {}", transactionId);
                return false;
            }

            // 2. Check whether Confirm already ran (idempotency)
            if (transactionLog.hasConfirmed(transactionId, accountId)) {
                log.info("Transaction already confirmed: {}", transactionId);
                return true;
            }

            // 3. Confirm the debit: the frozen funds leave the account for good
            Account account = accountRepository.findById(accountId);
            if (account != null) {
                account.setFrozenAmount(account.getFrozenAmount().subtract(amount));
                accountRepository.save(account);
            }

            // 4. Record the Confirm
            transactionLog.logConfirm(transactionId, accountId, amount);

            return true;

        } catch (Exception e) {
            log.error("Confirm failed for account: {}", accountId, e);
            return false;
        }
    }

    @Override
    public boolean cancel(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String accountId = context.getAccountId();
        BigDecimal amount = context.getAmount();

        try {
            // 1. Check that Try ran
            if (!transactionLog.hasTried(transactionId, accountId)) {
                log.error("Transaction not tried: {}", transactionId);
                return false;
            }

            // 2. Check whether Cancel already ran (idempotency)
            if (transactionLog.hasCancelled(transactionId, accountId)) {
                log.info("Transaction already cancelled: {}", transactionId);
                return true;
            }

            // 3. Unfreeze the funds (release the reserved resource)
            Account account = accountRepository.findById(accountId);
            if (account != null) {
                account.setFrozenAmount(account.getFrozenAmount().subtract(amount));
                account.setBalance(account.getBalance().add(amount));
                accountRepository.save(account);
            }

            // 4. Record the Cancel
            transactionLog.logCancel(transactionId, accountId, amount);

            return true;

        } catch (Exception e) {
            log.error("Cancel failed for account: {}", accountId, e);
            return false;
        }
    }
}
```
4.4 Inventory service TCC implementation
Inventory service TCC implementation
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class InventoryTccService implements TccService {

    private static final Logger log = LoggerFactory.getLogger(InventoryTccService.class);

    private final InventoryRepository inventoryRepository;
    private final TransactionLog transactionLog;

    public InventoryTccService(InventoryRepository inventoryRepository, TransactionLog transactionLog) {
        this.inventoryRepository = inventoryRepository;
        this.transactionLog = transactionLog;
    }

    // `try` is a reserved word in Java, so the Try phase is named tryReserve
    @Override
    public boolean tryReserve(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String productId = context.getProductId();
        int quantity = context.getQuantity();

        try {
            // 1. Check that the product exists
            Inventory inventory = inventoryRepository.findByProductId(productId);
            if (inventory == null) {
                log.error("Product not found: {}", productId);
                return false;
            }

            // 2. Check that stock is sufficient
            if (inventory.getAvailableStock() < quantity) {
                log.error("Insufficient stock: {}", productId);
                return false;
            }

            // 3. Reserve the stock
            inventory.setAvailableStock(inventory.getAvailableStock() - quantity);
            inventory.setReservedStock(inventory.getReservedStock() + quantity);
            inventoryRepository.save(inventory);

            // 4. Record the Try
            transactionLog.logTry(transactionId, productId, quantity);

            return true;

        } catch (Exception e) {
            log.error("Try failed for product: {}", productId, e);
            return false;
        }
    }

    @Override
    public boolean confirm(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String productId = context.getProductId();
        int quantity = context.getQuantity();

        try {
            // 1. Check that Try ran
            if (!transactionLog.hasTried(transactionId, productId)) {
                log.error("Transaction not tried: {}", transactionId);
                return false;
            }

            // 2. Check whether Confirm already ran (idempotency)
            if (transactionLog.hasConfirmed(transactionId, productId)) {
                log.info("Transaction already confirmed: {}", transactionId);
                return true;
            }

            // 3. Confirm the deduction: the reserved stock leaves the warehouse for good
            Inventory inventory = inventoryRepository.findByProductId(productId);
            if (inventory != null) {
                inventory.setReservedStock(inventory.getReservedStock() - quantity);
                inventoryRepository.save(inventory);
            }

            // 4. Record the Confirm
            transactionLog.logConfirm(transactionId, productId, quantity);

            return true;

        } catch (Exception e) {
            log.error("Confirm failed for product: {}", productId, e);
            return false;
        }
    }

    @Override
    public boolean cancel(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String productId = context.getProductId();
        int quantity = context.getQuantity();

        try {
            // 1. Check that Try ran
            if (!transactionLog.hasTried(transactionId, productId)) {
                log.error("Transaction not tried: {}", transactionId);
                return false;
            }

            // 2. Check whether Cancel already ran (idempotency)
            if (transactionLog.hasCancelled(transactionId, productId)) {
                log.info("Transaction already cancelled: {}", transactionId);
                return true;
            }

            // 3. Release the reserved stock
            Inventory inventory = inventoryRepository.findByProductId(productId);
            if (inventory != null) {
                inventory.setReservedStock(inventory.getReservedStock() - quantity);
                inventory.setAvailableStock(inventory.getAvailableStock() + quantity);
                inventoryRepository.save(inventory);
            }

            // 4. Record the Cancel
            transactionLog.logCancel(transactionId, productId, quantity);

            return true;

        } catch (Exception e) {
            log.error("Cancel failed for product: {}", productId, e);
            return false;
        }
    }
}
```
4.5 TCC transaction coordinator
TCC transaction coordinator
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class TccTransactionCoordinator {

    private static final Logger log = LoggerFactory.getLogger(TccTransactionCoordinator.class);
    private static final long PHASE_TIMEOUT_SECONDS = 30;

    private final List<TccService> tccServices;
    private final TransactionLog transactionLog;

    public TccTransactionCoordinator(List<TccService> tccServices) {
        this.tccServices = tccServices;
        this.transactionLog = new TransactionLog();
    }

    /**
     * Runs one TCC transaction.
     */
    public boolean executeTransaction(TransactionContext context) {
        String transactionId = context.getTransactionId();
        boolean confirmStarted = false;

        try {
            // Record the start of the transaction
            transactionLog.logTransactionStart(transactionId, context);

            // Try phase: every service reserves its resources
            if (!tryPhase(transactionId, context)) {
                // Try failed, so cancel
                cancelPhase(transactionId, context);
                return false;
            }

            // Confirm phase: once it starts, the transaction must not be cancelled
            confirmStarted = true;
            if (!confirmPhase(transactionId, context)) {
                // Confirm failed part-way, manual intervention is required
                log.error("Transaction confirm failed: {}", transactionId);
                return false;
            }

            // Record completion
            transactionLog.logTransactionComplete(transactionId);
            return true;

        } catch (Exception e) {
            log.error("Transaction execution failed: {}", transactionId, e);
            // Cancel only if no Confirm has been sent yet
            if (!confirmStarted) {
                cancelPhase(transactionId, context);
            }
            return false;
        }
    }

    /**
     * Try phase: every service reserves its resources.
     */
    private boolean tryPhase(String transactionId, TransactionContext context) {
        // tryReserve is the Try operation; `try` itself is a reserved word in Java
        return callAll("Try", transactionId, service -> service.tryReserve(context));
    }

    /**
     * Confirm phase: every service confirms.
     */
    private boolean confirmPhase(String transactionId, TransactionContext context) {
        return callAll("Confirm", transactionId, service -> service.confirm(context));
    }

    /**
     * Calls all services in parallel and returns true only if every one
     * answered true within the timeout.
     */
    private boolean callAll(String phase, String transactionId, Predicate<TccService> call) {
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        for (TccService service : tccServices) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return call.test(service);
                } catch (Exception e) {
                    log.error("Service {} failed", phase, e);
                    return false;
                }
            }));
        }

        try {
            // Wait for every service, at most 30 seconds
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .get(PHASE_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            for (CompletableFuture<Boolean> future : futures) {
                if (!future.join()) {
                    return false; // at least one service failed
                }
            }
            return true; // every service succeeded

        } catch (Exception e) {
            log.error("{} phase failed: {}", phase, transactionId, e);
            return false;
        }
    }

    /**
     * Cancel phase: every service cancels (fire and forget).
     */
    private void cancelPhase(String transactionId, TransactionContext context) {
        for (TccService service : tccServices) {
            CompletableFuture.runAsync(() -> {
                try {
                    service.cancel(context);
                } catch (Exception e) {
                    log.error("Service cancel failed", e);
                }
            });
        }

        // Record the cancellation
        transactionLog.logTransactionCancel(transactionId);
    }
}
```
4.6 Key problems in TCC
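1. Idempotency
Idempotency handling example
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class IdempotentTccService {

    private static final Logger log = LoggerFactory.getLogger(IdempotentTccService.class);

    private final TransactionLog transactionLog;

    public IdempotentTccService(TransactionLog transactionLog) {
        this.transactionLog = transactionLog;
    }

    public boolean confirm(TransactionContext context) {
        String transactionId = context.getTransactionId();

        // Idempotency check: has this transaction already been confirmed?
        if (transactionLog.hasConfirmed(transactionId)) {
            log.info("Transaction already confirmed: {}", transactionId);
            return true; // report success without doing the work again
        }

        // Run the Confirm logic
        boolean success = doConfirm(context);

        if (success) {
            // Record the Confirm so that a retry is recognized
            transactionLog.logConfirm(transactionId);
        }

        return success;
    }

    private boolean doConfirm(TransactionContext context) {
        return true; // business-specific Confirm logic goes here
    }
}
```
2. Empty rollback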
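Empty rollback handling example
```java
import org.springframework.stereotype.Service;

@Service
public class EmptyRollbackTccService {

    private final TransactionLog transactionLog;

    public EmptyRollbackTccService(TransactionLog transactionLog) {
        this.transactionLog = transactionLog;
    }

    public boolean cancel(TransactionContext context) {
        String transactionId = context.getTransactionId();

        // Check whether Try ever ran
        if (!transactionLog.hasTried(transactionId)) {
            // No Try to undo: record the empty rollback so a late Try can be rejected
            transactionLog.logEmptyRollback(transactionId);
            return true; // an empty rollback still counts as success
        }

        // Run the Cancel logic
        return doCancel(context);
    }

    private boolean doCancel(TransactionContext context) {
        return true; // business-specific Cancel logic goes here
    }
}
```
3. Hanging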
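Hanging handling example
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class HangingTccService {
    private static final Logger log = LoggerFactory.getLogger(HangingTccService.class);
    private final TransactionLog transactionLog;

    public HangingTccService(TransactionLog transactionLog) {
        this.transactionLog = transactionLog;
    }

    public boolean tryReserve(TransactionContext context) {
        String transactionId = context.getTransactionId();
        // Cancel already ran, so this Try arrived late; reserving now would hang forever.
        // Assumes hasCancelled also reports a recorded empty rollback.
        if (transactionLog.hasCancelled(transactionId)) {
            log.warn("Try rejected, transaction already cancelled: {}", transactionId);
            return false;
        }
        return doTry(context);
    }

    private boolean doTry(TransactionContext context) {
        return true; // business-specific reservation goes here
    }
}
```
TCC pattern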
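Idea: split one business operation into three steps, Try/Confirm/Cancel. Try reserves resources, Confirm commits, Cancel rolls back.
Suitable when:
- The business has strong semantics, so compensation can be fine-grained
- Every resource can expose Try/Confirm/Cancel interfaces
Watch out for: idempotency, hanging (a Try that arrives after Cancel must be rejected), and empty rollback (a Cancel for a Try that never succeeded).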
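The three pitfalls in the note above can be handled by one per-transaction state check. The following is a minimal in-memory sketch; `TccGuard`, `TccState` and `TccAction` are hypothetical names, and a real service would keep this state in the same local database transaction as the resource change.

```java
import java.util.HashMap;
import java.util.Map;

/** Last phase recorded for a transaction (hypothetical names). */
enum TccState { TRIED, CONFIRMED, CANCELLED }

/** What the caller should do with the incoming request. */
enum TccAction { EXECUTE, SKIP_AS_SUCCESS, REJECT }

final class TccGuard {
    private final Map<String, TccState> states = new HashMap<>();

    synchronized TccAction onTry(String txId) {
        TccState state = states.get(txId);
        if (state == null) {
            states.put(txId, TccState.TRIED);
            return TccAction.EXECUTE;
        }
        if (state == TccState.CANCELLED) {
            return TccAction.REJECT;          // anti-hanging: Cancel got here first
        }
        return TccAction.SKIP_AS_SUCCESS;     // duplicate Try
    }

    synchronized TccAction onConfirm(String txId) {
        TccState state = states.get(txId);
        if (state == TccState.TRIED) {
            states.put(txId, TccState.CONFIRMED);
            return TccAction.EXECUTE;
        }
        if (state == TccState.CONFIRMED) {
            return TccAction.SKIP_AS_SUCCESS; // idempotent retry
        }
        return TccAction.REJECT;              // never tried, or already cancelled
    }

    synchronized TccAction onCancel(String txId) {
        TccState state = states.get(txId);
        if (state == null) {
            states.put(txId, TccState.CANCELLED);
            return TccAction.SKIP_AS_SUCCESS; // empty rollback: remember it, release nothing
        }
        if (state == TccState.TRIED) {
            states.put(txId, TccState.CANCELLED);
            return TccAction.EXECUTE;
        }
        if (state == TccState.CANCELLED) {
            return TccAction.SKIP_AS_SUCCESS; // idempotent retry
        }
        return TccAction.REJECT;              // already confirmed
    }
}
```

A service method would call the guard first and run its business logic only on `EXECUTE`, returning true for `SKIP_AS_SUCCESS` and false for `REJECT`.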
```java
import java.math.BigDecimal;

// TCC interface example
public interface AccountTccService {
    boolean tryFreeze(long userId, BigDecimal amount);
    boolean confirm(long userId, BigDecimal amount);
    boolean cancel(long userId, BigDecimal amount);
}

// Application service for an account transfer (pseudocode)
public class TransferService {
    private AccountTccService fromAccount;
    private AccountTccService toAccount;

    public boolean transfer(long from, long to, BigDecimal amt) {
        // Try
        boolean t1 = fromAccount.tryFreeze(from, amt);
        boolean t2 = toAccount.tryFreeze(to, amt);
        if (!t1 || !t2) {
            // Cancel whatever was reserved
            if (t1) { fromAccount.cancel(from, amt); }
            if (t2) { toAccount.cancel(to, amt); }
            return false;
        }
        // Confirm
        boolean c1 = fromAccount.confirm(from, amt);
        boolean c2 = toAccount.confirm(to, amt);
        if (!c1 || !c2) {
            // On failure, apply the compensation strategy the business defines
            return false;
        }
        return true;
    }
}
```
Saga pattern
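Saga splits a long-running transaction into a sequence of local transactions that each commit independently. Every step has a compensating action (its inverse).
Two coordination styles:
- Orchestration: an orchestrator drives the flow (a state machine)
- Choreography: services cooperate through events (no central coordinator)
Advantages:
- No global locks and high throughput; well suited to long flows across services
Disadvantages:
- Only eventually consistent, and compensation is hard to design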
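To illustrate the orchestration style, here is a small in-memory sketch of an orchestrator that runs steps in order and, when one fails, compensates the completed steps in reverse. `SagaStep` and `SagaOrchestrator` are hypothetical names; a real orchestrator would also persist its progress so it can resume after a crash.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

/** One saga step: a local transaction plus its compensating action. */
final class SagaStep {
    final String name;
    final BooleanSupplier execute;
    final Runnable compensate;

    SagaStep(String name, BooleanSupplier execute, Runnable compensate) {
        this.name = name;
        this.execute = execute;
        this.compensate = compensate;
    }
}

final class SagaOrchestrator {
    private final List<String> trace = new ArrayList<>();

    /** Human-readable record of what happened, useful for debugging. */
    List<String> trace() {
        return trace;
    }

    /** Returns true if every step succeeded; otherwise compensates and returns false. */
    boolean run(List<SagaStep> steps) {
        Deque<SagaStep> completed = new ArrayDeque<>();
        for (SagaStep step : steps) {
            boolean ok;
            try {
                ok = step.execute.getAsBoolean();
            } catch (RuntimeException e) {
                ok = false; // an exception counts as a failed step
            }
            if (!ok) {
                trace.add("failed:" + step.name);
                // Undo the completed steps, most recent first
                while (!completed.isEmpty()) {
                    SagaStep done = completed.pop();
                    done.compensate.run();
                    trace.add("compensated:" + done.name);
                }
                return false;
            }
            trace.add("done:" + step.name);
            completed.push(step);
        }
        return true;
    }
}
```

With steps CreateOrder, ReserveInventory and Pay, a failing Pay releases the inventory first and then cancels the order. Compensations should themselves be idempotent, because an orchestrator that crashes mid-rollback will run them again.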
```yaml
# Orchestration sketch (pseudo-configuration)
states:
  - name: CreateOrder
    execute: order.create
    compensate: order.cancel
  - name: ReserveInventory
    execute: inventory.reserve
    compensate: inventory.release
  - name: Pay
    execute: payment.pay
    compensate: payment.refund
```
Outbox (transactional outbox) and reliable messaging
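Goal: make the local database write and the message send atomic.
Approach:
- Write the business data and the outbox message in the same local transaction
- A background relay scans the outbox table and publishes to the message queue
- Consumers process messages idempotently
Advantages: simple to implement and cheap to retrofit onto an existing system
Watch out for: deduplication, idempotency keys, retries, and dead-letter queues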
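The delivery side of this approach can be sketched as follows. The sketch is in-memory only: a `List` stands in for the outbox table, a `Predicate` stands in for the message-queue client, and the `PENDING` and `SENT` status values are assumptions. The write side (saving the business row and the outbox row in one transaction) is not shown.

```java
import java.util.List;
import java.util.function.Predicate;

/** One row of the outbox table. The status values are assumptions. */
final class OutboxMessage {
    final long id;
    final String topic;
    final String payload;
    String status = "PENDING";

    OutboxMessage(long id, String topic, String payload) {
        this.id = id;
        this.topic = topic;
        this.payload = payload;
    }
}

/** Background relay: publishes pending rows and marks them as sent. */
final class OutboxRelay {
    private final List<OutboxMessage> table;          // stands in for the outbox table
    private final Predicate<OutboxMessage> publisher; // returns true once the broker acknowledged

    OutboxRelay(List<OutboxMessage> table, Predicate<OutboxMessage> publisher) {
        this.table = table;
        this.publisher = publisher;
    }

    /** One scan over the table; returns how many messages were published. */
    int relayOnce() {
        int sent = 0;
        for (OutboxMessage message : table) {
            if (!"PENDING".equals(message.status)) {
                continue; // already delivered
            }
            if (publisher.test(message)) {
                message.status = "SENT";
                sent++;
            }
            // On failure the row stays PENDING and is retried on the next scan
        }
        return sent;
    }
}
```

Because a row is marked `SENT` only after the publish call returns, a crash between the two steps re-sends the message on the next scan. Delivery is therefore at least once, which is why consumers must be idempotent.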
```sql
-- Example outbox table
CREATE TABLE outbox (
    id BIGINT PRIMARY KEY,
    aggregate_id BIGINT,
    topic VARCHAR(100),
    payload TEXT,
    status VARCHAR(20),
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);
```
Idempotency and deduplication
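- Business idempotency keys (such as an order number or request ID)
- A deduplication table or unique index, or distributed locks
- At-least-once delivery on the consumer side combined with idempotent handling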
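As a rough sketch of the last point, a consumer can remember the idempotency keys it has processed and drop redeliveries. `IdempotentConsumer` is a hypothetical name, and the in-memory set stands in for a deduplication table with a unique index.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class IdempotentConsumer {
    // Stands in for a deduplication table with a unique index on the key
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<String> handler;

    IdempotentConsumer(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean handle(String idempotencyKey, String payload) {
        // add() is atomic and returns false when the key is already present
        if (!processedKeys.add(idempotencyKey)) {
            return false;
        }
        try {
            handler.accept(payload);
            return true;
        } catch (RuntimeException e) {
            processedKeys.remove(idempotencyKey); // let a redelivery retry the work
            throw e;
        }
    }
}
```

In a database-backed version the key insert and the business update would be committed in the same transaction, so a crash cannot leave the key recorded without the work being done.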
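Selection guide
- Strong consistency in a small scope: 2PC/XA, or serialize through a single aggregate root
- Long flows across services: Saga/TCC plus Outbox
- Throughput-sensitive workloads: prefer compensation-based approaches over global locking
Interview questions
- What are the two phases of 2PC, and what are its drawbacks?
- How does TCC avoid empty rollback and hanging?
- How do Saga orchestration and choreography differ, and how do you choose between them?
- How does Outbox ensure that no message is lost and none is consumed twice?
- What are the common ways to implement idempotency?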