跳到主要内容
版本:3.0.0

事件总线(Aegis.EventBus)

Aegis.EventBus 是 Aegis 的消息生产消费基础组件。它负责注册 IMessageSender、消息容器、消费者扫描和后台消费处理器,让应用可以通过 Message 模型做异步解耦。

组件概览

字段说明
组件名称事件总线
真实类库Aegis.EventBus
组件定位基础消息生产消费组件
引入方式安装 NuGet,并在 Component.deps.jsonServices 中启用 EventBus
组件声明EventBus
核心能力IMessageSenderIMessageConsumer<Message>、后台消费处理器、默认消息容器
典型配套Aegis.EventBus.Container.Redis

什么时候要用它

适合场景:

  • 一个业务动作完成后,需要异步触发后续处理
  • 不希望主链路直接等待日志、同步、通知等后置逻辑
  • 同一个消息名需要触发多个消费者

先记住当前支持边界

当前公开文档默认只讲 Message 这条已经可用的生产消费模型。

模型当前建议
Message当前主线用法,推荐使用
Request当前不作为对外主线文档展开
Event当前不作为对外主线文档展开

如果你在做业务接入,直接按 Message 模型理解就够了。

最小可运行路径

第一步:启用 EventBus

{
"Components": {
"Services": [
"EventBus"
],
"Middlewares": []
}
}

第二步:定义消息发送代码

public class UserService : IUserContract
{
private readonly IMessageSender _messageSender;

public UserService(IMessageSender messageSender)
{
_messageSender = messageSender;
}

public async Task SendUserMessageAsync(UserDto dto)
{
await _messageSender.Send(new Message
{
MessageName = "UserChanged",
Data = dto,
CreatedTime = DateTime.Now
});
}
}

第三步:定义消费者

public class UserChangedConsumer : IMessageConsumer<Message>
{
public string MessageName { get; } = "UserChanged";

public async Task<bool> ExecuteAsync(Message message)
{
var user = message.Data as UserDto;

Console.WriteLine($"{user?.Name}:{user?.Age}");
await Task.CompletedTask;
return true;
}
}

Message 模型长什么样

public class Message
{
public string MessageName { get; set; }
public object Data { get; set; }
public DateTime CreatedTime { get; set; }
}

三个字段里最关键的是:

  • MessageName:决定由谁消费
  • Data:消息正文
  • CreatedTime:消息创建时间

消费者是怎么被注册的

启用 EventBus 后,框架会扫描当前运行中的程序集,把实现了 IMessageConsumer<Message> 的具体类自动注册进去。

这意味着你通常只需要:

  • 写好消费者类
  • 确保对应程序集已经被应用引用

同一个消息名可以有多个消费者

当前实现里,同一个 MessageName 可以对应多个消费者,处理器会按收集到的顺序逐个执行。

public class HealthConsumer : IMessageConsumer<Message>
{
public string MessageName { get; } = "Health";

public Task<bool> ExecuteAsync(Message message)
{
Console.WriteLine("consumer-1");
return Task.FromResult(true);
}
}

public class Health2Consumer : IMessageConsumer<Message>
{
public string MessageName { get; } = "Health";

public Task<bool> ExecuteAsync(Message message)
{
Console.WriteLine("consumer-2");
return Task.FromResult(true);
}
}

适合场景:

  • 一个消息既要记业务日志,也要同步其他状态
  • 一个消息需要拆给多个后置处理器

默认容器是什么

默认情况下,EventBus 使用进程内消息容器。

这条路线的特点是:

  • 接入简单
  • 适合同一应用内部的异步解耦
  • 不适合跨实例分发

如果你需要把消息容器切到 Redis,请继续接入 Redis 事件总线容器扩展

边界与限制

消费者生命周期

消费者默认按单例方式注册。
所以更推荐在消费者里依赖线程安全的服务,或者只通过 DI 获取可安全使用的依赖。

消息体类型转换

Message.Dataobject,消费端一般需要自己转成业务对象。

var user = message.Data as UserDto;

如果发送端和消费端约定不一致,运行时就会出现取值失败或空对象。

没有对应消费者会怎么样

如果消息名没有匹配到任何消费者,消息不会进入实际业务处理逻辑。
所以消息名最好统一收口成常量。

接入后怎么确认已经生效

可以这样确认:

  • 启动应用后没有 EventBus 初始化异常
  • 发送端可以正常注入 IMessageSender
  • 发送消息后,对应消费者的 ExecuteAsync 被触发

常见问题

为什么发送了消息但没有消费

优先检查:

  • Component.deps.json 里是否启用了 EventBus
  • 消费者是否实现了 IMessageConsumer<Message>
  • MessageName 是否完全一致
  • 消费者所在程序集是否已经被应用引用

一个消息能不能被多个消费者消费

可以。
只要多个消费者的 MessageName 一样,它们都会执行。

消费者里能不能访问数据库或其他服务

可以,但要注意消费者是单例。
如果消费者要做复杂业务,建议把具体逻辑继续下沉到服务类里。

配套阅读