
前面章节提及的MassTransit、dotnetcore/CAP都提供了分布式事务的处理能力但也仅局限于Saga和本地消息表模式的实现。那有没有一个独立的分布式事务解决方案涵盖多种分布式事务处理模式如Saga、TCC、XA模式等。有目前业界主要有两种开源方案其一是阿里开源的Seata另一个就是DTM。其中Seata仅支持Java、Go和Python语言因此不在.NET 的选择范围。DTM则通过提供简单易用的HTTP和gRPC接口屏蔽了语言的无关性因此支持任何开发语言接入目前提供了Go、Python、NodeJs、Ruby、Java和C#等语言的SDK。DTM全称Distributed Transaction Manager是一个分布式事务管理器解决跨数据库、跨服务、跨语言更新数据的一致性问题。它提供了Saga、TCC、 XA和二阶段消息模式以满足不同应用场景的需求同时其首创的子事务屏障技术可以有效解决幂等、悬挂和空补偿等异常问题。DTM 事务处理过程及架构那DTM是如何处理分布式事务的呢以一个经典的跨行转账业务为例来看下事务处理过程。对于跨行转账业务而言很显然是跨库跨服务的应用场景不能简单通过本地事务解决可以使用Saga模式以下是基于DTM提供的Saga事务模式成功转账的的时序图从以上时序图可以看出DTM整个全局事务分为如下几步用户定义好全局事务所有的事务分支全局事务的组成部分称为事务分支然后提交给DTMDTM持久化全局事务信息后立即返回DTM取出第一个事务分支这里是TransOut调用该服务并成功返回DTM取出第二个事务分支这里是TransIn调用该服务并成功返回DTM已完成所有的事务分支将全局事务的状态修改为已完成基于以上这个时序图的基础上再来看下DTM的架构整个DTM架构中一共有三个角色分别承担了不同的职责RM-资源管理器RM是一个应用服务通常连接到独立的数据库负责处理全局事务中的本地事务执行相关数据的修改、提交、回滚、补偿等操作。例如在前面的这个Saga事务时序图中步骤2、3中被调用的TransIn和TransOut方法所在的服务都是RM。AP-应用程序AP是一个应用服务负责全局事务的编排他会注册全局事务注册子事务调用RM接口。例如在前面的这个SAGA事务中发起步骤1的是AP它编排了一个包含TransOut、TransIn的全局事务然后提交给TMTM-事务管理器TM就是DTM服务负责全局事务的管理作为一个独立的服务而存在。每个全局事务都注册到TM每个事务分支也注册到TM。TM会协调所有的RM来执行不同的事务分支并根据执行结果决定是否提交或回滚事务。例如在前面的Saga事务时序图中TM在步骤2、3中调用了各个RM在步骤4中完成这个全局事务。总体而言AP-应用程序充当全局事务编排器的角色通过DTM提供的开箱即用的SDK进行全局事务和子事务的注册。TM-事务管理器接收到注册的全局事务和子事务后负责调用RM-资源管理器来执行对应的事务分支TM-事务管理器根据事务分支的执行结果决定是否提及或回滚事务。快速上手百闻不如一见接下来就来实际上手体验下如何基于DTM来实际应用Saga进行分布式跨行转账事务的处理。创建示例项目接下来就来创建一个示例项目使用dotnet new webapi -n DtmDemo.Webapi创建示例项目。添加Nuget包Dtmcli和Pomelo.EntityFrameworkCore.MySql。添加DTM配置项{dtm: {DtmUrl: http://localhost:36789,DtmTimeout: 10000,BranchTimeout: 10000,DBType: mysql,BarrierTableName: dtm_barrier.barrier,}}定义银行账户BankAccount实体类namespace DtmDemo.WebApi.Models{public class BankAccount{public int Id { get; set; }public decimal Balance { get; set; }}}定义DtmDemoWebApiContext数据库上下文using Microsoft.EntityFrameworkCore;namespace DtmDemo.WebApi.Data{public class DtmDemoWebApiContext : DbContext{public DtmDemoWebApiContext (DbContextOptionsDtmDemoWebApiContext options): base(options){}public DbSetDtmDemo.WebApi.Models.BankAccount BankAccount { get; set; } default!;}}注册DbContext 和DTM服务using Microsoft.EntityFrameworkCore;using DtmDemo.WebApi.Data;using Dtmcli;var builder WebApplication.CreateBuilder(args);var connectionStr builder.Configuration.GetConnectionString(DtmDemoWebApiContext);// 注册DbContextbuilder.Services.AddDbContextDtmDemoWebApiContext(options {options.UseMySql(connectionStr, ServerVersion.AutoDetect(connectionStr));});// 注册DTMbuilder.Services.AddDtmcli(builder.Configuration, dtm);执行dotnet ef migrations add Initial创建迁移。为便于初始化演示数据定义BankAccountController如下其中PostBankAccount接口添加了await _context.Database.MigrateAsync();用于自动应用迁移。using Microsoft.AspNetCore.Mvc;using Microsoft.EntityFrameworkCore;using DtmDemo.WebApi.Data;using DtmDemo.WebApi.Models;using Dtmcli;namespace DtmDemo.WebApi.Controllers{[Route(api/[controller])][ApiController]public class BankAccountsController : ControllerBase{private readonly DtmDemoWebApiContext _context;public BankAccountsController(DtmDemoWebApiContext context){_context context;}[HttpGet]public async TaskActionResultIEnumerableBankAccount GetBankAccount(){return await _context.BankAccount.ToListAsync();}[HttpPost]public async TaskActionResultBankAccount PostBankAccount(BankAccount bankAccount){await _context.Database.MigrateAsync();_context.BankAccount.Add(bankAccount);await _context.SaveChangesAsync();return Ok(bankAccount);}}应用Saga模式接下来定义SagaDemoController来使用DTM的Saga模式来模拟跨行转账分布式事务using Microsoft.AspNetCore.Mvc;using Microsoft.EntityFrameworkCore;using DtmDemo.WebApi.Data;using DtmDemo.WebApi.Models;using Dtmcli;using DtmCommon;namespace DtmDemo.WebApi.Controllers{[Route(api/[controller])][ApiController]public class SagaDemoController : ControllerBase{private readonly DtmDemoWebApiContext _context;private readonly IConfiguration _configuration;private readonly IDtmClient _dtmClient;private readonly IDtmTransFactory _transFactory;private readonly IBranchBarrierFactory _barrierFactory;private readonly ILoggerBankAccountsController _logger;public SagaDemoController(DtmDemoWebApiContext context, IConfiguration configuration, IDtmClient dtmClient, IDtmTransFactory transFactory, ILoggerBankAccountsController logger, IBranchBarrierFactory barrierFactory){this._context context;this._configuration configuration;this._dtmClient dtmClient;this._transFactory transFactory;this._logger logger;this._barrierFactory barrierFactory;}}对于跨行转账业务使用DTM的Saga模式首先要进行事务拆分可以拆分为以下4个子事务并分别实现转出子事务TransferOut[HttpPost(TransferOut)]public async TaskIActionResult TransferOut([FromBody] TransferRequest request){var msg $用户{request.UserId}转出{request.Amount}元;_logger.LogInformation($转出子事务-启动{msg});// 1. 创建子事务屏障var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);try{using (var conn _context.Database.GetDbConnection()){// 2. 在子事务屏障内执行事务操作await branchBarrier.Call(conn, async (tx) {_logger.LogInformation($转出子事务-执行{msg});await _context.Database.UseTransactionAsync(tx);var bankAccount await _context.BankAccount.FindAsync(request.UserId);if (bankAccount null || bankAccount.Balance request.Amount)throw new InvalidDataException(账户不存在或余额不足);bankAccount.Balance - request.Amount;await _context.SaveChangesAsync();});}}catch (InvalidDataException ex){_logger.LogInformation($转出子事务-失败{ex.Message});// 3. 按照接口协议返回409以表示子事务失败return new StatusCodeResult(StatusCodes.Status409Conflict);}_logger.LogInformation($转出子事务-成功{msg});return Ok();}以上代码中有几点需要额外注意使用Saga模式必须开启子事务屏障_barrierFactory.CreateBranchBarrier(Request.Query)其中Request.Query中的参数由DTM 生成类似?branch_id01gidXTzKHgxemLyL8EXtMTLvzKopactiontrans_typesaga主要包含四个参数gid全局事务Idtrans_type事务类型是saga、msg、xa或者是tcc。branch_id子事务的Idop当前操作对于Saga事务模式要么为action正向操作要么为compensate补偿操作。必须在子事务屏障内执行事务操作branchBarrier.Call(conn, async (tx) {}对于Saga正向操作而言业务上的失败与异常是需要做严格区分的例如前面的余额不足是业务上的失败必须回滚。而对于网络抖动等其他外界原因导致的事务失败属于业务异常则需要重试。因此若因业务失败这里是账户不存在或余额不足而导致子事务失败则必须通过抛异常的方式并返回**409**状态码以告知DTM 子事务失败。以上通过抛出异常的方式中断子事务执行并在外围捕获特定异常返回409状态码。在外围捕获异常时切忌放大异常捕获比如直接catch(Exception)如此会捕获由于网络等其他原因导致的异常而导致DTM 不再自动处理该异常比如业务异常时的自动重试。转出补偿子事务TransferOut_Compensate转出补偿就是回滚转出操作进行账户余额归还实现如下[HttpPost(TransferOut_Compensate)]public async TaskIActionResult TransferOut_Compensate([FromBody] TransferRequest request){var msg $用户{request.UserId}回滚转出{request.Amount}元;_logger.LogInformation($转出补偿子事务-启动{msg});// 1. 创建子事务屏障var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);using (var conn _context.Database.GetDbConnection()){// 在子事务屏障内执行事务操作await branchBarrier.Call(conn, async (tx) {_logger.LogInformation($转出补偿子事务-执行{msg});await _context.Database.UseTransactionAsync(tx);var bankAccount await _context.BankAccount.FindAsync(request.UserId);if (bankAccount null)return; //对于补偿操作可直接返回中断后续操作bankAccount.Balance request.Amount;await _context.SaveChangesAsync();});}_logger.LogInformation($转出补偿子事务-成功);// 2. 因补偿操作必须成功所以必须返回200。return Ok();}由于DTM设计为总是执行补偿也就是说即使正向操作子事务失败时DTM 仍旧会执行补偿逻辑。但子事务屏障会在执行时判断正向操作的执行状态当子事务失败时并不会执行补偿逻辑。另外DTM的补偿操作是要求最终成功的只要还没成功就会不断进行重试直到成功。因此在补偿子事务中即使补偿子事务中出现业务失败时也必须返回**200**。因此当出现bankAccountnull时可以直接 return。转入子事务TransferIn转入子事务和转出子事务的实现基本类似都是开启子事务屏障后在branchBarrier.Call(conn, async tx {}中实现事务逻辑并通过抛异常的方式并最终返回409状态码来显式告知DTM 子事务执行失败。[HttpPost(TransferIn)]public async TaskIActionResult TransferIn([FromBody] TransferRequest request){var msg $用户{request.UserId}转入{request.Amount}元;_logger.LogInformation($转入子事务-启动{msg});var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);try{using (var conn _context.Database.GetDbConnection()){await branchBarrier.Call(conn, async (tx) {_logger.LogInformation($转入子事务-执行{msg});await _context.Database.UseTransactionAsync(tx);var bankAccount await _context.BankAccount.FindAsync(request.UserId);if (bankAccount null)throw new InvalidDataException(账户不存在);bankAccount.Balance request.Amount;await _context.SaveChangesAsync();});}}catch (InvalidDataException ex){_logger.LogInformation($转入子事务-失败{ex.Message});return new StatusCodeResult(StatusCodes.Status409Conflict);}_logger.LogInformation($转入子事务-成功{msg});return Ok();}转入补偿子事务TransferIn_Compensate转入补偿子事务和转出补偿子事务的实现也基本类似都是开启子事务屏障后在branchBarrier.Call(conn, async tx {}中实现事务逻辑并最终返回200状态码来告知DTM 补偿子事务执行成功。[HttpPost(TransferIn_Compensate)]public async TaskIActionResult TransferIn_Compensate([FromBody] TransferRequest request){var msg 用户{request.UserId}回滚转入{request.Amount}元;_logger.LogInformation($转入补偿子事务-启动{msg});var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);using (var conn _context.Database.GetDbConnection()){await branchBarrier.Call(conn, async (tx) {_logger.LogInformation($转入补偿子事务-执行{msg});await _context.Database.UseTransactionAsync(tx);var bankAccount await _context.BankAccount.FindAsync(request.UserId);if (bankAccount null) return;bankAccount.Balance - request.Amount;await _context.SaveChangesAsync();});}_logger.LogInformation($转入补偿子事务-成功);return Ok();}编排Saga事务拆分完子事务最后就可以进行Saga事务编排了其代码如下所示[HttpPost(Transfer)]public async TaskIActionResult Transfer(int fromUserId, int toUserId, decimal amount,CancellationToken cancellationToken){try{_logger.LogInformation($转账事务-启动用户{fromUserId}转账{amount}元到用户{toUserId});//1. 生成全局事务IDvar gid await _dtmClient.GenGid(cancellationToken);var bizUrl _configuration.GetValuestring(TransferBaseURL);//2. 创建Sagavar saga _transFactory.NewSaga(gid);//3. 添加子事务saga.Add(bizUrl /TransferOut, bizUrl /TransferOut_Compensate,new TransferRequest(fromUserId, amount)).Add(bizUrl /TransferIn, bizUrl /TransferIn_Compensate,new TransferRequest(toUserId, amount)).EnableWaitResult(); // 4. 按需启用是否等待事务执行结果//5. 提交Saga事务await saga.Submit(cancellationToken);}catch (DtmException ex) // 6. 如果开启了EnableWaitResult()则可通过捕获异常的方式捕获事务失败的结果。{_logger.LogError($转账事务-失败用户{fromUserId}转账{amount}元到用户{toUserId}失败);return new BadRequestObjectResult($转账失败:{ex.Message});}_logger.LogError($转账事务-完成用户{fromUserId}转账{amount}元到用户{toUserId}成功);return Ok($转账事务-完成用户{fromUserId}转账{amount}元到用户{toUserId}成功);}主要步骤如下生成全局事务Idvar gid await _dtmClient.GenGid(cancellationToken);创建Saga全局事务_transFactory.NewSaga(gid);添加子事务saga.Add(string action, string compensate, object postData);包含正向和反向子事务。如果依赖事务执行结果可通过EnableWaitResult()开启事务结果等待。提交Saga全局事务saga.Submit(cancellationToken);若开启了事务结果等待可以通过try...catch..来捕获DtmExcepiton异常来获取事务执行异常信息。运行项目既然DTM作为一个独立的服务存在其负责通过HTTP或gRPC协议发起子事务的调用因此首先需要启动一个DTM实例又由于本项目依赖MySQL因此我们采用Docker Compose的方式来启动项目。在Visual Studio中通过右键项目-Add-Docker Support-Linux即可添加Dockerfile如下所示FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS baseWORKDIR /appEXPOSE 80EXPOSE 443FROM mcr.microsoft.com/dotnet/sdk:6.0 AS buildWORKDIR /srcCOPY [DtmDemo.WebApi/DtmDemo.WebApi.csproj, DtmDemo.WebApi/]RUN dotnet restore DtmDemo.WebApi/DtmDemo.WebApi.csprojCOPY . .WORKDIR /src/DtmDemo.WebApiRUN dotnet build DtmDemo.WebApi.csproj -c Release -o /app/buildFROM build AS publishRUN dotnet publish DtmDemo.WebApi.csproj -c Release -o /app/publishFROM base AS finalWORKDIR /appCOPY --frompublish /app/publish .ENTRYPOINT [dotnet, DtmDemo.WebApi.dll]在Visual Studio中通过右键项目-Add Container Orchestrator Support-Docker Compose即可添加docker-compose.yml由于整个项目依赖mysql和DTM修改docker-compose.yml如下所示其中定义了三个服务dbdtm和dtmdemo.webapi。version: 3.4services:db:image: mysql:5.7container_name: dtm-mysqlenvironment:MYSQL_ROOT_PASSWORD: 123456 # 指定MySQL初始密码volumes:- ./docker/mysql/scripts:/docker-entrypoint-initdb.d # 挂载用于初始化数据库的脚本ports:- 3306:3306dtm:depends_on: [db]image: yedf/dtm:latestcontainer_name: dtm-svcenvironment:IS_DOCKER: 1STORE_DRIVER: mysql # 指定使用MySQL持久化DTM事务数据STORE_HOST: db # 指定MySQL服务名这里是dbSTORE_USER: rootSTORE_PASSWORD: 123456STORE_PORT: 3306STORE_DB: dtm # 指定DTM 数据库名ports:- 36789:36789 # DTM HTTP 端口- 36790:36790 # DTM gRPC 端口dtmdemo.webapi:depends_on: [dtm, db]image: ${DOCKER_REGISTRY-}dtmdemowebapienvironment:ASPNETCORE_ENVIRONMENT: docker # 设定启动环境为dockercontainer_name: dtm-webapi-demobuild:context: .dockerfile: DtmDemo.WebApi/Dockerfileports:- 31293:80 # 映射Demo:80端口到本地31293端口- 31294:443 # 映射Demo:443端口到本地31294端口