分布式系统一致性与事务处理实战

发布时间:2026/5/27 4:12:00

分布式系统一致性与事务处理实战 分布式系统一致性与事务处理实战一、分布式一致性概述在分布式系统中一致性是指多个节点之间数据的同步程度。由于网络延迟、节点故障等因素保证强一致性变得非常困难。本文将深入探讨分布式一致性的理论基础、常见的一致性模型以及事务处理的最佳实践。二、一致性模型2.1 CAP定理graph TD A[CAP定理] -- B[一致性 Consistency] A -- C[可用性 Availability] A -- D[分区容错性 Partition Tolerance] B -- E[所有节点同时看到相同数据] C -- F[每个请求都能得到响应] D -- G[网络分区时系统仍能工作]一致性模型特点适用场景强一致性所有副本同时更新银行转账、金融交易弱一致性最终达到一致搜索引擎、缓存更新最终一致性一定时间内达到一致社交网络、消息队列2.2 BASE理论基本可用Basically Available系统在故障时仍能提供降级服务软状态Soft State允许数据存在中间状态最终一致性Eventually Consistent最终达到一致状态三、分布式事务处理3.1 两阶段提交2PCsequenceDiagram participant TM as 事务管理器 participant RM1 as 资源管理器1 participant RM2 as 资源管理器2 TM-RM1: prepare RM1-TM: prepared TM-RM2: prepare RM2-TM: prepared TM-RM1: commit RM1-TM: committed TM-RM2: commit RM2-TM: committed2PC存在的问题同步阻塞所有参与者在等待其他参与者响应时都处于阻塞状态单点故障事务管理器故障会导致整个事务失败数据不一致在提交阶段部分节点可能失败3.2 三阶段提交3PCsequenceDiagram participant TM as 事务管理器 participant RM1 as 资源管理器1 participant RM2 as 资源管理器2 TM-RM1: canCommit RM1-TM: yes TM-RM2: canCommit RM2-TM: yes TM-RM1: preCommit RM1-TM: preCommitted TM-RM2: preCommit RM2-TM: preCommitted TM-RM1: doCommit RM1-TM: committed TM-RM2: doCommit RM2-TM: committed3PC改进点引入了准备阶段的确认减少阻塞时间超时机制避免单点故障导致的阻塞3.3 分布式事务解决方案对比方案一致性性能复杂度适用场景2PC强一致低中等银行、金融TCC最终一致高高电商订单可靠消息最终一致高中等异步场景Saga最终一致高高长事务四、Seata分布式事务实战4.1 架构设计graph TD A[业务应用] -- B[Seata TM] A -- C[Seata RM] B -- D[Seata TC] C -- D D -- E[(MySQL)]4.2 配置文件# application.yml server: port: 8091 spring: application: name: seata-server seata: registry: type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: public group: SEATA_GROUP config: type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: public group: SEATA_GROUP store: mode: db db: datasource: druid db-type: mysql driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/seata?useSSLfalse user: root password: password4.3 事务注解使用Service public class OrderService { Autowired private OrderRepository orderRepository; Autowired private InventoryFeignClient inventoryClient; Autowired private PaymentFeignClient paymentClient; GlobalTransactional public void createOrder(OrderCreateRequest request) { // 创建订单 Order order Order.builder() .userId(request.getUserId()) .productId(request.getProductId()) .quantity(request.getQuantity()) .status(PENDING) .build(); orderRepository.save(order); // 扣减库存 inventoryClient.decreaseStock(request.getProductId(), request.getQuantity()); // 扣款 paymentClient.deduct(request.getUserId(), request.getTotalAmount()); } }4.4 TCC模式实现LocalTCC public interface InventoryService { TwoPhaseBusinessAction(name decreaseStock, commitMethod commit, rollbackMethod rollback) void decreaseStock(BusinessActionContext context, Long productId, Integer quantity); void commit(BusinessActionContext context); void rollback(BusinessActionContext context); } Service public class InventoryServiceImpl implements InventoryService { Autowired private InventoryRepository inventoryRepository; Override public void decreaseStock(BusinessActionContext context, Long productId, Integer quantity) { Inventory inventory inventoryRepository.findById(productId).orElseThrow(); if (inventory.getStock() quantity) { throw new BusinessException(库存不足); } // 冻结库存 inventory.setStock(inventory.getStock() - quantity); inventory.setFrozenStock(inventory.getFrozenStock() quantity); inventoryRepository.save(inventory); // 保存上下文 context.setActionContext(productId, productId); context.setActionContext(quantity, quantity); } Override public void commit(BusinessActionContext context) { Long productId (Long) context.getActionContext(productId); Integer quantity (Integer) context.getActionContext(quantity); Inventory inventory inventoryRepository.findById(productId).orElseThrow(); inventory.setFrozenStock(inventory.getFrozenStock() - quantity); inventoryRepository.save(inventory); } Override public void rollback(BusinessActionContext context) { Long productId (Long) context.getActionContext(productId); Integer quantity (Integer) context.getActionContext(quantity); Inventory inventory inventoryRepository.findById(productId).orElseThrow(); inventory.setStock(inventory.getStock() quantity); inventory.setFrozenStock(inventory.getFrozenStock() - quantity); inventoryRepository.save(inventory); } }五、可靠消息最终一致性5.1 消息队列事务方案sequenceDiagram participant App as 业务应用 participant DB as 数据库 participant MQ as 消息队列 participant Consumer as 消费者 App-DB: 1. 保存业务数据 消息记录 DB--App: 保存成功 App-MQ: 2. 发送消息 MQ--App: 发送成功 App-DB: 3. 删除消息记录 DB--App: 删除成功 MQ-Consumer: 4. 消费消息 Consumer-Consumer: 5. 执行业务逻辑 Consumer-MQ: 6. ACK确认5.2 RocketMQ事务消息Service public class OrderMessageService implements RocketMQLocalTransactionListener { Autowired private OrderRepository orderRepository; Autowired private RocketMQTemplate rocketMQTemplate; public void sendOrderMessage(Order order) { MessageBuilder.WithPayload(order) .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()) .build(); rocketMQTemplate.sendMessageInTransaction( order-topic, MessageBuilder.withPayload(order).build(), order ); } Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { Order order (Order) arg; try { orderRepository.save(order); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String orderId msg.getHeaders().get(orderId, String.class); Order order orderRepository.findById(Long.parseLong(orderId)).orElse(null); if (order ! null) { return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.ROLLBACK; } }5.3 消息消费幂等性Component public class OrderConsumer { Autowired private OrderRepository orderRepository; Autowired private RedisTemplateString, String redisTemplate; private static final String CONSUMED_PREFIX msg:consumed:; private static final String LOCK_PREFIX msg:lock:; RocketMQMessageListener( topic order-topic, consumerGroup order-consumer-group ) public void onMessage(MessageExt message) { String msgId message.getMsgId(); // 分布式锁防止重复消费 Boolean locked redisTemplate.opsForValue().setIfAbsent( LOCK_PREFIX msgId, locked, 5, TimeUnit.MINUTES ); if (!locked) { return; } try { // 检查是否已消费 if (redisTemplate.hasKey(CONSUMED_PREFIX msgId)) { return; } // 执行业务逻辑 Order order JSON.parseObject(message.getBody(), Order.class); processOrder(order); // 标记已消费 redisTemplate.opsForValue().set(CONSUMED_PREFIX msgId, true, 24, TimeUnit.HOURS); } finally { // 释放锁 redisTemplate.delete(LOCK_PREFIX msgId); } } }六、一致性保障策略6.1 分布式锁实现Component public class RedisDistributedLock { Autowired private StringRedisTemplate redisTemplate; private static final String LOCK_PREFIX lock:; private static final long LOCK_EXPIRE 30000; // 30秒 public boolean tryLock(String lockKey, String requestId) { return redisTemplate.opsForValue().setIfAbsent( LOCK_PREFIX lockKey, requestId, LOCK_EXPIRE, TimeUnit.MILLISECONDS ); } public boolean unlock(String lockKey, String requestId) { String script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end ; Long result redisTemplate.execute( new DefaultRedisScript(script, Long.class), Collections.singletonList(LOCK_PREFIX lockKey), requestId ); return result ! null result 0; } }6.2 数据核对与补偿Component public class DataConsistencyChecker { Autowired private OrderRepository orderRepository; Autowired private InventoryRepository inventoryRepository; Autowired private CompensationService compensationService; Scheduled(fixedRate 300000) // 5分钟检查一次 public void checkConsistency() { ListOrder pendingOrders orderRepository.findByStatus(PENDING); for (Order order : pendingOrders) { if (isOrderConsistent(order)) { order.setStatus(COMPLETED); orderRepository.save(order); } else { compensationService.compensate(order); } } } private boolean isOrderConsistent(Order order) { Inventory inventory inventoryRepository.findById(order.getProductId()).orElse(null); return inventory ! null inventory.getStock() 0; } }6.3 一致性保障检查清单分布式事务 - [ ] 选择合适的事务模式2PC/TCC/Saga/消息 - [ ] 实现幂等性保障 - [ ] 设置合理的超时时间 - [ ] 实现重试机制 - [ ] 实现补偿机制 数据一致性 - [ ] 定期数据核对 - [ ] 异常数据告警 - [ ] 自动/手动补偿流程 - [ ] 日志记录便于追溯 系统设计 - [ ] 避免分布式事务尽量使用本地事务 - [ ] 使用最终一致性替代强一致性 - [ ] 设计幂等接口 - [ ] 实现分布式锁七、总结分布式一致性是分布式系统设计中的核心难题。在实际应用中需要根据业务场景选择合适的一致性模型和事务处理方案。通过合理的架构设计、幂等性保障和补偿机制可以在保证系统可用性的同时实现数据的最终一致性。参考资料Seata官方文档https://seata.io/CAP Theorem: https://en.wikipedia.org/wiki/CAP_theoremBASE Theory: https://queue.acm.org/detail.cfm?id1394128RocketMQ事务消息https://rocketmq.apache.org/docs/transaction-example/

相关新闻