broker 的启动形式有点类似于 NameServer,都是服务类型的,跟 Consumer 差别比较大,
+首先是org.apache.rocketmq.broker.BrokerStartup中的 main 函数,org.apache.rocketmq.broker.BrokerStartup#createBrokerController基本就是读取参数,这里差点把最核心的初始化给漏了,
+1 | final BrokerController controller = new BrokerController( |
前面是以 broker 配置,netty 的服务端和客户端配置,以及消息存储配置在实例化 BrokerController,然后就是初始化了
+1 | public boolean initialize() throws CloneNotSupportedException { |
前面这些就是各个配置的 load 了,然后是个我认为比较重要的部分messageStore 的实例化,
+1 | if (result) { |
先是实例化,实例化构造函数里的代码比较重要,重点看一下
+1 | public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, |
这里面有很多类,不过先把从构造函数里传进来的忽略下,接下来就是 AllocateMappedFileService 这个service,前面看过文章的可能会根据上面的代码猜到,这也是个 ServiceThread,如果是对RocketMQ 有所了解的可能从名字可以看出这个类是关于 RocketMQ 消息怎么落盘的,当需要创建MappedFile时(在MapedFileQueue.getLastMapedFile方法中),向该线程的requestQueue队列中放入AllocateRequest请求对象,该线程会在后台监听该队列,并在后台创建MapedFile对象,即同时创建了物理文件。然后是创建了 IndexService 服务线程,用来给创建索引;还有是FlushConsumeQueueService是将ConsumeQueue 刷入磁盘;CleanCommitLogService用来清理过期的 CommitLog,默认是 72 小时以上;CleanConsumeQueueService是将小于最新的 CommitLog 偏移量的 ConsumeQueue 清理掉;StoreStatsService是储存统计服务;HAService用于CommitLog 的主备同步;ScheduleMessageService用于定时消息;还有就是这个ReputMessageService非常重要,就是由它实现了将 CommitLog 以 topic+queue 纬度构建 ConsumeQueue,后面TransientStorePool是异步刷盘时的存储buffer,也可以从后面的判断中看出来
1 | public boolean isTransientStorePoolEnable() { |
再然后就是启动两个服务线程,dispatcherList是为CommitLog文件转发请求,差不多这个初始化就这些内容。
+然后回到外层,下面是主备切换的配置,然后是数据统计,接着是存储插件加载,然后是往转发器链表里再加一个过滤器
+1 | if (messageStoreConfig.isEnableDLegerCommitLog()) { |
接下来就是org.apache.rocketmq.store.MessageStore#load的过程了,
+1) 调用ScheduleMessageService.load方法,初始化延迟级别列表。将这些级别(”1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”)的延时存入延迟级别delayLevelTable:ConcurrentHashMap<Integer /* level /, Long/ delay timeMillis */>变量中,例如1s的kv值为1:1000,5s的kv值为2:5000,key值依次类推;每个延迟级别即为一个队列。
+2)调用CommitLog.load方法,在此方法中调用MapedFileQueue.load方法,将$HOME /store/commitlog目录下的所有文件加载到MapedFileQueue的List
3)调用DefaultMessageStore.loadConsumeQueue方法加载consumequeue文件数据到DefaultMessageStore.consumeQueueTable集合中。
+初始化StoreCheckPoint对象,加载$HOME/store/checkpoint文件,该文件记录三个字段值,分别是物理队列消息时间戳、逻辑队列消息时间戳、索引队列消息时间戳。
+调用IndexService.load方法加载$HOME/store/index目录下的文件。对该目录下的每个文件初始化一个IndexFile对象。然后调用IndexFile对象的load方法将IndexHeader加载到对象的变量中;再根据检查是否存在abort文件,若有存在abort文件,则表示Broker表示上次是异常退出的,则检查checkpoint的indexMsgTimestamp字段值是否小于IndexHeader的endTimestamp值,indexMsgTimestamp值表示最后刷盘的时间,若小于则表示在最后刷盘之后在该文件中还创建了索引,则要删除该Index文件,否则将该IndexFile对象放入indexFileList:ArrayList
然后调用org.apache.rocketmq.store.DefaultMessageStore#recover恢复,前面有根据boolean lastExitOK = !this.isTempFileExist();临时文件是否存在来判断上一次是否正常退出,根据这个状态来选择什么恢复策略
接下去是初始化 Netty 服务端,初始化发送消息线程池(sendMessageExecutor)、拉取消息线程池(pullMessageExecutor)、管理Broker线程池(adminBrokerExecutor)、客户端管理线程池(clientManageExecutor),注册事件处理器,包括发送消息事件处理器(SendMessageProcessor)、拉取消息事件处理器、查询消息事件处理器(QueryMessageProcessor,包括客户端的心跳事件、注销事件、获取消费者列表事件、更新更新和查询消费进度consumerOffset)、客户端管理事件处理器(ClientManageProcessor)、结束事务处理器(EndTransactionProcessor)、默认事件处理器(AdminBrokerProcessor),然后是定时任务
+BrokerController.this.getBrokerStats().record(); 记录 Broker 状态
BrokerController.this.consumerOffsetManager.persist(); 持久化consumerOffset
BrokerController.this.consumerFilterManager.persist();持久化consumerFilter
BrokerController.this.protectBroker(); 保护 broker,消费慢,不让继续投递
BrokerController.this.printWaterMark(); 打印水位
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); 检查落后程度
BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); 定时获取 nameserver
BrokerController.this.printMasterAndSlaveDiff(); 打印主从不一致
然后是 tsl,初始化事务消息,初始化 RPCHook
+请把害怕打到公屏上🤦♂️,从线程池名字和调用的方法应该可以看出大部分的用途
+1 | this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); |
Broker 启动过程
+贴代码
+1 | public void start() throws Exception { |
首先是启动messageStore,调用 start 方法,这里面又调用了一些代码
+1 | public void start() throws Exception { |
调用DefaultMessageStore.start方法启动DefaultMessageStore对象中的一些服务线程。
+-
+
- 启动ReputMessageService服务线程 +
- 启动FlushConsumeQueueService服务线程; +
- 调用CommitLog.start方法,启动CommitLog对象中的FlushCommitLogService线程服务,若是同步刷盘(SYNC_FLUSH)则是启动GroupCommitService线程服务;若是异步刷盘(ASYNC_FLUSH)则是启动FlushRealTimeService线程服务; +
- 启动StoreStatsService线程服务; +
- 启动定时清理任务 +
然后是启动ClientHousekeepingService的 netty 服务端和客户端,然后是启动fileWatchService证书服务,接着启动BrokerOuterAPI中的NettyRemotingClient,即建立与NameServer的链接,用于自身Broker与其他模块的RPC功能调用;包括获取NameServer的地址、注册Broker、注销Broker、获取Topic配置、获取消息进度信息、获取订阅关系等RPC功能,然后是PullRequestHoldService服务线程,这个就是实现长轮询的,然后启动管家ClientHousekeepingService服务,负责扫描不活跃的生产者,消费者和 filter,启动FilterServerManager 过滤器服务管理,然后启动定时任务调用org.apache.rocketmq.broker.BrokerController#registerBrokerAll向所有 nameserver 注册 broker,最后是按需开启org.apache.rocketmq.store.stats.BrokerStatsManager和org.apache.rocketmq.broker.latency.BrokerFastFailure,基本上启动过程就完成了
+ +
+
+