跳到主要内容
版本:3.0.0

RocketMQ 客户端(Aegis.RocketMQ.Client)

Aegis.RocketMQ.Client 提供 RocketMQ 的底层客户端接入能力,包括连接管理、生产者和消费者。它和 事件总线(Aegis.EventBus) 不是同一层:前者是直接 MQ 客户端,后者是统一消息抽象。

组件概览

字段说明
组件名称RocketMQ 客户端
真实类库Aegis.RocketMQ.Client
组件定位RocketMQ 生产、消费与连接管理组件
引入方式安装 NuGet 后,通过代码注册 AddRocketMqClient(...)
组件声明无固定 Component.deps.json 组件声明
核心能力发送普通消息、延迟消息、顺序消息,订阅并消费指定 Topic / Tag
主要配置RocketMqOptions
典型配套事件总线

什么时候要用它

适合场景:

  • 你要直接对接 RocketMQ Topic
  • 你要手动控制发送、订阅和消费逻辑
  • 你当前不想先走事件总线抽象,而是直接使用 MQ 客户端

最小接入方式

第一步:安装组件

dotnet add package Aegis.RocketMQ.Client

第二步:在服务注册阶段启用客户端

RocketMQ 当前不是 Component.deps.json 自动装配组件,需要在代码里显式注册。

builder.Services.AddRocketMqClient(options =>
{
options.NameServerAddress = "127.0.0.1:9876";
options.ProducerGroup = "aegis-producer";
options.ConsumerGroup = "aegis-consumer";
options.Topic = "aegis-demo";
options.MaxMessageSize = 4 * 1024 * 1024;
options.SendMsgTimeout = 3000;
options.RetryTimesWhenSendFailed = 2;
options.RetryTimesWhenSendAsyncFailed = 2;
options.EnableMsgTrace = true;
options.AccessKey = "";
options.SecretKey = "";
});

配置项怎么理解

配置项作用
NameServerAddressRocketMQ NameServer 地址
ProducerGroup生产者组名
ConsumerGroup消费者组名
Topic默认业务 Topic,可作为项目约定值
MaxMessageSize单条消息大小上限
SendMsgTimeout发送超时时间
RetryTimesWhenSendFailed同步发送失败重试次数
RetryTimesWhenSendAsyncFailed异步发送失败重试次数
EnableMsgTrace是否启用消息轨迹
AccessKey / SecretKey鉴权配置

生产者怎么用

发送普通消息

public class OrderMessageService
{
private readonly IRocketMqProducer _producer;

public OrderMessageService(IRocketMqProducer producer)
{
_producer = producer;
}

public Task SendCreatedAsync(OrderCreatedMessage message)
{
return _producer.SendMessageAsync("order-created", message);
}
}

发送延迟消息

await _producer.SendDelayMessageAsync(
"order-timeout-check",
new { OrderId = orderId },
delay: 60);

发送顺序消息

await _producer.SendOrderlyMessageAsync(
"order-events",
new { OrderId = orderId, Step = "Paid" },
messageGroup: orderId.ToString());

消费者怎么用

当前消费者使用方式是:先订阅,再启动。

public class OrderConsumerHostedService : IHostedService
{
private readonly IRocketMqConsumer _consumer;

public OrderConsumerHostedService(IRocketMqConsumer consumer)
{
_consumer = consumer;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
await _consumer.SubscribeAsync("order-created", "*", async body =>
{
Console.WriteLine(body);
await Task.CompletedTask;
});

await _consumer.StartAsync();
}

public Task StopAsync(CancellationToken cancellationToken)
{
return _consumer.StopAsync();
}
}

关键边界

要特别记住这几条:

  • 这是手动代码注册组件,不走 Component.deps.json
  • IRocketMqProducerIRocketMqConsumer 是直接面向 RocketMQ 的客户端接口
  • 如果你只是做应用内统一消息抽象,优先先看 事件总线

接入后怎么确认生效

通常用下面几项验收:

  • 应用启动时没有连接 NameServer 相关报错
  • 普通消息可以成功发到指定 Topic
  • 延迟消息和顺序消息都能按预期发送
  • 消费者订阅后能收到消息并触发回调

常见问题

为什么放进了 Component.deps.json 仍然没生效

因为当前组件不是自动装配组件,必须在代码里显式调用 AddRocketMqClient(...)

已经注册了客户端,但消费者一直没收到消息

优先检查:

  • NameServerAddress 是否正确
  • 订阅的 topic / tag 是否和发送端一致
  • ConsumerGroup 是否配置正确
  • 是否已经调用了 StartAsync()