.NET 高级开发 | 设计、实现一个事件总线框架

发布时间:2026/6/23 11:34:19

.NET 高级开发 | 设计、实现一个事件总线框架 使用事件总线在编写事件总线框架之前首先了解 Maomi.EventBus 的使用其示例代码参考 Demo8.Console 项目。创建一个项目然后通过 nuget 引入 Maomi.EventBus 包。这里我们来模拟用户注册的流程模拟用户注册流程。假设用户提交信息后系统的处理过程是检查验证码、将用户信息写到 Users 表中、初始化用户数据、发送电子邮箱。首先是定义一个事件模型类模型类必须继承 Event 抽象类或 IEvent 接口。public record class MyEvent : Event { public string Name { get; set; } public string EMail { get; set; } public override string ToString() { return $用户名: {Name} 邮箱: {EMail}; } }接着要编写事件执行器在 Maomi.EventBus 中事件执行器是一个方法而不是一个类型事件执行器方法命名没有约束只要求方法参数包含MyEvent事件即可执行器方法所在的类型不需要继承任何接口只需要使用EventAttribute特性标记即可。首先是检查用户的验证码是否正确[Event] // 标记此类型里有事件处理器 public class CheckImageCodeEventHandler { // 标记该方法是事件处理器并设置执行顺序 [EventHandler(Order 0)] public void Check(MyEvent event) { Console.WriteLine(event.ToString()); } }当框架扫描到 CheckImageCodeEventHandler 带有Event标识时会被自动注册到容器中其生命周期为 scope。由于 CheckImageCodeEventHandler 被注册到容器中所以可以在其构造函数中注入其它服务。[EventHandler(Order 0)]定义触发事件后该执行器的执行顺序。每个事件都有多个执行器它们之间是有顺序的通过EventHandlerAttribute.Order属性进行排序。一个类型中可以有多个执行器方法可以订阅同一个事件也可以订阅不同的事件。每个执行器方法都有一个对应顺序的的撤销器器由于回滚撤销之前的操作。在编写验证码执行器方法后我们继续完善用户注册过程其完整代码示例如下[Event] public class UserRegisterEventHandler { [EventHandler(Order 1)] public void InsertDb(MyEvent event) { var state new Random().Next(0, 2); if (state 0) Console.WriteLine(√ 用户信息已添加到数据库); else throw new Exception(× 写入用户信息到数据库失败); } [EventHandler(Order 1, IsCancel true)] public void CancelInsertDb(MyEvent event) { Console.WriteLine(注册失败刷新验证码); } [EventHandler(Order 2)] public void InitUser(MyEvent event) { var state new Random().Next(0, 2); if (state 0) Console.WriteLine(√ 初始化用户数据系统生成默认用户权限、数据); else throw new Exception(× 初始化用户数据失败); } [EventHandler(Order 2, IsCancel true)] public void CancelInitUser(MyEvent event) { Console.WriteLine(撤销用户注册信息); } [EventHandler(Order 3)] public void SendEmail(MyEvent event) { var state new Random().Next(0, 2); if (state 0) Console.WriteLine(√ 发送验证邮件成功); else throw new Exception(× 发送验证邮件失败); } [EventHandler(Order 3, IsCancel true)] public void CancelSendEmail(MyEvent event) { Console.WriteLine(× 撤销初始化用户数据); } }可以看到每个执行器方法都有其执行顺序而且都配有一个撤销器只需要标记IsCancel true即可执行器和撤销器也可以可以分开放到不同的类型中。如果执行到SendEmail时出现错误框架会逐个执行撤销器执行顺序为CancelSendEmail、CancelInitUser、CancelInsertDb。我们还可以设置中间件拦截事件记录事件日志以及判断是否可以执行该事件。public class LoggingMiddlewareTEvent : IEventMiddlewareTEvent where TEvent : IEvent { public async Task HandleAsync(TEvent event, EventHandlerDelegate next) { Console.WriteLine(----- Handling command {0} ({1}), event.GetType().Name, event.ToString()); await next(); } }可以通过中间件拦截器判断是否需要执行事件如果继续执行事件则使用awat next();。你也可以使用数据库事务包裹await next();使得该事件的所有数据库操作都能在数据库回滚减少手动编写撤销的代码量以及增加可靠性防止代码撤销过程中出现异常、程序崩溃导致的撤销失败。使用事件总线是非常简单的只需要注册.AddEventBus()服务即可然后使用 IEventBus 服务发布事件。static async Task Main() { var ioc new ServiceCollection(); ioc.AddEventBus(middleware: typeof(LoggingMiddleware)); ioc.AddLogging(build build.AddConsole()); var services ioc.BuildServiceProvider(); var eventBus services.GetRequiredServiceIEventBus(); await eventBus.PublishAsync(new MyEvent() { Name 工良, EMail 工良maomi.com }); }事件总线的设计Maomi.EventBus 项目文件组成如下// 事件模型接口 IEvent.cs // 事件模型抽象实现 Event.cs // 事件总线接口 IEventBus.cs // 事件总线实现 EventBus.cs // 扩展函数 EventBusExtensions.cs // 特性标记 EventHandlerAttribute.cs // 记录事件执行器信息 EventInfo.cs // 事件中间件 IEventMiddleware.cs // 表达式树构造器 InvokeBuilder.cs接口抽象首先是最简单的事件模型每个事件都必须继承该接口以便在日志中记录可以在调试和排查问题时快速定位。// 事件接口通过事件传递参数 public interface IEvent { // 事件唯一标识 Guid GetEventId(); void SetEventId(Guid eventId); DateTime GetCreationTime(); void SetCreationTime(DateTime creationTime); } // 简化事件的实现通过事件传递参数 public abstract record Event : IEvent { private Guid _eventId; private DateTime _creationTime; protected Event() : this(Guid.NewGuid(), DateTime.UtcNow) { } protected Event(Guid eventId, DateTime creationTime) { _eventId eventId; _creationTime creationTime; } public Guid GetEventId() _eventId; public void SetEventId(Guid eventId) _eventId eventId; public DateTime GetCreationTime() _creationTime; public void SetCreationTime(DateTime creationTime) _creationTime creationTime; }通过事件模型定义了事件的基本格式使用者可以通过继承 IEvent 或 Event 来扩展事件模型。接着是两个执行器模型分别标记类型和方法EventAttribute 的作用仅仅是告诉程序这个类里面有事件执行器。而 EventHandlerAttribute 标记了这个方法是事件执行器需要绑定到哪个事件以及其执行顺序。// 标识类中有事件执行器 [AttributeUsage(AttributeTargets.Class, AllowMultiple false, Inherited false)] public class EventAttribute : Attribute { } // 标识方法是一个事件执行器 [AttributeUsage(AttributeTargets.Method, AllowMultiple false, Inherited false)] public class EventHandlerAttribute : Attribute { // 事件排序 public int Order { get; set; } 0; // 是否为撤销事件 public bool IsCancel { get; set; } false; }执行器封装由于我们采用非接口继承的方法实现事件处理执行器方法没有固定的名称和参数格式比较灵活所以在框架需要解决如何识别以及调用该方法。执行器方法会有以下四种情况// 同步方法 void My(MyEvent data) // 异步方法 Task My(MyEvent data) // 同步可取消 void My(MyEvent data, CancellationToken cancellationToken) // 异步可取消 Task My(MyEvent data, CancellationToken cancellationToken)由于执行器方法有四种情况当程序启动时框架如何扫描识别并注册执行器当触发事件之后框架应该如何调用执行器首先我们可以将这四种情况分为同步方法和异步方法然后使用两种委托包装使用object target表示事件类型使用params object?[] parameters来表示参数。// 定义函数格式支持异步和非异步的执行器 internal delegate Task TaskInvokeDelegate(object target, params object?[] parameters); internal delegate void VoidInvokeDelegate(object target, object?[] parameters);两个void My()函数会被表达式树封装成void VoidInvokeDelegate()委托而两个Task My()会被封装为Task TaskInvokeDelegate委托虽然定义了两个委托但是为了统一格式最后还需要将 VoidInvokeDelegate 委托转换为 TaskInvokeDelegate所以最后框架只需要统一封装按照Task TaskInvokeDelegate()委托执行异步方法即可。创建一个InvokeBuilder.cs类型封装方法通过表达式树将四种执行器方法以及 VoidInvokeDelegate 委托转换为 TaskInvokeDelegate 委托。比如以下方法public class CheckImageCodeEventHandler { public void Check(MyEvent event) { } }使用表达式树构造为如下伪代码所示VoidInvokeDelegate invoke (object target,object[]? parameters) { (CheckImageCodeEventHandler)target.Check(parameters); } TaskInvokeDelegate taskInvoke Task (object target,object[]? parameters) { invoke.Invoke(target,paramters); return Task.CompletedTask; }InvokeBuilder 完整代码如下所示internal static class InvokeBuilder { // 构造委托 public static TaskInvokeDelegate Build(MethodInfo methodInfo, Type targetType) { // 步骤一构造 (object target, params object?[] parameters) 参数 // 构造执行器方法的参数 var targetParameter Expression.Parameter(typeof(object), target); var parametersParameter Expression.Parameter(typeof(object?[]), parameters); // 构建函数参数列表 var parameters new ListExpression(); var paramInfos methodInfo.GetParameters(); for (var i 0; i paramInfos.Length; i) { var paramInfo paramInfos[i]; var valueObj Expression.ArrayIndex(parametersParameter, Expression.Constant(i)); var valueCast Expression.Convert(valueObj, paramInfo.ParameterType); parameters.Add(valueCast); } // 构造函数调用 var instanceCast Expression.Convert(targetParameter, targetType); var methodCall Expression.Call(instanceCast, methodInfo, parameters); // 步骤二转换为 TaskInvokeDelegate 形式 if (methodCall.Type typeof(void)) { var lambda Expression.LambdaVoidInvokeDelegate(methodCall, targetParameter, parametersParameter); var voidExecutor lambda.Compile(); return delegate (object target, object?[] parameters) { voidExecutor(target, parameters); return Task.CompletedTask; }; } else if (methodCall.Type typeof(Task)) { var castMethodCall Expression.Convert(methodCall, typeof(Task)); var lambda Expression.LambdaTaskInvokeDelegate(castMethodCall, targetParameter, parametersParameter); return lambda.Compile(); } else { throw new NotSupportedException($The return type of the [{methodInfo.Name}] method must be Task or void); } } }封装调用链将执行器方法封装成统一的委托结构后接下来是将这些执行器和取消执行器的委托使用表达式树构造成调用链按照 Order 的顺序执行。示例MyEvent InsertDb InitUser SendEmail CancelInsertDb CancelInitUser CancelSendEmail定义两个用于执行链委托// 定义事件委托用于构建执行链 public delegate Task EventHandlerDelegate(); // 带依赖注入的事件委托用于构建执行链 internal delegate Task ServiceEventHandlerDelegate(IServiceProvider provider, params object?[] parameters);ServiceEventHandlerDelegate 用于构造调用链和撤销链把执行事件的执行器执行顺序和撤销顺序包装起来。伪代码示例如下ServiceEventHandlerDelegate next async (provider, params) { try { A B C } catch { C B A } });为了能够拦截事件还需要定义一个事件中间件拦截器接口// 事件执行中间件即执行事件时的拦截器 public interface IEventMiddlewarein TEvent where TEvent : IEvent { // event: 事件 // next: 下一个要执行的函数 Task HandleAsync(TEvent event, EventHandlerDelegate next); }使用 ServiceEventHandlerDelegate 包装好之后我们只需要执行 ServiceEventHandlerDelegate 委托即可调用整个链路过程。而 EventHandlerDelegate 的作用是包装 ServiceEventHandlerDelegate 如果开发者要求使用中间件拦截事件那么我们需要再包装一层传递给中间件拦截器。if (Middleware ! null) { var mid ioc.GetRequiredServiceIEventMiddlewareTEvent(); EventHandlerDelegate next async () { await next(ioc, event, cancellationToken); }; await mid.HandleAsync(event, next); } else { await next(ioc, event, cancellationToken); }事件扫描和注册接着我们开始编写代码来处理扫描和注册保存事件。扫描每一个执行器方法存储到内存中并按照 Order 属性进行排序同时绑定 Cancel 撤销方法。为了绑定一个执行器信息和撤销器需要定义模型// 用来记录一个 Handler internal class EventInfo { // 执行器所在的类 public Type DeclaringType { get; set; } // 执行序号 public int Order { get; set; } // 事件 public Type EventType { get; set; } // 执行器方法 public MethodInfo MethodInfo { get; set; } // 委托封装的执行器方法 public TaskInvokeDelegate TaskInvoke { get; set; } // 撤销时执行 public bool IsCancel { get; set; } // 撤销执行器对应的信息 public EventInfo? CancelInfo { get; set; } public override int GetHashCode() { return MethodInfo.GetHashCode(); } public override bool Equals(object? obj) { if (obj is not EventInfo info) return false; return this.GetHashCode() info.GetHashCode(); } }接下来是实现事件总线定义事件总线接口// 事件总线服务 public interface IEventBus { Task PublishAsyncTEvent(TEvent event, CancellationToken cancellationToken default) where TEvent : IEvent; }为了方便笔者将 EventBus 类型拆开两部分来写使用分部类拆分代码。第一部分的 EventBus 中包含了静态方法和字段缓存执行器信息以及存储委托调用链。public partial class EventBus { #region static // 拦截器 private static Type? Middleware; // 缓存所有事件执行器 private static readonly DictionaryType, HashSetEventInfo EventCache new(); // 调用链缓存 private static readonly DictionaryType, ServiceEventHandlerDelegate HandlerDelegateCache new(); // 设置拦截器 public static void SetMiddleware(Type type) { Middleware type; } // 给一个事件添加执行器 public static void AddEventHandler( Type declaringType, // 执行器方法所在的类 int order, Type eventType, // 绑定了哪个事件 MethodInfo method) // 执行器方法 { if (!EventCache.TryGetValue(eventType, out var events)) { events new HashSetEventInfo(); EventCache[eventType] events; } var info new EventInfo { DeclaringType declaringType, EventType eventType, MethodInfo method, IsCancel false, Order order, // 封装方法为统一的格式 TaskInvoke InvokeBuilder.Build(method, declaringType) }; events.Add(info); // 绑定对应的撤销器 var cancelInfo events.FirstOrDefault(x x.EventType eventType x.Order order x.IsCancel true); if (cancelInfo ! null) info.CancelInfo cancelInfo; } // 添加撤销事件执行器 public static void AddCancelEventHandler(Type declaringType, int order, Type eventType, MethodInfo method) { if (!EventCache.TryGetValue(eventType, out var events)) { events new HashSetEventInfo(); EventCache[eventType] events; } var cancelInfo new EventInfo { DeclaringType declaringType, EventType eventType, MethodInfo method, IsCancel true, Order order, TaskInvoke InvokeBuilder.Build(method, declaringType) }; events.Add(cancelInfo); // 该撤销器绑定对应的执行器 var info events.FirstOrDefault(x x.EventType eventType x.Order order x.IsCancel false); if (info ! null) info.CancelInfo cancelInfo; } #endregion }这里面的两个方法很简单将执行器方法存储到缓存中。接着为了构建函数调用链已经执行失败回撤过程需要实现一个核心的构建方法将所有步骤封装到 ServiceEventHandlerDelegate 中。// 构建事件执行链 private static ServiceEventHandlerDelegate BuildHandlerTEvent() where TEvent : IEvent { if (HandlerDelegateCache.TryGetValue(typeof(TEvent), out var handler)) return handler; ServiceEventHandlerDelegate next async (provider, params) { var eventData params.OfTypeEvent().FirstOrDefault(); var cancel params.OfTypeCancellationToken().FirstOrDefault(); var logger provider.GetRequiredServiceILoggerEventBus(); logger.LogDebug(开始执行事件: {0},{1}, typeof(TEvent).Name, params[0]); if (!EventCache.TryGetValue(typeof(TEvent), out var eventInfos)) return; var infos eventInfos.Where(x x.IsCancel false).OrderBy(x x.Order).ToArray(); // 包装调用链和撤销链 for (int i 0; i infos.Length; i) { var info infos[i]; if (cancel.IsCancellationRequested) { logger.LogDebug(事件已被取消执行: {0},位置{1}, typeof(TEvent).Name, info.MethodInfo.Name); return; } logger.LogDebug(事件: {0}, {1}, typeof(TEvent).Name, info.MethodInfo.Name); // 构建执行链 var currentService provider.GetRequiredService(info.DeclaringType); try { await info.TaskInvoke(currentService, params); } // 执行失败开始回退 catch (Exception ex) { logger.LogError(ex, 执行事件失败: {0},执行器:{1},{2}, typeof(TEvent).Name, info.MethodInfo.Name, params[0]); for (int j i; j 0; j--) { var backInfo infos[j]; if (backInfo.CancelInfo is not null) { await backInfo.CancelInfo.TaskInvoke(currentService, params); } } return; } } }; // 存到缓存 HandlerDelegateCache[typeof(TEvent)] next; return next; }接着编写 EventBus 的实例方法实现 IEventBus 发布事件的方法。// 事件总线 public partial class EventBus : IEventBus { private readonly IServiceProvider _provider; public EventBus(IServiceProvider serviceProvider) { _provider serviceProvider; } // 发布事件 public async Task PublishAsyncTEvent(TEvent event, CancellationToken cancellationToken default) where TEvent : IEvent { var handler BuildHandlerTEvent(); if (Middleware ! null) { var mid _provider.GetRequiredServiceIEventMiddlewareTEvent(); EventHandlerDelegate next async () { await handler(_provider, event, cancellationToken); }; await mid.HandleAsync(event, next); } else { await handler(_provider, event, cancellationToken); } } }最后为了在程序启动时扫描出事件执行器注册到事件总线中需要实现服务注册的扩展方法public static class EventBusExtensions { // 添加事件总线扩展 public static void AddEventBus(this IServiceCollection services, Type? middleware null) { services.AddScopedIEventBus, EventBus(); if (middleware is not null) { EventBus.SetMiddleware(middleware); services.TryAddEnumerable(new ServiceDescriptor(typeof(IEventMiddleware), middleware, lifetime: ServiceLifetime.Transient)); } var assemblies AppDomain.CurrentDomain.GetAssemblies(); foreach (var assembly in assemblies) { foreach (var type in assembly.GetTypes()) { if (type.CustomAttributes.Any(x x.AttributeType typeof(EventAttribute))) { GetEventHandler(services, type); } } } } // 扫描类中的执行器 [MethodImpl(MethodImplOptions.AggressiveInlining)] private static void GetEventHandler(IServiceCollection services, Type type) { services.AddScoped(type); var methods type.GetMethods(BindingFlags.Public | BindingFlags.Instance); foreach (var method in methods) { var attr method.GetCustomAttributeEventHandlerAttribute(); if (attr null) return; var parameters method.GetParameters(); if (parameters.Length 0) throw new Exception(${method.Name} 的定义不正确至少包含一个参数); var eventType parameters[0].ParameterType; if (!(eventType.IsSubclassOf(typeof(Event)) || eventType.GetInterface(typeof(IEvent).Name) ! null)) throw new Exception(${method.Name} 的定义不正确第一个参数必须为事件); if (!attr.IsCancel) EventBus.AddEventHandler(type, attr.Order, eventType, method); else EventBus.AddCancelEventHandler(type, attr.Order, eventType, method); } } }使用事务处理事件本节示例代码参考 Demo7.Tran 项目该项目使用 EFCore 将数据存储到 Sqlite 数据库中。public class MyContext : DbContext { public DbSetAccountEntity Account { get; set; } public MyContext(DbContextOptionsMyContext dbContext) : base(dbContext) { Database.EnsureCreated(); } protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { optionsBuilder.UseSqlite( filenamemy.db); } protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.EntityAccountEntity() .Property(b b.Id) .ValueGeneratedOnAdd(); } } [PrimaryKey(nameof(Id))] [Index(nameof(EMail), IsUnique true)] public class AccountEntity { public int Id { get; set; } public string Name { get; set; } public string EMail { get; set; } public bool VerifyEMail { get; set; } }由于一个事件会有多个执行器方法虽然事件执行失败后可以执行对应的方法回滚可是当准备回滚时程序崩溃了那么就会导致数据不一致为了解决这一问题可以在事件中间件中加入数据库事务处理。定义一个事件中间件在中间件中打开和回滚事务。public class TranMiddlewareTEvent : IEventMiddlewareTEvent where TEvent : IEvent { private readonly MyContext _context; public TranMiddleware(MyContext context) { _context context; } public async Task HandleAsync(TEvent event, EventHandlerDelegate next) { Console.WriteLine(----- Handling command {0} ({1}), event.GetType().Name, event.ToString()); using var tran await _context.Database.BeginTransactionAsync(); try { await next(); await tran.CommitAsync(); } catch (Exception) { await tran.RollbackAsync(); throw; } } }编写相关事件将数据插入到数据库中[Event] public class UserRegisterEventHandler { private readonly MyContext _context; public UserRegisterEventHandler(MyContext context) { _context context; } [EventHandler(Order 0)] public async Task InsertDb(MyEvent event) { var state new Random().Next(0, 2); if (state 1) { await _context.Account.AddAsync(new AccountEntity { Name event.Name, EMail event.EMail, }); await _context.SaveChangesAsync(); Console.WriteLine(√ 用户信息已添加到数据库); } else throw new Exception(× 写入用户信息到数据库失败); } }在进行依赖注入时注入该中间件static async Task Main(string[] args) { var ioc new ServiceCollection(); ioc.AddDbContextMyContext(); ioc.AddLogging(build build.AddConsole()); ioc.AddEventBus(typeof(TranMiddleware)); var services ioc.BuildServiceProvider(); var eventBus services.GetRequiredServiceIEventBus(); await eventBus.PublishAsync(new MyEvent() { Name 工良, EMail 工良maomi.com }); }

相关新闻