broker是消息接收处理,落地的核心模块。这个模块用于接收producer发送的消息以及consumer
broker的启动过程是一个比较复杂的过程,其中涉及到很多broker模块自身的初始化,例如:路由信息设置,消息落地,消息推送,以及和namesvr模块通信部分的初始化。
broker启动可以分为两个步骤,initialize和start
1 当broker启动命令得到外部传入的初始化参数以后,将参数注入对应的config类当中,这些config类包括:
2 当配置信息设置完毕后,broker会将这些参数传入borkerController控制器当中,这个控制器会初始加载很多的管理器,如下:
3 当上述的管理器全部加载完成以后,控制器将开始进入下一步的初始化:
上述的全部过程中,broker做到比较关键的一些初始化是:启动netty服务端,注册多个消息处理器,初始化线程池用来做消息的并发处理
当broker初始化了配置参数以后,就可以开始启动部分了,启动的部分会简单一些:
broker启动整体过程可以参考下面的时序图:
当producer向namesrv发送消息了以后,namesrv会将消息转递给对应的broker进行消息的持久化(也叫做消息的落地),在这个过程中,broker又做了什么样的事情?
在刚刚上面的broker启动部分当中提到,broker会在初始化的时候注册多个消息处理器,那么这些处理在nettyServer收到消息的时候将会把消息交由对应的处理器进行处理。所以,当消息到达时,上面注册的消息处理器将会做出如下处理:
1 消息持久化过程
2 消息格式
长度 | 说明 |
---|---|
4 | 消息的长度 |
4 | 四个字节的MAGICCODE = daa320a7 |
4 | 消息体BODY CRC 当broker重启recover时会校验 |
4 | 消息queueId |
4 | 这个标志值rocketmq不做处理,只存储后透传 |
8 | /QUEUEOFFSET这个值是个自增值不是真正的consume queue的偏移量,可以代表这个队列中消息的个数,要通过这个值查找到consume queue中数据,QUEUEOFFSET * 20才是偏移地址 |
8 | PHYSICALOFFSET 代表消息在commitLog中的物理起始地址偏移量 |
4 | SYSFLAG消息标志,指明消息是事物事物状态等等消息特征 |
8 | BORNTIMESTAMP 消息产生端(producer)的时间戳 |
8 | BORNHOST 消息产生端(producer)地址(address:port) |
8 | STORETIMESTAMP 消息在broker存储时间 |
8 | STOREHOSTADDRESS 消息存储到broker的地址(address:port) |
4 | /RECONSUMETIMES消息被某个订阅组重新消费了几次(订阅组之间独立计数),因为重试消息发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了 |
8 | Prepared Transaction Offset 表示是prepared状态的事物消息 |
4 + bodyLength | 消息体的长度 |
1 + topicLength | topic的长度 |
2 + propertiesLength | 消息属性的长度 |
3 存储消息
4 刷盘策略
刷盘分为同步和异步,