匠心零度 转载请注明原创出处,谢谢!java
rocketmq的几个核心的模块,而对于每一个模块都是单独的jvm进程,咱们看到上面的架构图的时候,那些箭头就是rocketmq的rpc调用,下面咱们来看看rocketmq的rpc是若是进行封装实现的。服务器
说明: rocketmq系列都将会以rocketmq-4.1.0-incubating进行介绍。网络
先排除Master、Slave直接经过原生的nio进行调用,其余通信都是基于netty-all-4.0.36.Final以及RocketMQ自定义协议进行通信的。架构
咱们来看看header data里面的数据定义:异步
code对于Request来讲就是RequestCode类里面的常量信息:jvm
说明:公众号【匠心零度】回复:rocketmq,可得到基于rocketmq4.1.0加详细中文代码注释 。tcp
public class RequestCode { // Broker 发送消息 public static final int SEND_MESSAGE = 10; // Broker 订阅消息 public static final int PULL_MESSAGE = 11; // Broker 查询消息 public static final int QUERY_MESSAGE = 12; // Broker 查询Broker Offset public static final int QUERY_BROKER_OFFSET = 13; // Broker 查询Consumer Offset public static final int QUERY_CONSUMER_OFFSET = 14; // Broker 更新Consumer Offset public static final int UPDATE_CONSUMER_OFFSET = 15; // Broker 更新或者增长一个Topic public static final int UPDATE_AND_CREATE_TOPIC = 17; // Broker 获取全部Topic的配置(Slave和Namesrv都会向Master请求此配置) public static final int GET_ALL_TOPIC_CONFIG = 21; // Broker 获取全部Topic配置(Slave和Namesrv都会向Master请求此配置 public static final int GET_TOPIC_CONFIG_LIST = 22; // Broker 获取全部Topic名称列表 public static final int GET_TOPIC_NAME_LIST = 23; // Broker 更新Broker上的配置 public static final int UPDATE_BROKER_CONFIG = 25; // Broker 获取Broker上的配置 public static final int GET_BROKER_CONFIG = 26; // Broker 触发Broker删除文件 public static final int TRIGGER_DELETE_FILES = 27; // Broker 获取Broker运行时信息 public static final int GET_BROKER_RUNTIME_INFO = 28; // Broker 根据时间查询队列的Offset public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29; // Broker 查询队列最大Offset public static final int GET_MAX_OFFSET = 30; // Broker 查询队列最小Offset public static final int GET_MIN_OFFSET = 31; // Broker 查询队列最先消息对应时间 public static final int GET_EARLIEST_MSG_STORETIME = 32; // Broker 根据消息ID来查询消息 public static final int VIEW_MESSAGE_BY_ID = 33; // Broker Client向Client发送心跳,并注册自身 public static final int HEART_BEAT = 34; // Broker Client注销 public static final int UNREGISTER_CLIENT = 35; // Broker Consumer将处理不了的消息发回服务器 public static final int CONSUMER_SEND_MSG_BACK = 36; // Broker Commit或者Rollback事务 public static final int END_TRANSACTION = 37; // Broker 获取ConsumerId列表经过GroupName public static final int GET_CONSUMER_LIST_BY_GROUP = 38; // Broker 主动向Producer回查事务状态 public static final int CHECK_TRANSACTION_STATE = 39; // Broker Broker通知Consumer列表变化 public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40; // Broker Consumer向Master锁定队列 public static final int LOCK_BATCH_MQ = 41; // Broker Consumer向Master解锁队列 public static final int UNLOCK_BATCH_MQ = 42; // Broker 获取全部Consumer Offset public static final int GET_ALL_CONSUMER_OFFSET = 43; // Broker 获取全部定时进度 public static final int GET_ALL_DELAY_OFFSET = 45; public static final int CHECK_CLIENT_CONFIG = 46; // Namesrv 向Namesrv追加KV配置 public static final int PUT_KV_CONFIG = 100; // Namesrv 从Namesrv获取KV配置 public static final int GET_KV_CONFIG = 101; // Namesrv 从Namesrv获取KV配置 public static final int DELETE_KV_CONFIG = 102; // Namesrv 注册一个Broker,数据都是持久化的,若是存在则覆盖配置 public static final int REGISTER_BROKER = 103; // Namesrv 卸载一个Broker,数据都是持久化的 public static final int UNREGISTER_BROKER = 104; // Namesrv 根据Topic获取Broker Name、队列数(包含读队列与写队列) public static final int GET_ROUTEINTO_BY_TOPIC = 105; // Namesrv 获取注册到Name Server的全部Broker集群信息 public static final int GET_BROKER_CLUSTER_INFO = 106; public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200; public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201; public static final int GET_TOPIC_STATS_INFO = 202; public static final int GET_CONSUMER_CONNECTION_LIST = 203; public static final int GET_PRODUCER_CONNECTION_LIST = 204; public static final int WIPE_WRITE_PERM_OF_BROKER = 205; // 从Name Server获取完整Topic列表 public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206; // 从Broker删除订阅组 public static final int DELETE_SUBSCRIPTIONGROUP = 207; // 从Broker获取消费状态(进度) public static final int GET_CONSUME_STATS = 208; // Suspend Consumer消费过程 public static final int SUSPEND_CONSUMER = 209; // Resume Consumer消费过程 public static final int RESUME_CONSUMER = 210; // 重置Consumer Offset public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211; // 重置Consumer Offset public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212; // 调整Consumer线程池数量 public static final int ADJUST_CONSUMER_THREAD_POOL = 213; // 查询消息被哪些消费组消费 public static final int WHO_CONSUME_THE_MESSAGE = 214; // 从Broker删除Topic配置 public static final int DELETE_TOPIC_IN_BROKER = 215; // 从Namesrv删除Topic配置 public static final int DELETE_TOPIC_IN_NAMESRV = 216; // 经过NameSpace获取全部的KV List public static final int GET_KVLIST_BY_NAMESPACE = 219; // offset 重置 public static final int RESET_CONSUMER_CLIENT_OFFSET = 220; // 客户端订阅消息 public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221; // 通知 broker 调用 offset 重置处理 public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222; // 通知 broker 调用客户端订阅消息处理 public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223; // Broker 查询topic被谁消费 public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300; // 获取指定集群下的全部 topic public static final int GET_TOPICS_BY_CLUSTER = 224; // 向Broker注册Filter Server public static final int REGISTER_FILTER_SERVER = 301; // 向Filter Server注册Class public static final int REGISTER_MESSAGE_FILTER_CLASS = 302; // 根据 topic 和 group 获取消息的时间跨度 public static final int QUERY_CONSUME_TIME_SPAN = 303; // 获取全部系统内置 Topic 列表 public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304; public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305; // 清理失效队列 public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306; // 经过Broker查询Consumer内存数据 public static final int GET_CONSUMER_RUNNING_INFO = 307; // 查找被修正 offset (转发组件) public static final int QUERY_CORRECTION_OFFSET = 308; // 经过Broker直接向某个Consumer发送一条消息,并马上消费,返回结果给broker,再返回给调用方 public static final int CONSUME_MESSAGE_DIRECTLY = 309; // Broker 发送消息,优化网络数据包 public static final int SEND_MESSAGE_V2 = 310; // 单元化相关 topic public static final int GET_UNIT_TOPIC_LIST = 311; // 获取含有单元化订阅组的 Topic 列表 public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312; // 获取含有单元化订阅组的非单元化 Topic 列表 public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313; // 克隆某一个组的消费进度到新的组 public static final int CLONE_GROUP_OFFSET = 314; // 查看Broker上的各类统计信息 public static final int VIEW_BROKER_STATS_DATA = 315; public static final int CLEAN_UNUSED_TOPIC = 316; public static final int GET_BROKER_CONSUME_STATS = 317; /** * update the config of name server */ public static final int UPDATE_NAMESRV_CONFIG = 318; /** * get config from name server */ public static final int GET_NAMESRV_CONFIG = 319; public static final int SEND_BATCH_MESSAGE = 320; public static final int QUERY_CONSUME_QUEUE = 321; }
code对于Response来讲就是ResponseCode类里面的常量信息:源码分析
public class ResponseCode extends RemotingSysResponseCode { public static final int FLUSH_DISK_TIMEOUT = 10; public static final int SLAVE_NOT_AVAILABLE = 11; public static final int FLUSH_SLAVE_TIMEOUT = 12; public static final int MESSAGE_ILLEGAL = 13; public static final int SERVICE_NOT_AVAILABLE = 14; public static final int VERSION_NOT_SUPPORTED = 15; public static final int NO_PERMISSION = 16; public static final int TOPIC_NOT_EXIST = 17; public static final int TOPIC_EXIST_ALREADY = 18; public static final int PULL_NOT_FOUND = 19; public static final int PULL_RETRY_IMMEDIATELY = 20; public static final int PULL_OFFSET_MOVED = 21; public static final int QUERY_NOT_FOUND = 22; public static final int SUBSCRIPTION_PARSE_FAILED = 23; public static final int SUBSCRIPTION_NOT_EXIST = 24; public static final int SUBSCRIPTION_NOT_LATEST = 25; public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26; public static final int FILTER_DATA_NOT_EXIST = 27; public static final int FILTER_DATA_NOT_LATEST = 28; public static final int TRANSACTION_SHOULD_COMMIT = 200; public static final int TRANSACTION_SHOULD_ROLLBACK = 201; public static final int TRANSACTION_STATE_UNKNOW = 202; public static final int TRANSACTION_STATE_GROUP_WRONG = 203; public static final int NO_BUYER_ID = 204; public static final int NOT_IN_CURRENT_UNIT = 205; public static final int CONSUMER_NOT_ONLINE = 206; public static final int CONSUME_MSG_TIMEOUT = 207; public static final int NO_MESSAGE = 208; }
flag字段进行说明,其余后续分析到具体的具体分析。学习
flag = 0表示是request,flag = 1表示是response。优化
private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND public RemotingCommandType getType() { if (this.isResponseType()) {//flag=1为true return RemotingCommandType.RESPONSE_COMMAND; } return RemotingCommandType.REQUEST_COMMAND; } public boolean isResponseType() { int bits = 1 << RPC_TYPE; return (this.flag & bits) == bits; }
flag为二、3(二进制表示十、11)为oneway请求。
private static final int RPC_ONEWAY = 1; // Oneway bit public void markOnewayRPC() { int bits = 1 << RPC_ONEWAY; this.flag |= bits; } public boolean isOnewayRPC() { int bits = 1 << RPC_ONEWAY; return (this.flag & bits) == bits; }
code=310很快咱们就明白什么意思了:
对于下面相似a、b、c等能够简单查看下类SendMessageRequestHeaderV2(后续继续讲解)基本就是相似js压缩效果,能够借鉴学习下。
public static SendMessageRequestHeaderV2 createSendMessageRequestHeaderV2(final SendMessageRequestHeader v1) { SendMessageRequestHeaderV2 v2 = new SendMessageRequestHeaderV2(); /** * 进行转换,这样网络传输数据就比较小了,学习下 */ v2.a = v1.getProducerGroup(); v2.b = v1.getTopic(); v2.c = v1.getDefaultTopic(); v2.d = v1.getDefaultTopicQueueNums(); v2.e = v1.getQueueId(); v2.f = v1.getSysFlag(); v2.g = v1.getBornTimestamp(); v2.h = v1.getFlag(); v2.i = v1.getProperties(); v2.j = v1.getReconsumeTimes(); v2.k = v1.isUnitMode(); v2.l = v1.getMaxReconsumeTimes(); v2.m = v1.isBatch(); return v2; }
备注: RemotingCommand类包含了传输过程当中全部数据的封装,还包括了编解码等操做(很是棒!!!解读为何这样,从面向对象角度,谁拥有数据谁就对外提供操做这些数据的方法,这句话应该是大学的时候学习面向对象的时候看张孝祥老师说的,一直记忆犹新,的确应该这么设计,rocketmq就这么作的,再次学习)。
上面的图已经作到很是清晰了,RemotingClient接口定义了client应该具有那些功能,RemotingSever相似,主要有:registerProcessor、invokeSync(同步调用)、invokeAsync(异步调用)、invokeOneway(单向调用)等等,而RemotingClient与RemotingSever在三种调用的区别就是参数有所区别。
NettyRemotingAbstract是Server与Client公用处理的抽象。
BrokerOuterAPI、MQClientImpl:都封装了NettyRemotingClient(后续介绍)。
不论是client仍是server经过RemotingService咱们明白,启动都是在start里面,咱们看看里面核心netty代码,以server里面代码为例:
备注:此处netty相关内容不进行深刻展开,只会把涉及的的简单说明,后续另开系列进行说明。
在进行tcp传输的时候常常会面临黏包/拆包问题,netty自带了不少通用的TCP黏包/拆包解决方案,下面咱们看看rocketmq如何借助netty来实现编解码:NettyEncoder编码、NettyDecoder解码,rocketmq相关的网络协议上面内容已经说明过了。
NettyEncoder编码
NettyDecoder解码
netty中针对这四种场景均有对应的解码器做为解决方案,好比:
rocketmq中使用的就是基于LengthFieldBasedFrameDecoder自定义长度解码器的。
IdleStateHandler:Netty自带的心跳检测。
NettyConnetManageHandle:主要就是连接管理,新链接、链接断开、异常、Idle等事件,每一个事件过来存入NettyEventExecuter的队列里面。
NettyEventExecutor的run方法会不断的从队列里面取事件进行相应的处理:
NettyServerHandler:具体业务处理(后续会说到)。
invokeSync(同步调用)进行说明:
opaque就至关与标识的这个请求,虽然rpc调用请求发送结束了,可是响应回来的时候仍是会带有该信息就能够判断出是原来那个请求,好比响应回来以后执行原来给定的回调等。
经过countDownLatch来控制等待网络通讯时间 :
invokeAsync(异步调用)进行说明:
与invokeSync(同步调用)基本相似,boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);//控制异步请求的个数以及超时和使用使用布尔原子变量,信号量保证只释放一次,对于异步invokeCallback不为空,须要进行调用。invokeOneway(单向调用)比较简单略过。
下面看看消息接收处理:
备注:这里判断是request仍是response都是经过header里面的flag标记来判断的,上面已经说明。
processResponseCommand在介绍上面三种发送的时候说过了,下面重点看看processRequestCommand:
备注:这里须要作流控,要求线程池对应的队列必须是有大小限制的,是经过线程池进行限流的。
RocketMQ原理介绍V3.1.1
netty源码分析之LengthFieldBasedFrameDecoder
若是读完以为有收获的话,欢迎点赞、关注、加公众号【匠心零度】,查阅更多精彩历史!!!
加入知识星球,一块儿探讨!