收到订阅关系不一致告警后怎么办?¶
问题描述¶
Group ID 订阅的部分消息未收到。登录消息队列 RocketMQ 控制台, 单击消息查询 > 按 Message ID 查询消息,消息显示已至少被消费—次,但从消费逻辑判断未消费。
登录消息队列 RocketMQ 控制台,单击消费者管理 > 消费者状态,订阅关系是否一致栏显示否。
问题分析¶
消息队列 RocketMQ 里的一个 Group ID 代表一个 Consumer 实例群组。 对大多数分布式应用来说,一个 Group ID 下通常会挂载多个 Consumer 实例。 订阅关系一致指,同一个 Group ID 下所有的 Consumer 实例的 Topic 和 Tag 都必须完全一致。
若同一个 Group ID 下的 Consumer 实例配置了不同的 Topic, 或是 Topic 相同但 Tag 不同 , 订阅关系就会不一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,导致消息丢失。
【原因1】配置了相同 Group ID 的不同客户端订阅的 Topic 不一致。¶
错误示例
同一个 Group ID(GID-MQ-FAQ)订阅的 Topic 不同(分别是 MQ-FAQ-TOPIC-1、MQ-FAQ-TOPIC-2)。
JVM-1 上的代码
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("MQ-FAQ-TOPIC-1", "NM-MQ-FAQ", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
JVM-2 上的代码
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("MQ-FAQ-TOPIC-2", "NM-MQ-FAQ", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
【原因2】同一个 Group ID 下的不同客户端订阅的 Topic 相同,但 Tag 不同。¶
错误示例
同一个 Group ID(GID-MQ-FAQ)订阅的 Topic 不同(分别是 MQ-FAQ-TOPIC-1、MQ-FAQ-TOPIC-2)。
JVM-1 上的代码
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("MQ-FAQ-TOPIC-1", "NM-MQ-FAQ", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
JVM-2 上的代码
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("MQ-FAQ-TOPIC-2", "NM-MQ-FAQ", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
结果验证¶
消费者能接收到所有期望被接收的消息。
登录消息队列 RocketMQ 控制台,单击消费者管理 > 消费者状态,订阅关系是否一致栏显示是。