RocketMQ 客户端(Aegis.RocketMQ.Client)
Aegis.RocketMQ.Client 提供 RocketMQ 的底层客户端接入能力,包括连接管理、生产者和消费者。它和 事件总线(Aegis.EventBus) 不是同一层:前者是直接 MQ 客户端,后者是统一消息抽象。
组件概览
| 字段 | 说明 |
|---|---|
| 组件名称 | RocketMQ 客户端 |
| 真实类库 | Aegis.RocketMQ.Client |
| 组件定位 | RocketMQ 生产、消费与连接管理组件 |
| 引入方式 | 安装 NuGet 后,通过代码注册 AddRocketMqClient(...) |
| 组件声明 | 无固定 Component.deps.json 组件声明 |
| 核心能力 | 发送普通消息、延迟消息、顺序消息,订阅并消费指定 Topic / Tag |
| 主要配置 | RocketMqOptions |
| 典型配套 | 事件总线 |
什么时候要用它
适合场景:
- 你要直接对接 RocketMQ Topic
- 你要手动控制发送、订阅和消费逻辑
- 你当前不想先走事件总线抽象,而是直接使用 MQ 客户端
最小接入方式
第一步:安装组件
dotnet add package Aegis.RocketMQ.Client
第二步:在服务注册阶段启用客户端
RocketMQ 当前不是 Component.deps.json 自动装配组件,需要在代码里显式注册。
builder.Services.AddRocketMqClient(options =>
{
options.NameServerAddress = "127.0.0.1:9876";
options.ProducerGroup = "aegis-producer";
options.ConsumerGroup = "aegis-consumer";
options.Topic = "aegis-demo";
options.MaxMessageSize = 4 * 1024 * 1024;
options.SendMsgTimeout = 3000;
options.RetryTimesWhenSendFailed = 2;
options.RetryTimesWhenSendAsyncFailed = 2;
options.EnableMsgTrace = true;
options.AccessKey = "";
options.SecretKey = "";
});
配置项怎么理解
| 配置项 | 作用 |
|---|---|
NameServerAddress | RocketMQ NameServer 地址 |
ProducerGroup | 生产者组名 |
ConsumerGroup | 消费者组名 |
Topic | 默认业务 Topic,可作为项目约定值 |
MaxMessageSize | 单条消息大小上限 |
SendMsgTimeout | 发送超时时间 |
RetryTimesWhenSendFailed | 同步发送失败重试次数 |
RetryTimesWhenSendAsyncFailed | 异步发送失败重试次数 |
EnableMsgTrace | 是否启用消息轨迹 |
AccessKey / SecretKey | 鉴权配置 |
生产者怎么用
发送普通消息
public class OrderMessageService
{
private readonly IRocketMqProducer _producer;
public OrderMessageService(IRocketMqProducer producer)
{
_producer = producer;
}
public Task SendCreatedAsync(OrderCreatedMessage message)
{
return _producer.SendMessageAsync("order-created", message);
}
}
发送延迟消息
await _producer.SendDelayMessageAsync(
"order-timeout-check",
new { OrderId = orderId },
delay: 60);
发送顺序消息
await _producer.SendOrderlyMessageAsync(
"order-events",
new { OrderId = orderId, Step = "Paid" },
messageGroup: orderId.ToString());
消费者怎么用
当前消费者使用方式是:先订阅,再启动。
public class OrderConsumerHostedService : IHostedService
{
private readonly IRocketMqConsumer _consumer;
public OrderConsumerHostedService(IRocketMqConsumer consumer)
{
_consumer = consumer;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _consumer.SubscribeAsync("order-created", "*", async body =>
{
Console.WriteLine(body);
await Task.CompletedTask;
});
await _consumer.StartAsync();
}
public Task StopAsync(CancellationToken cancellationToken)
{
return _consumer.StopAsync();
}
}
关键边界
要特别记住这几条:
- 这是手动代码注册组件,不走
Component.deps.json IRocketMqProducer和IRocketMqConsumer是直接面向 RocketMQ 的客户端接口- 如果你只是做应用内统一消息抽象,优先先看 事件总线
接入后怎么确认生效
通常用下面几项验收:
- 应用启动时没有连接 NameServer 相关报错
- 普通消息可以成功发到指定 Topic
- 延迟消息和顺序消息都能按预期发送
- 消费者订阅后能收到消息并触发回调
常见问题
为什么放进了 Component.deps.json 仍然没生效
因为当前组件不是自动装配组件,必须在代码里显式调用 AddRocketMqClient(...)。
已经注册了客户端,但消费者一直没收到消息
优先检查:
NameServerAddress是否正确- 订阅的
topic/tag是否和发送端一致 ConsumerGroup是否配置正确- 是否已经调用了
StartAsync()