这里先从总体的执行请求流程进行梳理,而后针对broker的实现细节再作深刻优化网络
RocketMQ的总体事件操做步骤为:函数
producer -> netty -> broker -> netty -> consumer性能
先从总体的一张大图上看,而后作摘要说明优化
备注:较大的长方形为类,较小的长方形为方法,虚线为网络通讯,只作了大致的事件流程,没有作到详细,只是提供一个总体的操做this
事件说明:RocketMQ的事件描述,主要是客户端基于不一样的请求,服务端基于特定的协议编码进行特定的逻辑处理编码
producer的事件:MQClientAPIImpl类中的sendMessage方法中,具体实现为netty
//协议转换 if (sendSmartMsg || msg instanceof MessageBatch) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); }
consumer的事件:MQClientAPIImpl类中的pullMessage方法中,具体实现为blog
//拉取的请求协议转换 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
broker的注册事件:主要操做在BrokerController类中,分为两类事件生命周期
构造函数中初始化拉取事件事件
//pull消息的处理 this.pullMessageProcessor = new PullMessageProcessor(this); //pull消息的服务 this.pullRequestHoldService = new PullRequestHoldService(this);
初始化中构造其余事件
/** * SendMessageProcessor */ SendMessageProcessor sendProcessor = new SendMessageProcessor(this); sendProcessor.registerSendMessageHook(sendMessageHookList); sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); /** * PullMessageProcessor */ this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); /** * QueryMessageProcessor */ NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this); this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); /** * ClientManageProcessor */ ClientManageProcessor clientProcessor = new ClientManageProcessor(this); this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); /** * ConsumerManageProcessor */ ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this); this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); /** * EndTransactionProcessor */ this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); /** * Default */ AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
broker是RocketMQ的核心,不少重要的功能和高性能的技术体现,总体概述为:
1,接收producer的发送请求
2,处理consumer的消息拉取
3,持久化消息内容
4,配置的消息的高可用
5,服务端的过滤引擎执行
后面咱们根据broker的生命周期的执行,详细的介绍上面的5大方面操做