前面介绍了,nameserver相当于dubbo的注册中心,用与管理broker,broker会在启动的时候注册到nameserver,并且会发送心跳给namaserver,nameserver负责保存活跃的broker,包括master和slave,同时保存topic和topic下的队列,以及filter列表,然后为producer和consumer的请求提供服务。
+启动过程
1 | public static void main(String[] args) { |
入口的代码时这样子,其实主要的逻辑在createNamesrvController和start方法,来看下这两个的实现
+1 | public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { |
这个方法里其实主要是读取一些配置啥的,不是很复杂,
+1 | public static NamesrvController start(final NamesrvController controller) throws Exception { |
这个start里主要关注initialize方法,后面就是一个停机的hook,来看下initialize方法
+1 | public boolean initialize() { |
这里的kvConfigManager主要是来加载NameServer的配置参数,存到org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#configTable中,然后是以BrokerHousekeepingService对象为参数初始化NettyRemotingServer对象,BrokerHousekeepingService对象作为该Netty连接中Socket链接的监听器(ChannelEventListener);监听与Broker建立的渠道的状态(空闲、关闭、异常三个状态),并调用BrokerHousekeepingService的相应onChannel方法。其中渠道的空闲、关闭、异常状态均调用RouteInfoManager.onChannelDestory方法处理。这个BrokerHousekeepingService可以字面化地理解为broker的管家服务,这个类内部三个状态方法其实都是调用的org.apache.rocketmq.namesrv.NamesrvController#getRouteInfoManager方法,而这个RouteInfoManager里面的对象有这些
+1 | public class RouteInfoManager { |
然后接下去就是初始化了一个线程池,然后注册默认的处理类this.registerProcessor();默认都是这个处理器去处理请求 org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#DefaultRequestProcessor然后是初始化两个定时任务
第一是每10秒检查一遍Broker的状态的定时任务,调用scanNotActiveBroker方法;遍历brokerLiveTable集合,查看每个broker的最后更新时间(BrokerLiveInfo.lastUpdateTimestamp)是否超过2分钟,若超过则关闭该broker的渠道并调用RouteInfoManager.onChannelDestory方法清理RouteInfoManager类的topicQueueTable、brokerAddrTable、clusterAddrTable、filterServerTable成员变量。
+1 | this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { |
第二个是每10分钟打印一次NameServer的配置参数。即KVConfigManager.configTable变量的内容。
+1 | this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { |
然后这个初始化就差不多完成了,后面只需要把remotingServer start一下就好了
+处理请求
直接上代码,其实主体是swtich case去判断
+1 |
|
以broker注册为例,
+1 | case RequestCode.REGISTER_BROKER: |
做了个简单的版本管理,我们看下前面一个的代码
+1 | public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request) |
可以看到主要的逻辑还是在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker这个方法里
1 | public RegisterBrokerResult registerBroker( |
这个是注册 broker 的逻辑,再看下根据 topic 获取 broker 信息和 topic 信息,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic 主要是这个方法的逻辑
1 | public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, |
首先调用org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData从org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#topicQueueTable获取到org.apache.rocketmq.common.protocol.route.QueueData这里面存了 brokerName,再通过org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#brokerAddrTable里获取到 broker 的地址信息等,然后再获取 orderMessage 的配置。
简要分析了下 RocketMQ 的 NameServer 的代码,比较粗粒度。
+ +
+
+