
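Distributed Transaction Solutions in Depth

Distributed transactions are a core challenge in distributed systems: they keep data consistent across multiple services and databases. This chapter covers the theory behind distributed transactions, the main technical approaches, and best practices.

Why distributed transactions matter
  • Data consistency: operations that span services leave the data in a consistent state
  • Business integrity: a multi-step business flow either completes as a whole or not at all
  • System reliability: the system tolerates partial failures better
  • User experience: users are not confused by inconsistent data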

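1. Distributed Transaction Fundamentals

1.1 Challenges of distributed transactions

The main challenges are:

| Challenge | Description | Impact |
| --- | --- | --- |
| Network partitions | Latency, packet loss, partitions | Message delivery is unreliable |
| Node failures | Service crashes, database failures | Transaction state becomes inconsistent |
| Clock skew | Node clocks drift apart | Ordering decisions can be wrong |
| Concurrency control | Many transactions run concurrently | Data races and deadlocks |

1.2 Consistency models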

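2. Two-Phase Commit (2PC)

2.1 How 2PC works

Two-phase commit is the foundational protocol for distributed transactions. A coordinator drives the transaction through two phases to make it atomic: in the prepare phase it asks every participant whether it can commit, and in the commit phase it tells all participants to commit only if every one of them voted yes, and to abort otherwise.

Protocol flow

2.2 2PC implementation example

2PC coordinator implementation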
java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

// TransactionParticipant, TransactionContext and TransactionLog are this chapter's own types.
@Service
public class TwoPhaseCommitCoordinator {

    private static final Logger log = LoggerFactory.getLogger(TwoPhaseCommitCoordinator.class);
    private static final long PHASE_TIMEOUT_SECONDS = 30;

    private final List<TransactionParticipant> participants;
    private final TransactionLog transactionLog;

    public TwoPhaseCommitCoordinator(List<TransactionParticipant> participants) {
        this.participants = participants;
        this.transactionLog = new TransactionLog();
    }

    /** Runs one transaction through both phases. */
    public boolean executeTransaction(TransactionContext context) {
        String transactionId = generateTransactionId();

        try {
            // Record the start of the transaction
            transactionLog.logTransactionStart(transactionId, context);

            // Phase 1: prepare. Any "no" vote, error or timeout aborts the transaction.
            if (!preparePhase(transactionId, context)) {
                abortPhase(transactionId, context);
                return false;
            }
        } catch (Exception e) {
            log.error("Transaction execution failed: {}", transactionId, e);
            abortPhase(transactionId, context);
            return false;
        }

        try {
            // Phase 2: commit. Every participant voted yes, so the decision is final.
            // Aborting now could leave some participants committed and others rolled back.
            if (!commitPhase(transactionId, context)) {
                // Needs a retry or manual intervention, not a rollback
                log.error("Transaction commit failed: {}", transactionId);
                return false;
            }

            // Record the completion of the transaction
            transactionLog.logTransactionComplete(transactionId);
            return true;
        } catch (Exception e) {
            log.error("Transaction commit failed: {}", transactionId, e);
            return false;
        }
    }

    /** Prepare phase: ask every participant whether it can commit. */
    private boolean preparePhase(String transactionId, TransactionContext context) {
        return runOnAll("prepare", transactionId, p -> p.prepare(transactionId, context));
    }

    /** Commit phase: tell every participant to commit. */
    private boolean commitPhase(String transactionId, TransactionContext context) {
        return runOnAll("commit", transactionId, p -> p.commit(transactionId, context));
    }

    /** Abort phase: tell every participant to roll back, then record the abort. */
    private void abortPhase(String transactionId, TransactionContext context) {
        runOnAll("abort", transactionId, p -> p.abort(transactionId, context));
        transactionLog.logTransactionAbort(transactionId);
    }

    /** Calls all participants in parallel; true only if every call returns true in time. */
    private boolean runOnAll(String phase, String transactionId,
                             Function<TransactionParticipant, Boolean> call) {
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        for (TransactionParticipant participant : participants) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return call.apply(participant);
                } catch (Exception e) {
                    log.error("Participant {} failed: {}", phase, participant.getId(), e);
                    return false;
                }
            }));
        }

        try {
            // Wait for every participant, with a 30-second timeout
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .get(PHASE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            // All futures are complete here, so join() does not block
            return futures.stream().allMatch(CompletableFuture::join);
        } catch (Exception e) {
            log.error("{} phase failed: {}", phase, transactionId, e);
            return false;
        }
    }

    private String generateTransactionId() {
        return UUID.randomUUID().toString();
    }
}

2.3 Participant implementation

2PC participant implementation
java
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// One instance per resource. The class is not annotated with @Service because Spring
// cannot autowire participantId; create the instances in a @Configuration class instead.
public class TransactionParticipant {

    private static final Logger log = LoggerFactory.getLogger(TransactionParticipant.class);

    /** A unit of work executed inside one local database transaction. */
    @FunctionalInterface
    private interface LocalWork {
        boolean run(Connection connection) throws SQLException;
    }

    private final String participantId;
    private final DataSource dataSource;
    private final TransactionLog transactionLog;

    public TransactionParticipant(String participantId, DataSource dataSource) {
        this.participantId = participantId;
        this.dataSource = dataSource;
        this.transactionLog = new TransactionLog();
    }

    /** Used by the coordinator in its log messages. */
    public String getId() {
        return participantId;
    }

    /** Prepare phase: reserve what the commit will need, then vote. */
    public boolean prepare(String transactionId, TransactionContext context) {
        try {
            transactionLog.logPrepare(transactionId, participantId, context);
            // The reservation is committed locally so that it survives until phase 2
            boolean canCommit = inLocalTransaction(c -> executePreCommit(c, context));
            if (canCommit) {
                transactionLog.logPrepareSuccess(transactionId, participantId);
            } else {
                transactionLog.logPrepareFailure(transactionId, participantId);
            }
            return canCommit;
        } catch (Exception e) {
            log.error("Prepare failed: {}", transactionId, e);
            transactionLog.logPrepareFailure(transactionId, participantId, e);
            return false;
        }
    }

    /** Commit phase: apply the change that was prepared. */
    public boolean commit(String transactionId, TransactionContext context) {
        try {
            // Refuse to commit a transaction that was never prepared
            if (!transactionLog.isPrepared(transactionId, participantId)) {
                log.error("Transaction not prepared: {}", transactionId);
                return false;
            }
            boolean success = inLocalTransaction(c -> executeCommit(c, context));
            if (success) {
                transactionLog.logCommitSuccess(transactionId, participantId);
            } else {
                transactionLog.logCommitFailure(transactionId, participantId);
            }
            return success;
        } catch (Exception e) {
            log.error("Commit failed: {}", transactionId, e);
            transactionLog.logCommitFailure(transactionId, participantId, e);
            return false;
        }
    }

    /** Abort phase: undo the reservation made during prepare. */
    public boolean abort(String transactionId, TransactionContext context) {
        try {
            boolean success = inLocalTransaction(c -> executeAbort(c, context));
            if (success) {
                transactionLog.logAbortSuccess(transactionId, participantId);
            } else {
                transactionLog.logAbortFailure(transactionId, participantId);
            }
            return success;
        } catch (Exception e) {
            log.error("Abort failed: {}", transactionId, e);
            transactionLog.logAbortFailure(transactionId, participantId, e);
            return false;
        }
    }

    /** Runs the work on its own connection; commits on true, rolls back on false or error. */
    private boolean inLocalTransaction(LocalWork work) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            connection.setAutoCommit(false);
            try {
                boolean ok = work.run(connection);
                if (ok) {
                    connection.commit();
                } else {
                    connection.rollback();
                }
                return ok;
            } catch (SQLException | RuntimeException e) {
                // Roll back while the connection is still open, then report the error
                connection.rollback();
                throw e;
            }
        }
    }

    /** Pre-commit logic, for example checking availability and reserving resources. */
    private boolean executePreCommit(Connection connection, TransactionContext context) {
        return true;
    }

    /** Commit logic, for example updating the database or sending messages. */
    private boolean executeCommit(Connection connection, TransactionContext context) {
        return true;
    }

    /** Rollback logic, for example releasing resources or undoing operations. */
    private boolean executeAbort(Connection connection, TransactionContext context) {
        return true;
    }
}

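2.4 Pros and cons of 2PC

Pros

  • Strong consistency: either every participant commits or every participant aborts
  • Simple protocol: comparatively easy to understand and reason about
  • Mature theory: the protocol is well studied

Cons

  • Performance

    • The coordinator is a single bottleneck
    • Participants hold locks from the prepare phase until the final decision, which limits throughput
    • Two rounds of messages for every transaction
  • Availability

    • The coordinator is a single point of failure
    • One failed participant fails the whole transaction
    • A network partition can leave prepared participants blocked
  • Complex recovery

    • Recovering after a coordinator crash is complicated
    • Timeouts and a coordinator re-election strategy are required
    • Inconsistent states are hard to resolve

When to use 2PC

2PC fits scenarios that demand very strong consistency and involve small transactions, such as financial transactions and inventory management.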

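3. Three-Phase Commit (3PC)

3.1 How 3PC works

Three-phase commit refines 2PC by splitting the prepare step in two (CanCommit, then PreCommit) and adding participant-side timeouts, so that participants spend less time blocked when the coordinator fails. The final phase, DoCommit, applies the change.

Protocol flow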
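
The three phases and the participant timeout rule can be sketched as follows. This is a minimal, sequential illustration under stated assumptions rather than a production protocol: `ThreePhaseParticipant`, `ThreePhaseCoordinator` and `onParticipantTimeout` are hypothetical names that do not come from this chapter, and the sketch leaves out networking, persistence and retries.

```java
import java.util.List;

/** Hypothetical participant contract for this sketch. */
interface ThreePhaseParticipant {
    boolean canCommit(String txId);  // phase 1: vote only, nothing is locked yet
    boolean preCommit(String txId);  // phase 2: reserve resources and acknowledge
    void doCommit(String txId);      // phase 3: apply the change
    void abort(String txId);         // release anything reserved so far
}

final class ThreePhaseCoordinator {

    enum Outcome { COMMITTED, ABORTED }

    /** Participant-side state, used only by the timeout rule below. */
    enum ParticipantState { WAITING_FOR_PRE_COMMIT, PRE_COMMITTED }

    Outcome run(String txId, List<ThreePhaseParticipant> participants) {
        // Phase 1 (CanCommit): a single "no" ends the transaction before any lock is taken.
        for (ThreePhaseParticipant p : participants) {
            if (!p.canCommit(txId)) {
                return abortAll(txId, participants);
            }
        }
        // Phase 2 (PreCommit): participants reserve resources and acknowledge.
        for (ThreePhaseParticipant p : participants) {
            if (!p.preCommit(txId)) {
                return abortAll(txId, participants);
            }
        }
        // Phase 3 (DoCommit): every participant acknowledged, so the change is applied.
        participants.forEach(p -> p.doCommit(txId));
        return Outcome.COMMITTED;
    }

    private Outcome abortAll(String txId, List<ThreePhaseParticipant> participants) {
        participants.forEach(p -> p.abort(txId));
        return Outcome.ABORTED;
    }

    /**
     * What a participant does on its own when the coordinator goes silent:
     * abort before PreCommit, commit after it.
     */
    static Outcome onParticipantTimeout(ParticipantState state) {
        return state == ParticipantState.PRE_COMMITTED ? Outcome.COMMITTED : Outcome.ABORTED;
    }
}
```

The timeout rule is what separates 3PC from 2PC: a participant that has already acknowledged PreCommit knows every other participant voted yes, so it commits by default instead of blocking.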

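3.2 Advantages of 3PC

Improvements over 2PC

  • Less blocking: a participant that times out can decide on its own instead of waiting indefinitely for the coordinator
  • Fewer wasted locks: CanCommit only asks for a vote, so a transaction that cannot succeed fails before resources are locked
  • Better fault tolerance: both the coordinator and the participants have timeouts

Remaining problems

  • Complex to implement: the extra phase adds states and failure cases
  • Network overhead: one more round of messages than 2PC
  • Hard to adopt in practice: rarely used in production, partly because it can still end in inconsistent states under a network partition

Current status of 3PC

Modern distributed systems rarely use 3PC directly. Most prefer compensation-based, eventually consistent approaches such as TCC and Saga.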

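4. TCC (Try-Confirm-Cancel)

4.1 How TCC works

TCC (Try-Confirm-Cancel) is a compensation-based distributed transaction pattern that splits each operation into three phases:

  • Try: check the business preconditions and reserve the resources
  • Confirm: execute the operation using the reserved resources
  • Cancel: abandon the operation and release the reserved resources

4.2 TCC interface design

TCC interface definition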
java
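public interface TccService {

    /**
     * Try phase: reserve resources.
     * Named tryReserve because try is a reserved word in Java.
     *
     * @param context transaction context
     * @return true if the reservation succeeded
     */
    boolean tryReserve(TransactionContext context);

    /**
     * Confirm phase: execute using the reserved resources.
     *
     * @param context transaction context
     * @return true if the confirmation succeeded
     */
    boolean confirm(TransactionContext context);

    /**
     * Cancel phase: release the reserved resources.
     *
     * @param context transaction context
     * @return true if the cancellation succeeded
     */
    boolean cancel(TransactionContext context);
}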

4.3 TCC implementation for the account service

Account service TCC implementation
java
import java.math.BigDecimal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class AccountTccService implements TccService {

    private static final Logger log = LoggerFactory.getLogger(AccountTccService.class);

    private final AccountRepository accountRepository;
    private final TransactionLog transactionLog;

    public AccountTccService(AccountRepository accountRepository) {
        this.accountRepository = accountRepository;
        this.transactionLog = new TransactionLog();
    }

    // Named tryReserve because try is a reserved word in Java.
    @Override
    public boolean tryReserve(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String accountId = context.getAccountId();
        BigDecimal amount = context.getAmount();

        try {
            // 1. Check that the account exists
            Account account = accountRepository.findById(accountId);
            if (account == null) {
                log.error("Account not found: {}", accountId);
                return false;
            }

            // 2. Check that the balance is sufficient
            if (account.getBalance().compareTo(amount) < 0) {
                log.error("Insufficient balance: {}", accountId);
                return false;
            }

            // 3. Freeze the funds (reserve the resource)
            account.setFrozenAmount(account.getFrozenAmount().add(amount));
            account.setBalance(account.getBalance().subtract(amount));
            accountRepository.save(account);

            // 4. Record the Try
            transactionLog.logTry(transactionId, accountId, amount);
            return true;
        } catch (Exception e) {
            log.error("Try failed for account: {}", accountId, e);
            return false;
        }
    }

    @Override
    public boolean confirm(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String accountId = context.getAccountId();
        BigDecimal amount = context.getAmount();

        try {
            // 1. Confirm is only valid after a Try
            if (!transactionLog.hasTried(transactionId, accountId)) {
                log.error("Transaction not tried: {}", transactionId);
                return false;
            }

            // 2. Idempotency: a repeated Confirm succeeds without doing the work again
            if (transactionLog.hasConfirmed(transactionId, accountId)) {
                log.info("Transaction already confirmed: {}", transactionId);
                return true;
            }

            // 3. Complete the debit: the balance was already reduced in Try,
            //    so only the frozen amount is released here
            Account account = accountRepository.findById(accountId);
            if (account == null) {
                log.error("Account not found: {}", accountId);
                return false;
            }
            account.setFrozenAmount(account.getFrozenAmount().subtract(amount));
            accountRepository.save(account);

            // 4. Record the Confirm
            transactionLog.logConfirm(transactionId, accountId, amount);
            return true;
        } catch (Exception e) {
            log.error("Confirm failed for account: {}", accountId, e);
            return false;
        }
    }

    @Override
    public boolean cancel(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String accountId = context.getAccountId();
        BigDecimal amount = context.getAmount();

        try {
            // 1. Nothing to release without a Try (section 4.6 treats this case as an empty rollback)
            if (!transactionLog.hasTried(transactionId, accountId)) {
                log.error("Transaction not tried: {}", transactionId);
                return false;
            }

            // 2. Idempotency: a repeated Cancel succeeds without doing the work again
            if (transactionLog.hasCancelled(transactionId, accountId)) {
                log.info("Transaction already cancelled: {}", transactionId);
                return true;
            }

            // 3. Unfreeze the funds (release the reserved resource)
            Account account = accountRepository.findById(accountId);
            if (account == null) {
                log.error("Account not found: {}", accountId);
                return false;
            }
            account.setFrozenAmount(account.getFrozenAmount().subtract(amount));
            account.setBalance(account.getBalance().add(amount));
            accountRepository.save(account);

            // 4. Record the Cancel
            transactionLog.logCancel(transactionId, accountId, amount);
            return true;
        } catch (Exception e) {
            log.error("Cancel failed for account: {}", accountId, e);
            return false;
        }
    }
}

4.4 TCC implementation for the inventory service

Inventory service TCC implementation
java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class InventoryTccService implements TccService {

    private static final Logger log = LoggerFactory.getLogger(InventoryTccService.class);

    private final InventoryRepository inventoryRepository;
    private final TransactionLog transactionLog;

    public InventoryTccService(InventoryRepository inventoryRepository) {
        this.inventoryRepository = inventoryRepository;
        this.transactionLog = new TransactionLog();
    }

    // Named tryReserve because try is a reserved word in Java.
    @Override
    public boolean tryReserve(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String productId = context.getProductId();
        int quantity = context.getQuantity();

        try {
            // 1. Check that the product exists
            Inventory inventory = inventoryRepository.findByProductId(productId);
            if (inventory == null) {
                log.error("Product not found: {}", productId);
                return false;
            }

            // 2. Check that enough stock is available
            if (inventory.getAvailableStock() < quantity) {
                log.error("Insufficient stock: {}", productId);
                return false;
            }

            // 3. Reserve the stock
            inventory.setAvailableStock(inventory.getAvailableStock() - quantity);
            inventory.setReservedStock(inventory.getReservedStock() + quantity);
            inventoryRepository.save(inventory);

            // 4. Record the Try
            transactionLog.logTry(transactionId, productId, quantity);
            return true;
        } catch (Exception e) {
            log.error("Try failed for product: {}", productId, e);
            return false;
        }
    }

    @Override
    public boolean confirm(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String productId = context.getProductId();
        int quantity = context.getQuantity();

        try {
            // 1. Confirm is only valid after a Try
            if (!transactionLog.hasTried(transactionId, productId)) {
                log.error("Transaction not tried: {}", transactionId);
                return false;
            }

            // 2. Idempotency: a repeated Confirm succeeds without doing the work again
            if (transactionLog.hasConfirmed(transactionId, productId)) {
                log.info("Transaction already confirmed: {}", transactionId);
                return true;
            }

            // 3. Complete the deduction: available stock was already reduced in Try,
            //    so only the reservation is consumed here
            Inventory inventory = inventoryRepository.findByProductId(productId);
            if (inventory == null) {
                log.error("Product not found: {}", productId);
                return false;
            }
            inventory.setReservedStock(inventory.getReservedStock() - quantity);
            inventoryRepository.save(inventory);

            // 4. Record the Confirm
            transactionLog.logConfirm(transactionId, productId, quantity);
            return true;
        } catch (Exception e) {
            log.error("Confirm failed for product: {}", productId, e);
            return false;
        }
    }

    @Override
    public boolean cancel(TransactionContext context) {
        String transactionId = context.getTransactionId();
        String productId = context.getProductId();
        int quantity = context.getQuantity();

        try {
            // 1. Nothing to release without a Try (section 4.6 treats this case as an empty rollback)
            if (!transactionLog.hasTried(transactionId, productId)) {
                log.error("Transaction not tried: {}", transactionId);
                return false;
            }

            // 2. Idempotency: a repeated Cancel succeeds without doing the work again
            if (transactionLog.hasCancelled(transactionId, productId)) {
                log.info("Transaction already cancelled: {}", transactionId);
                return true;
            }

            // 3. Release the reserved stock
            Inventory inventory = inventoryRepository.findByProductId(productId);
            if (inventory == null) {
                log.error("Product not found: {}", productId);
                return false;
            }
            inventory.setReservedStock(inventory.getReservedStock() - quantity);
            inventory.setAvailableStock(inventory.getAvailableStock() + quantity);
            inventoryRepository.save(inventory);

            // 4. Record the Cancel
            transactionLog.logCancel(transactionId, productId, quantity);
            return true;
        } catch (Exception e) {
            log.error("Cancel failed for product: {}", productId, e);
            return false;
        }
    }
}

4.5 TCC transaction coordinator

TCC transaction coordinator
java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class TccTransactionCoordinator {

    private static final Logger log = LoggerFactory.getLogger(TccTransactionCoordinator.class);
    private static final long PHASE_TIMEOUT_SECONDS = 30;

    private final List<TccService> tccServices;
    private final TransactionLog transactionLog;

    public TccTransactionCoordinator(List<TccService> tccServices) {
        this.tccServices = tccServices;
        this.transactionLog = new TransactionLog();
    }

    /** Runs one TCC transaction. */
    public boolean executeTransaction(TransactionContext context) {
        String transactionId = context.getTransactionId();

        try {
            // Record the start of the transaction
            transactionLog.logTransactionStart(transactionId, context);

            // Try phase: every service reserves its resources
            if (!tryPhase(transactionId, context)) {
                // A Try failed, so release whatever was reserved
                cancelPhase(transactionId, context);
                return false;
            }
        } catch (Exception e) {
            log.error("Transaction execution failed: {}", transactionId, e);
            cancelPhase(transactionId, context);
            return false;
        }

        try {
            // Confirm phase: every Try succeeded, so Confirm must eventually succeed.
            // Cancelling now could undo services that have already confirmed.
            if (!confirmPhase(transactionId, context)) {
                // Needs a retry or manual intervention, not a Cancel
                log.error("Transaction confirm failed: {}", transactionId);
                return false;
            }

            // Record the completion of the transaction
            transactionLog.logTransactionComplete(transactionId);
            return true;
        } catch (Exception e) {
            log.error("Transaction confirm failed: {}", transactionId, e);
            return false;
        }
    }

    /** Try phase: every service reserves its resources. */
    private boolean tryPhase(String transactionId, TransactionContext context) {
        // tryReserve is the Try method; try itself is a reserved word in Java
        return runOnAll("try", transactionId, s -> s.tryReserve(context));
    }

    /** Confirm phase: every service executes using its reservation. */
    private boolean confirmPhase(String transactionId, TransactionContext context) {
        return runOnAll("confirm", transactionId, s -> s.confirm(context));
    }

    /** Cancel phase: every service releases its reservation, then the cancel is recorded. */
    private void cancelPhase(String transactionId, TransactionContext context) {
        runOnAll("cancel", transactionId, s -> s.cancel(context));
        transactionLog.logTransactionCancel(transactionId);
    }

    /** Calls all services in parallel; true only if every call returns true in time. */
    private boolean runOnAll(String phase, String transactionId,
                             Function<TccService, Boolean> call) {
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        for (TccService service : tccServices) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return call.apply(service);
                } catch (Exception e) {
                    log.error("Service {} failed", phase, e);
                    return false;
                }
            }));
        }

        try {
            // Wait for every service, with a 30-second timeout
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .get(PHASE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            // All futures are complete here, so join() does not block
            return futures.stream().allMatch(CompletableFuture::join);
        } catch (Exception e) {
            log.error("{} phase failed: {}", phase, transactionId, e);
            return false;
        }
    }
}

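4.6 Key problems in TCC

1. Idempotency

The coordinator may retry Confirm or Cancel after a timeout, so both operations must be safe to run more than once.

Idempotency handling example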
java
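import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class IdempotentTccService {

    private static final Logger log = LoggerFactory.getLogger(IdempotentTccService.class);

    private final TransactionLog transactionLog = new TransactionLog();

    public boolean confirm(TransactionContext context) {
        String transactionId = context.getTransactionId();

        // Idempotency check: a repeated Confirm returns success without redoing the work
        if (transactionLog.hasConfirmed(transactionId)) {
            log.info("Transaction already confirmed: {}", transactionId);
            return true;
        }

        // Run the Confirm logic
        boolean success = doConfirm(context);

        if (success) {
            // Record the Confirm, ideally in the same local transaction as the business update
            transactionLog.logConfirm(transactionId);
        }

        return success;
    }

    /** Placeholder for the business-specific Confirm logic. */
    private boolean doConfirm(TransactionContext context) {
        return true;
    }
}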

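2. Empty rollback

An empty rollback is a Cancel that arrives for a transaction whose Try never ran on this service, for example because the Try request was lost or timed out. The Cancel has nothing to release but must still report success.

Empty rollback handling example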
java
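import org.springframework.stereotype.Service;

@Service
public class EmptyRollbackTccService {

    private final TransactionLog transactionLog = new TransactionLog();

    public boolean cancel(TransactionContext context) {
        String transactionId = context.getTransactionId();

        // Try never ran here: record the empty rollback and report success,
        // otherwise the coordinator would keep retrying this Cancel
        if (!transactionLog.hasTried(transactionId)) {
            transactionLog.logEmptyRollback(transactionId);
            return true;
        }

        // Try ran: release what it reserved
        return doCancel(context);
    }

    /** Placeholder for the business-specific Cancel logic. */
    private boolean doCancel(TransactionContext context) {
        return true;
    }
}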

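3. Hanging

Hanging occurs when a delayed Try arrives after the Cancel for the same transaction has already run: the Try reserves resources that no later Confirm or Cancel will ever release. The guard therefore belongs in the Try method.

Hanging handling example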
java
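import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class HangingTccService {

    private static final Logger log = LoggerFactory.getLogger(HangingTccService.class);
    private final TransactionLog transactionLog = new TransactionLog();

    // Named tryReserve because try is a reserved word in Java.
    public boolean tryReserve(TransactionContext context) {
        String transactionId = context.getTransactionId();

        // Cancel already ran (assumes hasCancelled also covers empty rollbacks):
        // reject the late Try so it cannot reserve resources nobody will release
        if (transactionLog.hasCancelled(transactionId)) {
            log.warn("Try arrived after cancel, rejected: {}", transactionId);
            return false;
        }

        return doTry(context);
    }

    /** Placeholder for the business-specific Try logic. */
    private boolean doTry(TransactionContext context) {
        return true;
    }
}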

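TCC pattern

Idea: split one business operation into three steps, Try, Confirm and Cancel. Try reserves resources, Confirm commits, and Cancel rolls back.

Good fit when:

  • The business semantics are rich enough to support fine-grained compensation
  • Every resource can expose Try/Confirm/Cancel interfaces

Watch out for: idempotency, hanging (reject a Try that arrives after its Cancel), and empty rollback (a Cancel for a Try that never ran)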

java
import java.math.BigDecimal;

// Example TCC interface
public interface AccountTccService {
    boolean tryFreeze(long userId, BigDecimal amount);
    boolean confirm(long userId, BigDecimal amount);
    boolean cancel(long userId, BigDecimal amount);
}

// Application service for an account transfer (pseudocode)
public class TransferService {
    private final AccountTccService fromAccount;
    private final AccountTccService toAccount;

    public TransferService(AccountTccService fromAccount, AccountTccService toAccount) {
        this.fromAccount = fromAccount;
        this.toAccount = toAccount;
    }

    public boolean transfer(long from, long to, BigDecimal amt) {
        // Try
        boolean t1 = fromAccount.tryFreeze(from, amt);
        boolean t2 = toAccount.tryFreeze(to, amt);
        if (!t1 || !t2) {
            // Cancel whichever reservation succeeded
            if (t1) { fromAccount.cancel(from, amt); }
            if (t2) { toAccount.cancel(to, amt); }
            return false;
        }
        // Confirm
        boolean c1 = fromAccount.confirm(from, amt);
        boolean c2 = toAccount.confirm(to, amt);
        if (!c1 || !c2) {
            // Confirm failed: apply the compensation strategy defined by the business
            return false;
        }
        return true;
    }
}

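Saga pattern

A Saga splits a long-running transaction into a sequence of local transactions that each commit independently. Every step has a compensating action (its inverse) that undoes it if a later step fails.

Two coordination styles:

  • Orchestration: a central orchestrator (a state machine) drives the flow
  • Choreography: services cooperate by reacting to each other's events, with no central coordinator

Advantages:

  • No global locks and high throughput; well suited to long flows that span services

Disadvantages:

  • Only eventual consistency, and compensations are complex to design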
yaml
# Orchestration sketch (pseudo-configuration)
states:
  - name: CreateOrder
    execute: order.create
    compensate: order.cancel
  - name: ReserveInventory
    execute: inventory.reserve
    compensate: inventory.release
  - name: Pay
    execute: payment.pay
    compensate: payment.refund
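
The orchestration style can also be sketched in Java: run the steps in order and, when one fails, run the compensations of the completed steps in reverse. This is an in-memory illustration under stated assumptions, not a real Saga engine. `SagaStep` and `SagaOrchestrator` are hypothetical names, a step signals failure by returning false or throwing, and a real orchestrator would also persist its progress and retry failed compensations.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

/** One Saga step: a local transaction plus the action that undoes it. */
final class SagaStep {
    final String name;
    final BooleanSupplier execute;
    final Runnable compensate;

    SagaStep(String name, BooleanSupplier execute, Runnable compensate) {
        this.name = name;
        this.execute = execute;
        this.compensate = compensate;
    }
}

final class SagaOrchestrator {
    private final List<SagaStep> steps = new ArrayList<>();

    SagaOrchestrator step(String name, BooleanSupplier execute, Runnable compensate) {
        steps.add(new SagaStep(name, execute, compensate));
        return this;
    }

    /** Returns true if every step succeeded, false if the Saga was compensated. */
    boolean run() {
        Deque<SagaStep> completed = new ArrayDeque<>();
        for (SagaStep step : steps) {
            boolean ok;
            try {
                ok = step.execute.getAsBoolean();
            } catch (RuntimeException e) {
                ok = false; // an exception counts as a failed step
            }
            if (!ok) {
                // Undo the completed steps, most recent first.
                // The failed step itself is not compensated.
                while (!completed.isEmpty()) {
                    completed.pop().compensate.run();
                }
                return false;
            }
            completed.push(step);
        }
        return true;
    }
}
```

With the three states from the configuration above, a failure in Pay would trigger inventory.release and then order.cancel, while payment.refund is skipped because the payment never completed.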

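Transactional Outbox and Reliable Messaging

Goal: make the local database write and the message send atomic.

How it works:

  1. Write the business data and the outbox message in the same local transaction
  2. A background relay scans the outbox table and publishes the messages to the message queue
  3. Consumers process each message idempotently

Advantages: simple to implement, with low migration cost for existing systems

Watch out for: deduplication, idempotency keys, retries, and dead-letter queues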

sql
-- Example outbox table
CREATE TABLE outbox (
    id BIGINT PRIMARY KEY,
    aggregate_id BIGINT,
    topic VARCHAR(100),
    payload TEXT,
    status VARCHAR(20),
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);
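
The relay in step 2 can be sketched as a loop over unsent rows. This is an in-memory illustration under stated assumptions: a `List` stands in for the outbox table, the status values `PENDING` and `SENT` are made up for the sketch (the table definition does not specify them), and `OutboxRow`, `OutboxRelay` and the publisher callback are hypothetical names rather than a real messaging API.

```java
import java.util.List;
import java.util.function.BiPredicate;

/** In-memory stand-in for one row of the outbox table. */
final class OutboxRow {
    final long id;
    final String topic;
    final String payload;
    String status = "PENDING"; // assumed status values: PENDING, SENT

    OutboxRow(long id, String topic, String payload) {
        this.id = id;
        this.topic = topic;
        this.payload = payload;
    }
}

final class OutboxRelay {
    private final List<OutboxRow> table;                  // stands in for the outbox table
    private final BiPredicate<String, String> publisher;  // (topic, payload) -> broker acknowledged

    OutboxRelay(List<OutboxRow> table, BiPredicate<String, String> publisher) {
        this.table = table;
        this.publisher = publisher;
    }

    /** One scan of the table; returns how many messages were published. */
    int relayOnce() {
        int sent = 0;
        for (OutboxRow row : table) {
            if (!"PENDING".equals(row.status)) {
                continue; // already delivered
            }
            if (publisher.test(row.topic, row.payload)) {
                row.status = "SENT";
                sent++;
            }
            // On failure the row stays PENDING and is retried by the next scan.
        }
        return sent;
    }
}
```

Because a row is marked as sent only after the broker acknowledges it, a crash between the publish and the status update makes the next scan publish the same message again. The relay therefore gives at-least-once delivery, which is why consumers have to be idempotent.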

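Idempotency and Deduplication

  • Business idempotency keys (such as an order number or request ID)
  • Deduplication tables or unique indexes, and distributed locks
  • At-least-once delivery on the consumer side, combined with idempotent handling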
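
A consumer that combines at-least-once delivery with an idempotency key can be sketched as follows. This is a minimal single-process illustration: `IdempotentConsumer` is a hypothetical name, and the in-memory set stands in for what would normally be a deduplication table with a unique index, written in the same local transaction as the business update.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class IdempotentConsumer {
    // Keys of messages that were already handled (stand-in for a dedup table).
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<String> handler;

    IdempotentConsumer(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Returns true if the message was handled, false if it was a duplicate. */
    boolean onMessage(String idempotencyKey, String payload) {
        // add() is atomic: only the first delivery of a key gets through.
        if (!processedKeys.add(idempotencyKey)) {
            return false;
        }
        try {
            handler.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Handling failed: forget the key so that a redelivery can retry.
            processedKeys.remove(idempotencyKey);
            throw e;
        }
    }
}
```

Delivering the same order number twice runs the handler once; the second call returns false and changes nothing.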

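Choosing an Approach

  • Strong consistency within a small scope: 2PC/XA, or serialize the changes through a single aggregate root
  • Long flows across services: Saga or TCC, plus Outbox
  • Throughput-sensitive workloads: prefer compensation-based approaches over global locking

Interview Questions

  1. What are the two phases of 2PC, and what are its drawbacks?
  2. How does TCC avoid empty rollbacks and hanging?
  3. How do Saga orchestration and choreography differ, and how do you choose between them?
  4. How does the Outbox pattern make sure that no message is lost and none is consumed twice?
  5. What are the common ways to implement idempotency?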
