EventBus
目前的事件总线支持的功能较少,目前暂时只支持生产消费模式的消息消费机制。
启用EventBus
对应的功能包含在Aegis.EventBus Nuget包中
在Component.deps.json
中的Services节点中确定包含了EventBus
。
模型契约 Contracts
事件总线将支持多种传递模型,包含 Request
、Message
、Event
,目前已完成的只有Message
。
对应模型解析
- Request : 1:1,请求调用模型
- Message : N:1,生产消费式模型,用于推送服务消息(如推送已完成之后用于记录的业务日志相关消息)
- Event :N:N, 发布订阅式模型,用于广播服务状态和服务事件
数据容器 Container
目前的数据容器暂时只支持基于内存的Channel
模型
Channel参考资料
需要注意的是基于内存的方案,只能自己生产/发布,自己消费/订阅,也就是说同一个程序内一个接口生产消息,自带的消费者消费消息,无法做到分布式消费(A机器生产消息,B机器消费消息)。
在之后的考虑容器中还会考虑Redis
、SqlServer
等容器来解决分布式消费的问题。
同时也会考虑如Kafka
、RabbitMQ
等Container/Processor方案。
生产消费模型(Message Bus)
主要使用Message
作为传输模型。
消息 Message
注意命名空间应该是Aegis.EventBus.Contracts
public class Message
{
///消息名
public string MessageName { get; set; }
//消息数据
public object Data { get;set; }
//创建时间,可以不传,默认是当前时间
public DateTime CreatedTime { get;set; }
}
生产者 Sender
对外提供的接口是IMessageSender
,目前提供的唯一可用运行Sender
是ChannelSender
/// <summary>
/// 消息发送者(提供者)
/// </summary>
public interface IMessageSender
{
Task Send(Message message);
}
使用方式
private readonly IMessageSender _messageSender;
public UserService(...,IMessageSender messageSender)
{
...
_messageSender = messageSender;
}
public async Task ChangeState()
{
... //业务代码
await _messageSender.Send(new Message()
{
MessageName = MessageConsts.Health, //这里指定消息的名字,对应的消费者获取到对应消息名的消息才开始消费,建议使用常量类
Data = new HealthEntity(){ Id = 1,Name = "张三",State = true},
CreatedTime = DateTime.Now
} );
... //业务代码
}
消费者 Consumer
消费者Consumer需要开发自行实现具体代码,只需要继承IMessageConsumer<Message>
,实现其中的ExecuteAsync
方法即可,同时给予MessageName具体的值。如下述代码。
MessageName最好使用常量类来做存储,这样在后续方便管理。
public class HealthConsumer : IMessageConsumer<Message>
{
public string MessageName { get; } = MessageConsts.Health; //在这里指定该消费者对应的消费消息名,在之后会移到Attribute里
public async Task<bool> ExecuteAsync(Message message)
{
await Task.Delay(100);
//... 具体消费代码
// message.Data就是生产者传入的数据对象
//对应上面的解析方式就是
var data = message.Data as HealthEntity;
Console.WriteLine(data.ToString());
return true;
}
}
Notice:消费者本身会被注册为Singleton生命周期。