RocketMQ原理详解

rocketMQ的broker模块

broker是消息接收处理,落地的核心模块。这个模块用于接收producer发送的消息以及consumer

一 、broker启动

broker的启动过程是一个比较复杂的过程,其中涉及到很多broker模块自身的初始化,例如:路由信息设置,消息落地,消息推送,以及和namesvr模块通信部分的初始化。

broker启动可以分为两个步骤,initialize和start

initialize

1 当broker启动命令得到外部传入的初始化参数以后,将参数注入对应的config类当中,这些config类包括:

  • broker自身的配置:包括根目录,namesrv地址,broker的IP和名称,消息队列数,收发消息线程池数等参数
  • netty启动配置:包括netty监听端口,工作线程数,异步发送消息信号量数量等网络配置等参数
  • 存储层配置:包括存储跟目录,CommitLog配置,持久化策略配置等参数

2 当配置信息设置完毕后,broker会将这些参数传入borkerController控制器当中,这个控制器会初始加载很多的管理器,如下:

  • topicManager:用于管理broker中存储的所有topic的配置
  • consumerOffsetManager:管理Consumer的消费进度
  • subscriptionGroupManager:用来管理订阅组,包括订阅权限等
  • messageStore:用于broker层的消息落地存储

3 当上述的管理器全部加载完成以后,控制器将开始进入下一步的初始化:

  • 启动netty服务端,用于接收消息
  • 初始化多个线程池,包括:sendMessageExecutor、pullMessageExecutor、adminBrokerExecutor、clientManageExecutor,分别用于发送消息、拉取消息、broker管理器执行、客户端管理器执行,这些处理过程会放入此线程池中处理,来做并发处理
  • 将上述的线程池注册到netty消息处理器当中
  • 启动定时调度线程来做很多事情,包括:一天上报一次broker的所有状态,10秒持久化一次Consumer消费进度,60分钟清理一次无效的topic订阅信息,60秒获取一次namesrv的地址信息

上述的全部过程中,broker做到比较关键的一些初始化是:启动netty服务端,注册多个消息处理器,初始化线程池用来做消息的并发处理

start

当broker初始化了配置参数以后,就可以开始启动部分了,启动的部分会简单一些:

  • 启动刚刚初始化的各个管理器:topicManager,consumerOffsetManager,subscriptionGroupManager,messageStore
  • 开启定时调度线程30秒向namesrv不断上报自己的信息
  • 启动线程处理无用的topic

broker启动整体过程可以参考下面的时序图:

二、消息的接收

当producer向namesrv发送消息了以后,namesrv会将消息转递给对应的broker进行消息的持久化(也叫做消息的落地),在这个过程中,broker又做了什么样的事情?

接到消息时

在刚刚上面的broker启动部分当中提到,broker会在初始化的时候注册多个消息处理器,那么这些处理在nettyServer收到消息的时候将会把消息交由对应的处理器进行处理。所以,当消息到达时,上面注册的消息处理器将会做出如下处理:

1 消息持久化过程

  • 消息处理器:消息到达SendMessageProcessor时,需要为消息随即分配队列,然后自己生成一个内部消息(msgInner),这个消息加入了一些broker内部的信息,例如:消息队列,消息内部标示,channel连接地址等。然后拿到broker的持久化模块,进行持久化。
  • 消息持久化准备:processor将生成的内部消息交给commitLog时,生成消息的存储时间以及CRC消息体。拿到上一个存储消息的文件持久化工具,对文件准备进行持久化。
  • 消息持久化:将刚刚的消息根据topic和消息队列号(queueId)拿到队列的偏移量,计算整个消息需要的存储长度,如果合规则写入缓存,然后写入文件中。
  • 分发处理:在文件写入后将生成分发请求,有分发线程开始处理,将消息写入consumeQueue中
  • 刷盘请求:根据消息的刷盘策略进行消息刷盘

2 消息格式

长度 说明
4 消息的长度
4 四个字节的MAGICCODE = daa320a7
4 消息体BODY CRC 当broker重启recover时会校验
4 消息queueId
4 这个标志值rocketmq不做处理,只存储后透传
8 /QUEUEOFFSET这个值是个自增值不是真正的consume queue的偏移量,可以代表这个队列中消息的个数,要通过这个值查找到consume queue中数据,QUEUEOFFSET * 20才是偏移地址
8 PHYSICALOFFSET 代表消息在commitLog中的物理起始地址偏移量
4 SYSFLAG消息标志,指明消息是事物事物状态等等消息特征
8 BORNTIMESTAMP 消息产生端(producer)的时间戳
8 BORNHOST 消息产生端(producer)地址(address:port)
8 STORETIMESTAMP 消息在broker存储时间
8 STOREHOSTADDRESS 消息存储到broker的地址(address:port)
4 /RECONSUMETIMES消息被某个订阅组重新消费了几次(订阅组之间独立计数),因为重试消息发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了
8 Prepared Transaction Offset 表示是prepared状态的事物消息
4 + bodyLength 消息体的长度
1 + topicLength topic的长度
2 + propertiesLength 消息属性的长度

3 存储消息

  • 在commitLog中维护一个名为mapedFileQueue的文件映射队列,这个队列记录了所有的记录消息的映射文件。
  • 每次在存储消息的时候拿到最近使用的映射文件,如果文件满了会创建一个新的,并加入队列,映射文件的大小默认为1G。
  • MapedFileQueue在获取getLastMapedFile时,如果需要创建新的MapedFile会计算出下一个MapedFile文件地址,通过预分配服务AllocateMapedFileService异步预创建下一个MapedFile文件,这样下次创建新文件请求就不要等待,因为创建文件特别是一个1G的文件还是有点耗时的

4 刷盘策略

刷盘分为同步和异步,