public void start() throws MQClientException {
+这个方法里其实主要是读取一些配置啥的,不是很复杂,
+public static NamesrvController start(final NamesrvController controller) throws Exception {
- synchronized (this) {
- switch (this.serviceState) {
- case CREATE_JUST:
- this.serviceState = ServiceState.START_FAILED;
- // If not specified,looking address from name server
- if (null == this.clientConfig.getNamesrvAddr()) {
- this.mQClientAPIImpl.fetchNameServerAddr();
- }
- // Start request-response channel
- // 这里主要是初始化了个网络客户端
- this.mQClientAPIImpl.start();
- // Start various schedule tasks
- // 定时任务
- this.startScheduledTask();
- // Start pull service
- // 这里重点说下
- this.pullMessageService.start();
- // Start rebalance service
- this.rebalanceService.start();
- // Start push service
- this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
- log.info("the client factory [{}] start OK", this.clientId);
- this.serviceState = ServiceState.RUNNING;
- break;
- case START_FAILED:
- throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
- default:
- break;
- }
- }
- }
-我们来看下这个 pullMessageService,org.apache.rocketmq.client.impl.consumer.PullMessageService,
![]()
实现了 runnable 接口,
然后可以看到 run 方法
-public void run() {
- log.info(this.getServiceName() + " service started");
+ if (null == controller) {
+ throw new IllegalArgumentException("NamesrvController is null");
+ }
- while (!this.isStopped()) {
- try {
- PullRequest pullRequest = this.pullRequestQueue.take();
- this.pullMessage(pullRequest);
- } catch (InterruptedException ignored) {
- } catch (Exception e) {
- log.error("Pull Message Service Run Method exception", e);
- }
- }
+ boolean initResult = controller.initialize();
+ if (!initResult) {
+ controller.shutdown();
+ System.exit(-3);
+ }
- log.info(this.getServiceName() + " service end");
- }
-接着在看 pullMessage 方法
-private void pullMessage(final PullRequest pullRequest) {
- final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
- if (consumer != null) {
- DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
- impl.pullMessage(pullRequest);
- } else {
- log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
- }
- }
-实际上调用了这个方法,这个方法很长,我在代码里注释下下每一段的功能
-public void pullMessage(final PullRequest pullRequest) {
- final ProcessQueue processQueue = pullRequest.getProcessQueue();
- // 这里开始就是检查状态,确定是否往下执行
- if (processQueue.isDropped()) {
- log.info("the pull request[{}] is dropped.", pullRequest.toString());
- return;
- }
+ Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ controller.shutdown();
+ return null;
+ }
+ }));
- pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
+ controller.start();
- try {
- this.makeSureStateOK();
- } catch (MQClientException e) {
- log.warn("pullMessage exception, consumer state not ok", e);
- this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
- return;
- }
+ return controller;
+ }
- if (this.isPause()) {
- log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
- return;
- }
+这个start里主要关注initialize方法,后面就是一个停机的hook,来看下initialize方法
+public boolean initialize() {
- // 这块其实是个类似于限流的功能块,对消息数量和消息大小做限制
- long cachedMessageCount = processQueue.getMsgCount().get();
- long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
+ this.kvConfigManager.load();
- if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((queueFlowControlTimes++ % 1000) == 0) {
- log.warn(
- "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
- this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
- }
- return;
- }
+ this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
- if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((queueFlowControlTimes++ % 1000) == 0) {
- log.warn(
- "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
- this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
- }
- return;
- }
+ this.remotingExecutor =
+ Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
- // 若不是顺序消费(即DefaultMQPushConsumerImpl.consumeOrderly等于false),则检查ProcessQueue对象的msgTreeMap:TreeMap<Long,MessageExt>变量的第一个key值与最后一个key值之间的差额,该key值表示查询的队列偏移量queueoffset;若差额大于阈值(由DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,默认是2000),则调用PullMessageService.executePullRequestLater方法,在50毫秒之后重新将该PullRequest请求放入PullMessageService.pullRequestQueue队列中;并跳出该方法;这里的意思主要就是消息有堆积了,等会再来拉取
- if (!this.consumeOrderly) {
- if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
- log.warn(
- "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
- processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
- pullRequest, queueMaxSpanFlowControlTimes);
- }
- return;
- }
- } else {
- if (processQueue.isLocked()) {
- if (!pullRequest.isLockedFirst()) {
- final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
- boolean brokerBusy = offset < pullRequest.getNextOffset();
- log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
- pullRequest, offset, brokerBusy);
- if (brokerBusy) {
- log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
- pullRequest, offset);
- }
+ this.registerProcessor();
- pullRequest.setLockedFirst(true);
- pullRequest.setNextOffset(offset);
- }
- } else {
- this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
- log.info("pull message later because not locked in broker, {}", pullRequest);
- return;
- }
- }
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- // 以PullRequest.messageQueue对象的topic值为参数从RebalanceImpl.subscriptionInner: ConcurrentHashMap, SubscriptionData>中获取对应的SubscriptionData对象,若该对象为null,考虑到并发的关系,调用executePullRequestLater方法,稍后重试;并跳出该方法;
- final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
- if (null == subscriptionData) {
- this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
- log.warn("find the consumer's subscription failed, {}", pullRequest);
- return;
- }
+ @Override
+ public void run() {
+ NamesrvController.this.routeInfoManager.scanNotActiveBroker();
+ }
+ }, 5, 10, TimeUnit.SECONDS);
- final long beginTimestamp = System.currentTimeMillis();
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- // 异步拉取回调,先不讨论细节
- PullCallback pullCallback = new PullCallback() {
- @Override
- public void onSuccess(PullResult pullResult) {
- if (pullResult != null) {
- pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
- subscriptionData);
+ @Override
+ public void run() {
+ NamesrvController.this.kvConfigManager.printAllPeriodically();
+ }
+ }, 1, 10, TimeUnit.MINUTES);
- switch (pullResult.getPullStatus()) {
- case FOUND:
- long prevRequestOffset = pullRequest.getNextOffset();
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- long pullRT = System.currentTimeMillis() - beginTimestamp;
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullRT);
+ if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
+
+ try {
+ fileWatchService = new FileWatchService(
+ new String[] {
+ TlsSystemConfig.tlsServerCertPath,
+ TlsSystemConfig.tlsServerKeyPath,
+ TlsSystemConfig.tlsServerTrustCertPath
+ },
+ new FileWatchService.Listener() {
+ boolean certChanged, keyChanged = false;
+ @Override
+ public void onChanged(String path) {
+ if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
+ log.info("The trust certificate changed, reload the ssl context");
+ reloadServerSslContext();
+ }
+ if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+ certChanged = true;
+ }
+ if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+ keyChanged = true;
+ }
+ if (certChanged && keyChanged) {
+ log.info("The certificate and private key changed, reload the ssl context");
+ certChanged = keyChanged = false;
+ reloadServerSslContext();
+ }
+ }
+ private void reloadServerSslContext() {
+ ((NettyRemotingServer) remotingServer).loadSslContext();
+ }
+ });
+ } catch (Exception e) {
+ log.warn("FileWatchService created error, can't load the certificate dynamically");
+ }
+ }
- long firstMsgOffset = Long.MAX_VALUE;
- if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- } else {
- firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
+ return true;
+ }
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
+这里的kvConfigManager主要是来加载NameServer的配置参数,存到org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#configTable中,然后是以BrokerHousekeepingService对象为参数初始化NettyRemotingServer对象,BrokerHousekeepingService对象作为该Netty连接中Socket链接的监听器(ChannelEventListener);监听与Broker建立的渠道的状态(空闲、关闭、异常三个状态),并调用BrokerHousekeepingService的相应onChannel方法。其中渠道的空闲、关闭、异常状态均调用RouteInfoManager.onChannelDestory方法处理。这个BrokerHousekeepingService可以字面化地理解为broker的管家服务,这个类内部三个状态方法其实都是调用的org.apache.rocketmq.namesrv.NamesrvController#getRouteInfoManager方法,而这个RouteInfoManager里面的对象有这些
+public class RouteInfoManager {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+ private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private final HashMap<String, List<QueueData>> topicQueueTable;
+
+ private final HashMap<String, BrokerData> brokerAddrTable;
+
+ private final HashMap<String, Set<String>> clusterAddrTable;
+
+ private final HashMap<String, BrokerLiveInfo> brokerLiveTable;
+
+ private final HashMap<String, List<String>> filterServerTable;
- boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
- pullResult.getMsgFoundList(),
- processQueue,
- pullRequest.getMessageQueue(),
- dispatchToConsume);
+然后接下去就是初始化了一个线程池,然后注册默认的处理类this.registerProcessor();默认都是这个处理器去处理请求 org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#DefaultRequestProcessor然后是初始化两个定时任务
+第一是每10秒检查一遍Broker的状态的定时任务,调用scanNotActiveBroker方法;遍历brokerLiveTable集合,查看每个broker的最后更新时间(BrokerLiveInfo.lastUpdateTimestamp)是否超过2分钟,若超过则关闭该broker的渠道并调用RouteInfoManager.onChannelDestory方法清理RouteInfoManager类的topicQueueTable、brokerAddrTable、clusterAddrTable、filterServerTable成员变量。
+this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
- DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
- } else {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- }
- }
+ @Override
+ public void run() {
+ NamesrvController.this.routeInfoManager.scanNotActiveBroker();
+ }
+ }, 5, 10, TimeUnit.SECONDS);
+public void scanNotActiveBroker() {
+ Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, BrokerLiveInfo> next = it.next();
+ long last = next.getValue().getLastUpdateTimestamp();
+ if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
+ RemotingUtil.closeChannel(next.getValue().getChannel());
+ it.remove();
+ log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
+ this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
+ }
+ }
+ }
- if (pullResult.getNextBeginOffset() < prevRequestOffset
- || firstMsgOffset < prevRequestOffset) {
- log.warn(
- "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
- pullResult.getNextBeginOffset(),
- firstMsgOffset,
- prevRequestOffset);
- }
+ public void onChannelDestroy(String remoteAddr, Channel channel) {
+ String brokerAddrFound = null;
+ if (channel != null) {
+ try {
+ try {
+ this.lock.readLock().lockInterruptibly();
+ Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
+ this.brokerLiveTable.entrySet().iterator();
+ while (itBrokerLiveTable.hasNext()) {
+ Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
+ if (entry.getValue().getChannel() == channel) {
+ brokerAddrFound = entry.getKey();
+ break;
+ }
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ } catch (Exception e) {
+ log.error("onChannelDestroy Exception", e);
+ }
+ }
- break;
- case NO_NEW_MSG:
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+ if (null == brokerAddrFound) {
+ brokerAddrFound = remoteAddr;
+ } else {
+ log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
+ }
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
+ if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- break;
- case NO_MATCHED_MSG:
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+ try {
+ try {
+ this.lock.writeLock().lockInterruptibly();
+ this.brokerLiveTable.remove(brokerAddrFound);
+ this.filterServerTable.remove(brokerAddrFound);
+ String brokerNameFound = null;
+ boolean removeBrokerName = false;
+ Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
+ this.brokerAddrTable.entrySet().iterator();
+ while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
+ BrokerData brokerData = itBrokerAddrTable.next().getValue();
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
+ Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Long, String> entry = it.next();
+ Long brokerId = entry.getKey();
+ String brokerAddr = entry.getValue();
+ if (brokerAddr.equals(brokerAddrFound)) {
+ brokerNameFound = brokerData.getBrokerName();
+ it.remove();
+ log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
+ brokerId, brokerAddr);
+ break;
+ }
+ }
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- break;
- case OFFSET_ILLEGAL:
- log.warn("the pull request offset illegal, {} {}",
- pullRequest.toString(), pullResult.toString());
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+ if (brokerData.getBrokerAddrs().isEmpty()) {
+ removeBrokerName = true;
+ itBrokerAddrTable.remove();
+ log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
+ brokerData.getBrokerName());
+ }
+ }
- pullRequest.getProcessQueue().setDropped(true);
- DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
+ if (brokerNameFound != null && removeBrokerName) {
+ Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, Set<String>> entry = it.next();
+ String clusterName = entry.getKey();
+ Set<String> brokerNames = entry.getValue();
+ boolean removed = brokerNames.remove(brokerNameFound);
+ if (removed) {
+ log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
+ brokerNameFound, clusterName);
- @Override
- public void run() {
- try {
- DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
- pullRequest.getNextOffset(), false);
+ if (brokerNames.isEmpty()) {
+ log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
+ clusterName);
+ it.remove();
+ }
- DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
+ break;
+ }
+ }
+ }
- DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
+ if (removeBrokerName) {
+ Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
+ this.topicQueueTable.entrySet().iterator();
+ while (itTopicQueueTable.hasNext()) {
+ Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
+ String topic = entry.getKey();
+ List<QueueData> queueDataList = entry.getValue();
- log.warn("fix the pull request offset, {}", pullRequest);
- } catch (Throwable e) {
- log.error("executeTaskLater Exception", e);
- }
- }
- }, 10000);
- break;
- default:
- break;
- }
- }
- }
+ Iterator<QueueData> itQueueData = queueDataList.iterator();
+ while (itQueueData.hasNext()) {
+ QueueData queueData = itQueueData.next();
+ if (queueData.getBrokerName().equals(brokerNameFound)) {
+ itQueueData.remove();
+ log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
+ topic, queueData);
+ }
+ }
- @Override
- public void onException(Throwable e) {
- if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("execute the pull request exception", e);
- }
+ if (queueDataList.isEmpty()) {
+ itTopicQueueTable.remove();
+ log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
+ topic);
+ }
+ }
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ } catch (Exception e) {
+ log.error("onChannelDestroy Exception", e);
+ }
+ }
+ }
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
- }
- };
- // 如果为集群模式,即可置commitOffsetEnable为 true
- boolean commitOffsetEnable = false;
- long commitOffsetValue = 0L;
- if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
- commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
- if (commitOffsetValue > 0) {
- commitOffsetEnable = true;
- }
- }
+第二个是每10分钟打印一次NameServer的配置参数。即KVConfigManager.configTable变量的内容。
+this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- // 将上面获得的commitOffsetEnable更新到订阅关系里
- String subExpression = null;
- boolean classFilter = false;
- SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
- if (sd != null) {
- if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
- subExpression = sd.getSubString();
- }
+ @Override
+ public void run() {
+ NamesrvController.this.kvConfigManager.printAllPeriodically();
+ }
+ }, 1, 10, TimeUnit.MINUTES);
- classFilter = sd.isClassFilterMode();
- }
+然后这个初始化就差不多完成了,后面只需要把remotingServer start一下就好了
+处理请求
直接上代码,其实主体是swtich case去判断
+@Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
- // 组成 sysFlag
- int sysFlag = PullSysFlag.buildSysFlag(
- commitOffsetEnable, // commitOffset
- true, // suspend
- subExpression != null, // subscription
- classFilter // class filter
- );
- // 调用真正的拉取消息接口
- try {
- this.pullAPIWrapper.pullKernelImpl(
- pullRequest.getMessageQueue(),
- subExpression,
- subscriptionData.getExpressionType(),
- subscriptionData.getSubVersion(),
- pullRequest.getNextOffset(),
- this.defaultMQPushConsumer.getPullBatchSize(),
- sysFlag,
- commitOffsetValue,
- BROKER_SUSPEND_MAX_TIME_MILLIS,
- CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
- CommunicationMode.ASYNC,
- pullCallback
- );
- } catch (Exception e) {
- log.error("pullKernelImpl exception", e);
- this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
- }
- }
-以下就是拉取消息的底层 api,不够不是特别复杂,主要是在找 broker,和设置请求参数
-public PullResult pullKernelImpl(
- final MessageQueue mq,
- final String subExpression,
- final String expressionType,
- final long subVersion,
- final long offset,
- final int maxNums,
- final int sysFlag,
- final long commitOffset,
- final long brokerSuspendMaxTimeMillis,
- final long timeoutMillis,
- final CommunicationMode communicationMode,
- final PullCallback pullCallback
-) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- FindBrokerResult findBrokerResult =
- this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
- this.recalculatePullFromWhichNode(mq), false);
- if (null == findBrokerResult) {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult =
- this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
- this.recalculatePullFromWhichNode(mq), false);
- }
+ if (ctx != null) {
+ log.debug("receive request, {} {} {}",
+ request.getCode(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ request);
+ }
- if (findBrokerResult != null) {
- {
- // check version
- if (!ExpressionType.isTagType(expressionType)
- && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
- throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
- + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
- }
- }
- int sysFlagInner = sysFlag;
- if (findBrokerResult.isSlave()) {
- sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
- }
+ switch (request.getCode()) {
+ case RequestCode.PUT_KV_CONFIG:
+ return this.putKVConfig(ctx, request);
+ case RequestCode.GET_KV_CONFIG:
+ return this.getKVConfig(ctx, request);
+ case RequestCode.DELETE_KV_CONFIG:
+ return this.deleteKVConfig(ctx, request);
+ case RequestCode.QUERY_DATA_VERSION:
+ return queryBrokerTopicConfig(ctx, request);
+ case RequestCode.REGISTER_BROKER:
+ Version brokerVersion = MQVersion.value2Version(request.getVersion());
+ if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
+ return this.registerBrokerWithFilterServer(ctx, request);
+ } else {
+ return this.registerBroker(ctx, request);
+ }
+ case RequestCode.UNREGISTER_BROKER:
+ return this.unregisterBroker(ctx, request);
+ case RequestCode.GET_ROUTEINTO_BY_TOPIC:
+ return this.getRouteInfoByTopic(ctx, request);
+ case RequestCode.GET_BROKER_CLUSTER_INFO:
+ return this.getBrokerClusterInfo(ctx, request);
+ case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
+ return this.wipeWritePermOfBroker(ctx, request);
+ case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
+ return getAllTopicListFromNameserver(ctx, request);
+ case RequestCode.DELETE_TOPIC_IN_NAMESRV:
+ return deleteTopicInNamesrv(ctx, request);
+ case RequestCode.GET_KVLIST_BY_NAMESPACE:
+ return this.getKVListByNamespace(ctx, request);
+ case RequestCode.GET_TOPICS_BY_CLUSTER:
+ return this.getTopicsByCluster(ctx, request);
+ case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
+ return this.getSystemTopicListFromNs(ctx, request);
+ case RequestCode.GET_UNIT_TOPIC_LIST:
+ return this.getUnitTopicList(ctx, request);
+ case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
+ return this.getHasUnitSubTopicList(ctx, request);
+ case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
+ return this.getHasUnitSubUnUnitTopicList(ctx, request);
+ case RequestCode.UPDATE_NAMESRV_CONFIG:
+ return this.updateConfig(ctx, request);
+ case RequestCode.GET_NAMESRV_CONFIG:
+ return this.getConfig(ctx, request);
+ default:
+ break;
+ }
+ return null;
+ }
- PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
- requestHeader.setConsumerGroup(this.consumerGroup);
- requestHeader.setTopic(mq.getTopic());
- requestHeader.setQueueId(mq.getQueueId());
- requestHeader.setQueueOffset(offset);
- requestHeader.setMaxMsgNums(maxNums);
- requestHeader.setSysFlag(sysFlagInner);
- requestHeader.setCommitOffset(commitOffset);
- requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
- requestHeader.setSubscription(subExpression);
- requestHeader.setSubVersion(subVersion);
- requestHeader.setExpressionType(expressionType);
+以broker注册为例,
+case RequestCode.REGISTER_BROKER:
+ Version brokerVersion = MQVersion.value2Version(request.getVersion());
+ if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
+ return this.registerBrokerWithFilterServer(ctx, request);
+ } else {
+ return this.registerBroker(ctx, request);
+ }
- String brokerAddr = findBrokerResult.getBrokerAddr();
- if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
- brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
- }
+做了个简单的版本管理,我们看下前面一个的代码
+public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
+ throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+ final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
+ final RegisterBrokerRequestHeader requestHeader =
+ (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
- PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
- brokerAddr,
- requestHeader,
- timeoutMillis,
- communicationMode,
- pullCallback);
+ if (!checksum(ctx, request, requestHeader)) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("crc32 not match");
+ return response;
+ }
- return pullResult;
- }
+ RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
-}
-再看下一步的
-public PullResult pullMessage(
- final String addr,
- final PullMessageRequestHeader requestHeader,
- final long timeoutMillis,
- final CommunicationMode communicationMode,
- final PullCallback pullCallback
-) throws RemotingException, MQBrokerException, InterruptedException {
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+ if (request.getBody() != null) {
+ try {
+ registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
+ } catch (Exception e) {
+ throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
+ }
+ } else {
+ registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
+ registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
+ }
- switch (communicationMode) {
- case ONEWAY:
- assert false;
- return null;
- case ASYNC:
- this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
- return null;
- case SYNC:
- return this.pullMessageSync(addr, request, timeoutMillis);
- default:
- assert false;
- break;
- }
+ RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
+ requestHeader.getClusterName(),
+ requestHeader.getBrokerAddr(),
+ requestHeader.getBrokerName(),
+ requestHeader.getBrokerId(),
+ requestHeader.getHaServerAddr(),
+ registerBrokerBody.getTopicConfigSerializeWrapper(),
+ registerBrokerBody.getFilterServerList(),
+ ctx.channel());
- return null;
-}
-通过 communicationMode 判断是同步拉取还是异步拉取,异步就调用
-private void pullMessageAsync(
- final String addr,
- final RemotingCommand request,
- final long timeoutMillis,
- final PullCallback pullCallback
- ) throws RemotingException, InterruptedException {
- this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- 异步
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- try {
- PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
- assert pullResult != null;
- pullCallback.onSuccess(pullResult);
- } catch (Exception e) {
- pullCallback.onException(e);
- }
- } else {
- if (!responseFuture.isSendRequestOK()) {
- pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
- } else if (responseFuture.isTimeout()) {
- pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
- responseFuture.getCause()));
- } else {
- pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
- }
- }
- }
- });
- }
-并且会调用前面 pullCallback 的onSuccess和onException方法,同步的就是调用
-private PullResult pullMessageSync(
- final String addr,
- final RemotingCommand request,
- final long timeoutMillis
- ) throws RemotingException, InterruptedException, MQBrokerException {
- RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
- assert response != null;
- return this.processPullResponse(response);
- }
-然后就是这个 remotingClient 的 invokeAsync 跟 invokeSync 方法
-@Override
- public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
- throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
- RemotingSendRequestException {
- long beginStartTime = System.currentTimeMillis();
- final Channel channel = this.getAndCreateChannel(addr);
- if (channel != null && channel.isActive()) {
- try {
- doBeforeRpcHooks(addr, request);
- long costTime = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTime) {
- throw new RemotingTooMuchRequestException("invokeAsync call timeout");
- }
- this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
- } catch (RemotingSendRequestException e) {
- log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
- this.closeChannel(addr, channel);
- throw e;
- }
- } else {
- this.closeChannel(addr, channel);
- throw new RemotingConnectException(addr);
- }
- }
-@Override
- public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
- throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
- long beginStartTime = System.currentTimeMillis();
- final Channel channel = this.getAndCreateChannel(addr);
- if (channel != null && channel.isActive()) {
- try {
- doBeforeRpcHooks(addr, request);
- long costTime = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTime) {
- throw new RemotingTimeoutException("invokeSync call timeout");
- }
- RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
- doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
- return response;
- } catch (RemotingSendRequestException e) {
- log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
- this.closeChannel(addr, channel);
- throw e;
- } catch (RemotingTimeoutException e) {
- if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
- this.closeChannel(addr, channel);
- log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
- }
- log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
- throw e;
- }
- } else {
- this.closeChannel(addr, channel);
- throw new RemotingConnectException(addr);
- }
- }
-再往下看
-public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
- final long timeoutMillis)
- throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
- final int opaque = request.getOpaque();
-
- try {
- 同步跟异步都是会把结果用ResponseFuture抱起来
- final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
- this.responseTable.put(opaque, responseFuture);
- final SocketAddress addr = channel.remoteAddress();
- channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws Exception {
- if (f.isSuccess()) {
- responseFuture.setSendRequestOK(true);
- return;
- } else {
- responseFuture.setSendRequestOK(false);
- }
-
- responseTable.remove(opaque);
- responseFuture.setCause(f.cause());
- responseFuture.putResponse(null);
- log.warn("send a request command to channel <" + addr + "> failed.");
- }
- });
- // 区别是同步的是在这等待
- RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
- if (null == responseCommand) {
- if (responseFuture.isSendRequestOK()) {
- throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
- responseFuture.getCause());
- } else {
- throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
- }
- }
+ responseHeader.setHaServerAddr(result.getHaServerAddr());
+ responseHeader.setMasterAddr(result.getMasterAddr());
- return responseCommand;
- } finally {
- this.responseTable.remove(opaque);
- }
- }
+ byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
+ response.setBody(jsonValue);
- public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
- final InvokeCallback invokeCallback)
- throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
- long beginStartTime = System.currentTimeMillis();
- final int opaque = request.getOpaque();
- boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
- if (acquired) {
- final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
- long costTime = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTime) {
- once.release();
- throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
- }
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+}
- final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
- this.responseTable.put(opaque, responseFuture);
- try {
- channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws Exception {
- if (f.isSuccess()) {
- responseFuture.setSendRequestOK(true);
- return;
- }
- requestFail(opaque);
- log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
- }
- });
- } catch (Exception e) {
- responseFuture.release();
- log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
- throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
- }
- } else {
- if (timeoutMillis <= 0) {
- throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
- } else {
- String info =
- String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
- timeoutMillis,
- this.semaphoreAsync.getQueueLength(),
- this.semaphoreAsync.availablePermits()
- );
- log.warn(info);
- throw new RemotingTimeoutException(info);
- }
- }
- }