技术交流28群

服务热线

135-6963-3175

微信服务号

rocketmq java相关函数 更新时间 2019-5-22 浏览2140次

DefaultMQAdminExt类

用于admin端topic、consumer等的管理

SessionCredentials sessionCredentials = new SessionCredentials(accessKey, secretKey);
RPCHook rpcHook = new AclClientRPCHook(sessionCredentials);//用于带密钥管理端的创建
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook, 5000L);


方法:

examineBrokerClusterInfo();//获取mq集群信息
createAndUpdateSubscriptionGroupConfig(brokerAddr,subscriptionGroupConfig);//创建或更新消费组订阅
deleteSubscriptionGroup(brokerAddr, consumerGroupName);//删除订阅
createTopic(key, topicName, queueNum)//用于topic的创建,可指定队列数量
deleteTopicInBroker(addr, topicName)//用于某broker地址topic的删除
createAndUpdateTopicConfig(addr, topicConfig)//指定broker地址topic的创建
ConsumeStats examineConsumeStats(group)//获取消费状态
consumeStats.getOffsetTable()//可获取队列偏移表

createAndUpdatePlainAccessConfig(addr, plainAccessConfig)//创建或更新acl配置文件,addr为broker地址(请求由broker处理,而非nameserver)
deletePlainAccessConfig(addr, accessKey)//删除acl配置,addr为broker地址

BrokerData

ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
Map<String,BrokerData> brokerDataMap = clusterInfo.getBrokerAddrTable();
/*
   brokerDataMap:<brokerName:BrokerData>
  localhost:BrokerData [brokerName=localhost, brokerAddrs={0=172.16.7.1:10911}]
  localhost:TopicConfig [topicName=localhost, readQueueNums=1, writeQueueNums=1, perm=RWX,
   topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
*/

同一个Broker下可以分为master和slave,一个master,多个slave
brokerId为0代表master(参照MixAll.MASTER_ID),其他brokerId均为slave
同一个brokerName下可以有一个Master和多个Slave,所以brokerAddrs是一个集合
BrokerData brokerData=brokerDataMap.get("localhost");
//brokerId:broker Address
Map<Long, String> brokerAddrs= brokerData.getBrokerAddrs();

TopicConfig

 Map<Long, String> brokerAddrs= brokerData.getBrokerAddrs();
 for(Long brokerId:brokerAddrs.keySet()){
    //172.16.7.1:10911
    String addr = brokerAddrs.get(brokerId);
    //该节点上所有topic配置
    TopicConfigSerializeWrapper configSerializeWrapper = defaultMQAdminExt.
    getAllTopicGroup(addr, 1000L);            
    Map<String, TopicConfig> topicConfigs = configSerializeWrapper.getTopicConfigTable();
    for(String topicName:topicConfigs.keySet()){
      System.out.println(topicName+":"+topicConfigs.get(topicName));
    }
 }
 boot-test:TopicConfig [topicName=boot-test, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
 tc_topic:TopicConfig [topicName=tc_topic, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
 %RETRY%fastboot:TopicConfig [topicName=%RETRY%fastboot, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]


PlainAccessConfig类

权限信息类

PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
plainAccessConfig.setAccessKey(aclAccountModel.getAccessKey());
plainAccessConfig.setSecretKey(aclAccountModel.getSecretKey());
plainAccessConfig.setAdmin(false);//非管理
plainAccessConfig.setDefaultGroupPerm("SUB");//订阅权限
plainAccessConfig.setDefaultTopicPerm("DENY");//无权限
List<String> topicPerms = new ArrayList();
topicPerms.add("RMQ_SYS_TRACE_TOPIC=PUB|SUB");//对该topic的订阅和发送权限
plainAccessConfig.setTopicPerms(topicPerms);
通过mqAdminExt的方法去创建或删除:
createAndUpdatePlainAccessConfig(addr, plainAccessConfig)//创建或更新acl配置文件,addr为broker地址(请求由broker处理,而非nameserver)
deletePlainAccessConfig(addr, accessKey)//删除acl配置,addr为broker地址



DefaultMQProducer类

new DefaultMQProducer("mq-producer",hook,boolean enableMsgTrace, String customizedTraceTopic);
//acl生产端,启用生产轨迹,轨迹发送topic名称
producer = new DefaultMQProducer("mq-producer");
producer.setMaxMessageSize(maxMessageSize);//消息最大大小
producer.setNamesrvAddr(namesrvAddr);//nameSrv地址,多个用;隔开
producer.setInstanceName(applicationName);//实例名称
producer.setCompressMsgBodyOverHowmuch(compressMsgBodyOverHowmuch);//压缩大小
producer.start();

SendResult sendResult = producer.send(msg,selectMessageQueueByHash,key);//消息的发送,按hash队列
producer.send(msg,sendCallback,timeout)//消息发送可传回调

MessageQueueSelector//该接口用于定义发送时的目标队列选择策略,即决定消息发送到哪个queue,有以下实现:
    SelectMessageQueueByRandom
    SelectMessageQueueByHash


DefaultMQPushConsumer类

consumer=new DefaultMQPushConsumer(groupName,rpcHook, new AllocateMessageQueueAveragely(),true, MixAll.RMQ_SYS_TRACE_TOPIC);
//true启用消息跟踪,消息轨迹topic名称
consumer.setNamesrvAddr(namesrvAddr);
consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
consumer.setConsumeThreadMax(consumerCount);
consumer.setConsumeThreadMin(minThread);
consumer.setPullBatchSize(pullBatchSize);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
consumer.setInstanceName(pid+"_"+instanceName);
consumer.subscribe(topic, tags)//订阅topic->tags
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {  
  return ConsumeOrderlyStatus.SUCCESS;//消费成功
  //return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//消费重试
});//顺序消费(局部,若需要全局顺序则需要创建topic时候queue为1)
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {});//并发消费