RocketMQ原理详解

rocketMQ通信模块

Rocketmq的通信层是基于通信框架netty 4.0.21.Final之上做了简单的协议封装。不过在目前的版本中(v3.2.6),netty不再是强耦合与通信模块,基本的类图如下:

通信模块启动时做了什么


本章只浅述rocketMQ的运行原理,源码的解析将在后面进行

服务端的启动操作

remoting模块一般是由initialize()和start()两部分组成

initialize部分做的事情:

  • 初始化netty引导(serverBootStrap),初始化boss线程池、worker线程池。
  • 初始化ChannelEvent监听(这个用于监听channelEvent事件的到达)和ChannelEvent执行器
  • 注册通讯命令(RemotingCommand)解析器,这个解析器用于解释服务端接受到的命令(rocketMQ中的通讯消息全部由通讯命令封装,不同的通讯命令由不同的通讯命令类型构成)。

start部分做的事情:

  • 启动netty引导程序(bootstrap),设置事件循环组(EventLoopGroup)boss线程池,worker线程池。
  • 启动ChannelEvent执行器

客户端的启动操作

同服务端模块,客户端模块也是由initialize()和start()组成

initialize部分做的事情:

  • 初始化netty引导(serverBootStrap),初始化worker线程池(netty的客户端只有工作线程)
  • 初始化ChannelEvent监听(这个用于监听channelEvent事件的到达)

start部分做的事情:

  • 初始化了默认事件执行器线程池(defaultEventExecutorGroup)
  • 启动netty引导程序(bootstrap),设置事件循环组(EventLoopGroup)worker线程池
  • 打开一个定时调度线程,定时查看超时的缓存请求,有callback的执行callback,让后从缓存中移除再释放请求

通讯模块是怎么进行的消息传输的


invokeSyncImpl 同步调用

发送端做了什么:

  • 构建ResponseFuture,设置opaque值,把ResponseFuture以opaque为键放入Map中保存

  • netty发送请送请求

  • 发送成功设置ResponseFuture发送状态为成功;发送失败设置ResponseFuture发送失败,并且从Map存中移除ResponseFuture

  • responseFuture.waitResponse(timeoutMillis)获取响应(如果超时则抛出异常)

  • 收到服务端的回应以后,从Map中根据opaque拿出responseFuture,将回应写入其中,并从Map中移除

  • resposneFuture得到回应,并将返回给客户端

接收端做了什么:

  • netty监听得到发送过来的消息,分拣给Server端进行处理

  • 根据消息的code得到对应的处理器(Processor)

  • 创建一个新的线程,在这个线程中让处理器去处理消息,并得到回应(Response)。判断如果消息不是单向的(one-way),则把请求中的opaque放回response中,并把消息发回给请求端。

  • 将线程放入线程池--这里注意 请求消息的code对应了一组(Processor,ExectorService),处理器和线程池是对应的

完整的消息请求过程:

invokeAsyncImpl 异步调用

发送端做了什么:

异步的过程和同步非常类似,只不过同步的responseFuture需要等待回应,而异步的过程是利用回调。

  • 获取信号量semaphoreAsync,看是否可以发送消息(异步一般链路耗时比较长,为了防止本地缓存的netty请求过多, 使用信号量来控制是否可以发动消息,信号量上限默认2048个)

  • 生成callback回调并构建ResponseFuture,设置opaque值,把ResponseFuture以opaque为键放入Map中保存

  • netty发送请送请求

  • 发送成功设置ResponseFuture发送状态为成功;发送失败设置ResponseFuture发送失败,并且从Map存中移除ResponseFuture

  • 当服务端的消息回应后,调用回调方法,得到处理response,并从Map中移除

接收端的动作和同步是完全一样

完整的消息请求过程:

invokeOnewayImpl 单向请求

单向请求就是一个简化的异步调用,消息发送至服务端,而服务端收到消息做相应的处理而不做出任何的回应。

完整的消息请求过程:

其他的部分的处理过程

连接建立过程

想要发送消息就必须拿到发送方的连接channel,这个channel是在传输的第一步由发送方地址解析获得的。当第一次想拿到连接的时候就会去channelTables这个Map中去查找是否存在,如果不存在就会使用netty的引导(bootstrap)中去建立连接,并保存在channelTables中去。

scanResponseTable

前文中讲到,每次有消息需要发送,就会生成resposneFuture用于接收消息回应,但是如果始终没有收到回应,Map中的responseFuture就会堆积。

这个时候就需要一个线程来专门做Map的清理回收,这个线程会1s调用一次来检查所有的responseFuture是否有效,如果过期就会调用里面的回调方法通知调用者消息已经超过有效期,被移除掉了。

传输中的协议

RemotingCommand是rocketMQ消息传输的媒介,所有的消息都会包装成RemotingCommand来进行传输。而这个对象会在netty传输之前进行编码,消息接收到进行解码。

RemotingCommand是由头部(header)和消息体(body)组成,消息发送的时候,头部和消息体会分开进行编码。那么RemotingCommand是如何组成的呢?

头部(header)

请求头接收方和发起方的含义略有不同,下面的表格详细的说明:

字段名 类型 Request Resposne
code 整数 请求操作代码,接收方根据不同的代码做不同的操作 应答结果代码,0表示成功,非0表示各种错误代码
language 枚举 请求方实现的语言,默认Java 接收方实现的语言
version 整数 请求方版本 接收方版本
opaque 整数 请求方在同一连接上不同的请求标识代码,多线程连接服用使用 接收方不做修改,直接返回
flag 整数 通信层的标志位 通信层的标志位
remark 字符串 传输自定义文本信息 错误详细描述
extFields HashMap 自定义字段 自定义字段

header的编码是通过fastjson序列化数据,将头信息转为json。

而头信息里面还包括了CommandCustomHeader的自定义的一些头信息,这些信息会在编码的时候被当作extFields序列化,最后跟着头信息一起序列化为json

消息体

而消息体是直接变为byte数组,这两部分后一起放入netty传输的ByteBuffer中,一起传输到接收端