EventBus

目前的事件总线支持的功能较少,目前暂时只支持生产消费模式的消息消费机制。

启用EventBus

对应的功能包含在Aegis.EventBus Nuget包中

Component.deps.json中的Services节点中确定包含了EventBus

模型契约 Contracts

事件总线将支持多种传递模型,包含 RequestMessageEvent,目前已完成的只有Message

对应模型解析

  • Request : 1:1,请求调用模型
  • Message : N:1,生产消费式模型,用于推送服务消息(如推送已完成之后用于记录的业务日志相关消息)
  • Event :N:N, 发布订阅式模型,用于广播服务状态和服务事件

数据容器 Container

目前的数据容器暂时只支持基于内存的Channel模型 Channel参考资料

需要注意的是基于内存的方案,只能自己生产/发布,自己消费/订阅,也就是说同一个程序内一个接口生产消息,自带的消费者消费消息,无法做到分布式消费(A机器生产消息,B机器消费消息)。

在之后的考虑容器中还会考虑RedisSqlServer等容器来解决分布式消费的问题。

同时也会考虑如KafkaRabbitMQ等Container/Processor方案。

生产消费模型(Message Bus)

主要使用Message作为传输模型。

消息 Message

注意命名空间应该是Aegis.EventBus.Contracts

public class Message  
{  
    ///消息名
    public string MessageName { get; set; }  

    //消息数据
    public object Data { get;set; }  

    //创建时间,可以不传,默认是当前时间
    public DateTime CreatedTime { get;set; }  
}


生产者 Sender

对外提供的接口是IMessageSender,目前提供的唯一可用运行SenderChannelSender

/// <summary>  
/// 消息发送者(提供者)  
/// </summary>  
public interface IMessageSender  
{  
    Task Send(Message message);  
}

使用方式

private readonly IMessageSender _messageSender;

public UserService(...,IMessageSender messageSender)
{
    ...
    _messageSender = messageSender;
}


public async Task ChangeState()
{
    ... //业务代码
    await _messageSender.Send(new Message()  
    {  
        MessageName = MessageConsts.Health,   //这里指定消息的名字,对应的消费者获取到对应消息名的消息才开始消费,建议使用常量类
        Data = new HealthEntity(){ Id = 1,Name = "张三",State = true},  
        CreatedTime = DateTime.Now  
    } );
    ... //业务代码
}


消费者 Consumer

消费者Consumer需要开发自行实现具体代码,只需要继承IMessageConsumer<Message>,实现其中的ExecuteAsync方法即可,同时给予MessageName具体的值。如下述代码。

MessageName最好使用常量类来做存储,这样在后续方便管理。

public class HealthConsumer : IMessageConsumer<Message>  
{  
    public string MessageName { get; } = MessageConsts.Health;   //在这里指定该消费者对应的消费消息名,在之后会移到Attribute里

    public async Task<bool> ExecuteAsync(Message message)  
    {        
        await Task.Delay(100);  
        //... 具体消费代码
        // message.Data就是生产者传入的数据对象
        //对应上面的解析方式就是 
        var data = message.Data as HealthEntity;

        Console.WriteLine(data.ToString());  
        return true;  
    }
}

Notice:消费者本身会被注册为Singleton生命周期。

results matching ""

    No results matching ""