事件总线(Aegis.EventBus)
Aegis.EventBus 是 Aegis 的消息生产消费基础组件。它负责注册 IMessageSender、消息容器、消费者扫描和后台消费处理器,让应用可以通过 Message 模型做异步解耦。
组件概览
| 字段 | 说明 |
|---|---|
| 组件名称 | 事件总线 |
| 真实类库 | Aegis.EventBus |
| 组件定位 | 基础消息生产消费组件 |
| 引入方式 | 安装 NuGet,并在 Component.deps.json 的 Services 中启用 EventBus |
| 组件声明 | EventBus |
| 核心能力 | IMessageSender、IMessageConsumer<Message>、后台消费处理器、默认消息容器 |
| 典型配套 | Aegis.EventBus.Container.Redis |
什么时候要用它
适合场景:
- 一个业务动作完成后,需要异步触发后续处理
- 不希望主链路直接等待日志、同步、通知等后置逻辑
- 同一个消息名需要触发多个消费者
先记住当前支持边界
当前公开文档默认只讲 Message 这条已经可用的生产消费模型。
| 模型 | 当前建议 |
|---|---|
Message | 当前主线用法,推荐使用 |
Request | 当前不作为对外主线文档展开 |
Event | 当前不作为对外主线文档展开 |
如果你在做业务接入,直接按 Message 模型理解就够了。
最小可运行路径
第一步:启用 EventBus
{
"Components": {
"Services": [
"EventBus"
],
"Middlewares": []
}
}
第二步:定义消息发送代码
public class UserService : IUserContract
{
private readonly IMessageSender _messageSender;
public UserService(IMessageSender messageSender)
{
_messageSender = messageSender;
}
public async Task SendUserMessageAsync(UserDto dto)
{
await _messageSender.Send(new Message
{
MessageName = "UserChanged",
Data = dto,
CreatedTime = DateTime.Now
});
}
}
第三步:定义消费者
public class UserChangedConsumer : IMessageConsumer<Message>
{
public string MessageName { get; } = "UserChanged";
public async Task<bool> ExecuteAsync(Message message)
{
var user = message.Data as UserDto;
Console.WriteLine($"{user?.Name}:{user?.Age}");
await Task.CompletedTask;
return true;
}
}
Message 模型长什么样
public class Message
{
public string MessageName { get; set; }
public object Data { get; set; }
public DateTime CreatedTime { get; set; }
}
三个字段里最关键的是:
MessageName:决定由谁消费Data:消息正文CreatedTime:消息创建时间
消费者是怎么被注册的
启用 EventBus 后,框架会扫描当前运行中的程序集,把实现了 IMessageConsumer<Message> 的具体类自动注册进去。
这意味着你通常只需要:
- 写好消费者类
- 确保对应程序集已经被应用引用
同一个消息名可以有多个消费者
当前实现里,同一个 MessageName 可以对应多个消费者,处理器会按收集到的顺序逐个执行。
public class HealthConsumer : IMessageConsumer<Message>
{
public string MessageName { get; } = "Health";
public Task<bool> ExecuteAsync(Message message)
{
Console.WriteLine("consumer-1");
return Task.FromResult(true);
}
}
public class Health2Consumer : IMessageConsumer<Message>
{
public string MessageName { get; } = "Health";
public Task<bool> ExecuteAsync(Message message)
{
Console.WriteLine("consumer-2");
return Task.FromResult(true);
}
}
适合场景:
- 一个消息既要记业务日志,也要同步其他状态
- 一个消息需要拆给多个后置处理器
默认容器是什么
默认情况下,EventBus 使用进程内消息容器。
这条路线的特点是:
- 接入简单
- 适合同一应用内部的异步解耦
- 不适合跨实例分发
如果你需要把消息容器切到 Redis,请继续接入 Redis 事件总线容器扩展。
边界与限制
消费者生命周期
消费者默认按单例方式注册。
所以更推荐在消费者里依赖线程安全的服务,或者只通过 DI 获取可安全使用的依赖。
消息体类型转换
Message.Data 是 object,消费端一般需要自己转成业务对象。
var user = message.Data as UserDto;
如果发送端和消费端约定不一致,运行时就会出现取值失败或空对象。
没有对应消费者会怎么样
如果消息名没有匹配到任何消费者,消息不会进入实际业务处理逻辑。
所以消息名最好统一收口成常量。
接入后怎么确认已经生效
可以这样确认:
- 启动应用后没有
EventBus初始化异常 - 发送端可以正常注入
IMessageSender - 发送消息后,对应消费者的
ExecuteAsync被触发
常见问题
为什么发送了消息但没有消费
优先检查:
Component.deps.json里是否启用了EventBus- 消费者是否实现了
IMessageConsumer<Message> MessageName是否完全一致- 消费者所在程序集是否已经被应用引用
一个消息能不能被多个消费者消费
可以。
只要多个消费者的 MessageName 一样,它们都会执行。
消费者里能不能访问数据库或其他服务
可以,但要注意消费者是单例。
如果消费者要做复杂业务,建议把具体逻辑继续下沉到服务类里。