服务热线
135-6963-3175
DefaultMQAdminExt类
用于admin端topic、consumer等的管理
SessionCredentials sessionCredentials = new SessionCredentials(accessKey, secretKey); RPCHook rpcHook = new AclClientRPCHook(sessionCredentials);//用于带密钥管理端的创建 DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook, 5000L);
方法:
examineBrokerClusterInfo();//获取mq集群信息 createAndUpdateSubscriptionGroupConfig(brokerAddr,subscriptionGroupConfig);//创建或更新消费组订阅 deleteSubscriptionGroup(brokerAddr, consumerGroupName);//删除订阅 createTopic(key, topicName, queueNum)//用于topic的创建,可指定队列数量 deleteTopicInBroker(addr, topicName)//用于某broker地址topic的删除 createAndUpdateTopicConfig(addr, topicConfig)//指定broker地址topic的创建 ConsumeStats examineConsumeStats(group)//获取消费状态 consumeStats.getOffsetTable()//可获取队列偏移表 createAndUpdatePlainAccessConfig(addr, plainAccessConfig)//创建或更新acl配置文件,addr为nameserv地址 deletePlainAccessConfig(addr, accessKey)//删除acl配置,addr为nameserv地址
BrokerData
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); Map<String,BrokerData> brokerDataMap = clusterInfo.getBrokerAddrTable(); /* brokerDataMap:<brokerName:BrokerData> localhost:BrokerData [brokerName=localhost, brokerAddrs={0=172.16.7.1:10911}] localhost:TopicConfig [topicName=localhost, readQueueNums=1, writeQueueNums=1, perm=RWX, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false] */ 同一个Broker下可以分为master和slave,一个master,多个slave brokerId为0代表master 参照MixAll.MASTER_ID,其他brokerId为slave 同一个brokerName下可以有一个Master和多个Slave,所以brokerAddrs是一个集合 BrokerData brokerData=brokerDataMap.get("localhost"); //brokerId:broker Address Map<Long, String> brokerAddrs= brokerData.getBrokerAddrs();
TopicConfig
Map<Long, String> brokerAddrs= brokerData.getBrokerAddrs(); for(Long brokerId:brokerAddrs.keySet()){ //172.16.7.1:10911 String addr = brokerAddrs.get(brokerId); //该节点上所有topic配置 TopicConfigSerializeWrapper configSerializeWrapper = defaultMQAdminExt. getAllTopicGroup(addr, 1000L); Map<String, TopicConfig> topicConfigs = configSerializeWrapper.getTopicConfigTable(); for(String topicName:topicConfigs.keySet()){ System.out.println(topicName+":"+topicConfigs.get(topicName)); } } boot-test:TopicConfig [topicName=boot-test, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false] tc_topic:TopicConfig [topicName=tc_topic, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false] %RETRY%fastboot:TopicConfig [topicName=%RETRY%fastboot, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
PlanAccessConfig类
权限信息类
PlainAccessConfig plainAccessConfig = new PlainAccessConfig(); plainAccessConfig.setAccessKey(aclAccountModel.getAccessKey()); plainAccessConfig.setSecretKey(aclAccountModel.getSecretKey()); plainAccessConfig.setAdmin(false);//非管理 plainAccessConfig.setDefaultGroupPerm("SUB");//订阅权限 plainAccessConfig.setDefaultTopicPerm("DENY");//无权限 List<String> topicPerms = new ArrayList(); topicPerms.add("RMQ_SYS_TRACE_TOPIC=PUB|SUB");//对该topic的订阅和发送权限 plainAccessConfig.setTopicPerms(topicPerms); 通过mqAdminExt的方法去创建或删除: createAndUpdatePlainAccessConfig(addr, plainAccessConfig)//创建或更新acl配置文件,addr为nameserv地址 deletePlainAccessConfig(addr, accessKey)//删除acl配置,addr为nameserv地址
DefaultMQProducer类
new DefaultMQProducer("mq-producer",hook,boolean enableMsgTrace, String customizedTraceTopic); //acl生产端,启用生产轨迹,轨迹发送topic名称 producer = new DefaultMQProducer("mq-producer"); producer.setMaxMessageSize(maxMessageSize);//消息最大大小 producer.setNamesrvAddr(namesrvAddr);//nameSrv地址,多个用;隔开 producer.setInstanceName(applicationName);//实例名称 producer.setCompressMsgBodyOverHowmuch(compressMsgBodyOverHowmuch);//压缩大小 producer.start(); SendResult sendResult = producer.send(msg,selectMessageQueueByHash,key);//消息的发送,按hash队列 producer.send(msg,sendCallback,timeout)//消息发送可传回调 MessageQueueSelector//该接口用于定义发送目标队列策略选择哪个queue进行发送,有以下实现: SelectMessageQueueByRandom SelectMessageQueueByHash
DefaultMQPushConsumer类
consumer=new DefaultMQPushConsumer(groupName,rpcHook, new AllocateMessageQueueAveragely(),true, MixAll.RMQ_SYS_TRACE_TOPIC); //true启用消息跟踪,消息轨迹topic名称 consumer.setNamesrvAddr(namesrvAddr); consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式 consumer.setConsumeThreadMax(consumerCount); consumer.setConsumeThreadMin(minThread); consumer.setPullBatchSize(pullBatchSize); consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); consumer.setInstanceName(pid+"_"+instanceName); consumer.subscribe(topic, tags)//订阅topic->tags consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { return ConsumeOrderlyStatus.SUCCESS;//消费成功 //return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//消费重试 });//顺序消费(局部,若需要全局顺序则需要创建topic时候queue为1) consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {});//并发消费