Rocketmq的通信层是基于通信框架netty 4.0.21.Final之上做了简单的协议封装。不过在目前的版本中(v3.2.6),netty不再是强耦合与通信模块,基本的类图如下:
本章只浅述rocketMQ的运行原理,源码的解析将在后面进行
服务端的启动操作
remoting模块一般是由initialize()和start()两部分组成
initialize部分做的事情:
start部分做的事情:
客户端的启动操作
同服务端模块,客户端模块也是由initialize()和start()组成
initialize部分做的事情:
start部分做的事情:
invokeSyncImpl 同步调用
发送端做了什么:
构建ResponseFuture,设置opaque值,把ResponseFuture以opaque为键放入Map中保存
netty发送请送请求
发送成功设置ResponseFuture发送状态为成功;发送失败设置ResponseFuture发送失败,并且从Map存中移除ResponseFuture
responseFuture.waitResponse(timeoutMillis)获取响应(如果超时则抛出异常)
收到服务端的回应以后,从Map中根据opaque拿出responseFuture,将回应写入其中,并从Map中移除
resposneFuture得到回应,并将返回给客户端
接收端做了什么:
netty监听得到发送过来的消息,分拣给Server端进行处理
根据消息的code得到对应的处理器(Processor)
创建一个新的线程,在这个线程中让处理器去处理消息,并得到回应(Response)。判断如果消息不是单向的(one-way),则把请求中的opaque放回response中,并把消息发回给请求端。
将线程放入线程池--这里注意 请求消息的code对应了一组(Processor,ExectorService),处理器和线程池是对应的
完整的消息请求过程:
invokeAsyncImpl 异步调用
发送端做了什么:
异步的过程和同步非常类似,只不过同步的responseFuture需要等待回应,而异步的过程是利用回调。
获取信号量semaphoreAsync,看是否可以发送消息(异步一般链路耗时比较长,为了防止本地缓存的netty请求过多, 使用信号量来控制是否可以发动消息,信号量上限默认2048个)
生成callback回调并构建ResponseFuture,设置opaque值,把ResponseFuture以opaque为键放入Map中保存
netty发送请送请求
发送成功设置ResponseFuture发送状态为成功;发送失败设置ResponseFuture发送失败,并且从Map存中移除ResponseFuture
当服务端的消息回应后,调用回调方法,得到处理response,并从Map中移除
接收端的动作和同步是完全一样的
完整的消息请求过程:
invokeOnewayImpl 单向请求
单向请求就是一个简化的异步调用,消息发送至服务端,而服务端收到消息做相应的处理而不做出任何的回应。
完整的消息请求过程:
连接建立过程
想要发送消息就必须拿到发送方的连接channel,这个channel是在传输的第一步由发送方地址解析获得的。当第一次想拿到连接的时候就会去channelTables这个Map中去查找是否存在,如果不存在就会使用netty的引导(bootstrap)中去建立连接,并保存在channelTables中去。
scanResponseTable
前文中讲到,每次有消息需要发送,就会生成resposneFuture用于接收消息回应,但是如果始终没有收到回应,Map中的responseFuture就会堆积。
这个时候就需要一个线程来专门做Map的清理回收,这个线程会1s调用一次来检查所有的responseFuture是否有效,如果过期就会调用里面的回调方法通知调用者消息已经超过有效期,被移除掉了。
RemotingCommand是rocketMQ消息传输的媒介,所有的消息都会包装成RemotingCommand来进行传输。而这个对象会在netty传输之前进行编码,消息接收到进行解码。
RemotingCommand是由头部(header)和消息体(body)组成,消息发送的时候,头部和消息体会分开进行编码。那么RemotingCommand是如何组成的呢?
头部(header)
请求头接收方和发起方的含义略有不同,下面的表格详细的说明:
字段名 | 类型 | Request | Resposne |
---|---|---|---|
code | 整数 | 请求操作代码,接收方根据不同的代码做不同的操作 | 应答结果代码,0表示成功,非0表示各种错误代码 |
language | 枚举 | 请求方实现的语言,默认Java | 接收方实现的语言 |
version | 整数 | 请求方版本 | 接收方版本 |
opaque | 整数 | 请求方在同一连接上不同的请求标识代码,多线程连接服用使用 | 接收方不做修改,直接返回 |
flag | 整数 | 通信层的标志位 | 通信层的标志位 |
remark | 字符串 | 传输自定义文本信息 | 错误详细描述 |
extFields | HashMap | 自定义字段 | 自定义字段 |
header的编码是通过fastjson序列化数据,将头信息转为json。
而头信息里面还包括了CommandCustomHeader的自定义的一些头信息,这些信息会在编码的时候被当作extFields序列化,最后跟着头信息一起序列化为json
消息体
而消息体是直接变为byte数组,这两部分后一起放入netty传输的ByteBuffer中,一起传输到接收端