服务热线
135-6963-3175
rocketMq死信队列
死信队列消息的前提是基于consumer消费消息异常后经过多次重试队列的投递依赖消费失败最终会进入死信队列。
理解死信队列的核心在于理解死信队列对应Topic的配置,包括topicName、readQueueNums、writeQueueNums、perm。
死信队列对应Topic的权限为2,只有写权限,所以死信队列没有办法读取。
// 读权限为4 public static final int PERM_READ = 0x1 << 2; // 写权限为2 public static final int PERM_WRITE = 0x1 << 1; // 继承权限为1 public static final int PERM_INHERIT = 0x1 << 0;
死信队列Topic配置
{
"topicConfigTable":{
"%DLQ%quickstart_consumer_dlq":{
"order":false,
"perm":2,
"readQueueNums":1,
"topicFilterType":"SINGLE_TAG",
"topicName":"%DLQ%quickstart_consumer_dlq",
"topicSysFlag":0,
"writeQueueNums":1
}
}
}死信队列 %DLQ%quickstart_consumer_dlq 的读写队列的个数为1。
死信队列 %DLQ%quickstart_consumer_dlq 的读写队列的权限为2。
permission(2|4|6), intro[2:W; 4:R; 6:RW]
命令授权方式:(应该也可通过java acl授权方式,待测)
命令格式 usage: mqadmin updateTopicPerm [-b <arg>] [-c <arg>] [-h] [-n <arg>] -p <arg> -t <arg> -b,--brokerAddr <arg> create topic to which broker -c,--clusterName <arg> create topic to which cluster -h,--help Print help -n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 -p,--perm <arg> set topic's permission(2|4|6), intro[2:W; 4:R; 6:RW] -t,--topic <arg> topic name 命令执行 ./mqadmin updateTopicPerm -c DefaultCluster -n localhost:9876 -p 6 -t %DLQ%quickstart_consumer_dlq 命令执行过程 RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. update topic perm from 2 to 6 in 192.168.0.10:10911 success.
通过updateTopicPerm的命令将死信队列的读写权限改为6,保证读写权限。
permission(2|4|6), intro[2:W; 4:R; 6:RW]
{
"topicConfigTable":{
"%RETRY%quickstart_consumer_dlq":{
"order":false,
"perm":6,
"readQueueNums":1,
"topicFilterType":"SINGLE_TAG",
"topicName":"%RETRY%quickstart_consumer_dlq",
"topicSysFlag":0,
"writeQueueNums":1
}
}
}死信队列 %DLQ%quickstart_consumer_dlq 更改权限后的读写队列的权限为6。
permission(2|4|6), intro[2:W; 4:R; 6:RW]
重投处理逻辑先将消息投递到重试Topic的%RETRY%consumerGroup队列,超过最大重试次数后将消息投递到死信Topic的%DLQ%consumerGroup队列。
进行死信的消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer_dlq");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅死信队列
consumer.subscribe("%DLQ%quickstart_consumer_dlq", "*");
consumer.setMaxReconsumeTimes(1);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");通过将死信队列的权限设置为读写权限,然后直接通过订阅对应的死信队列即可。
consumer.subscribe("%DLQ%quickstart_consumer_dlq", "*");