broker启动apache
进入这个方法org.apache.rocketmq.broker.BrokerStartup#startapp
这一行socket
// broker控制器启动 controller.start();
进入这个方法org.apache.rocketmq.broker.BrokerController#startide
public void start() throws Exception { if (this.messageStore != null) { // 消磁存储服务启动 this.messageStore.start(); } if (this.remotingServer != null) { this.remotingServer.start(); } if (this.fastRemotingServer != null) { this.fastRemotingServer.start(); } if (this.fileWatchService != null) { this.fileWatchService.start(); } if (this.brokerOuterAPI != null) { this.brokerOuterAPI.start(); } if (this.pullRequestHoldService != null) { this.pullRequestHoldService.start(); } if (this.clientHousekeepingService != null) { this.clientHousekeepingService.start(); } if (this.filterServerManager != null) { this.filterServerManager.start(); } // 注册broker this.registerBrokerAll(true, false, true); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); if (this.brokerStatsManager != null) { this.brokerStatsManager.start(); } if (this.brokerFastFailure != null) { this.brokerFastFailure.start(); } if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) { if (this.transactionalMessageCheckService != null) { log.info("Start transaction service!"); this.transactionalMessageCheckService.start(); } } }
进入这个方法org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#run函数
public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(); this.waitForRunning(interval); this.doFlush(1); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } this.doFlush(RETRY_TIMES_OVER); DefaultMessageStore.log.info(this.getServiceName() + " service end"); }
进入这个方法org.apache.rocketmq.store.ConsumeQueue#flushoop
public boolean flush(final int flushLeastPages) { // 队列映射文件刷新 boolean result = this.mappedFileQueue.flush(flushLeastPages); if (isExtReadEnable()) { result = result & this.consumeQueueExt.flush(flushLeastPages); } return result; }
这一行this
// 队列映射文件刷新 boolean result = this.mappedFileQueue.flush(flushLeastPages);
进入这个方法spa
public boolean flush(final int flushLeastPages) { boolean result = true; // 根据offset找到映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; }
进入这个方法org.apache.rocketmq.store.MappedFileQueue#flush.net
public boolean flush(final int flushLeastPages) { boolean result = true; // 根据offset找到映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; }
返回到这个方法这一行线程
org.apache.rocketmq.store.DefaultMessageStore#start
// 提交日志 this.commitLog.start();
进入这个方法org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run
@Override public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 从配置文件中获取配置文件 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } try { boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); } }
返回到这个方法这一行
org.apache.rocketmq.store.DefaultMessageStore#start
// 存储状态服务启动 this.storeStatsService.start();
进入这个方法org.apache.rocketmq.store.StoreStatsService#run
返回到这个方法org.apache.rocketmq.store.DefaultMessageStore#start
这一行
// 存储消息服务启动 this.reputMessageService.start();
进入这个方法org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run
@Override public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { Thread.sleep(1); this.doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } DefaultMessageStore.log.info(this.getServiceName() + " service end"); }
进入这个方法org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); if (result != null) { try { this.reputFromOffset = result.getStartOffset(); for (int readSize = 0; readSize < result.getSize() && doNext; ) { DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { // 存储服务转发请求 DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } this.reputFromOffset += size; readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) .addAndGet(dispatchRequest.getMsgSize()); } } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); readSize = result.getSize(); } } else if (!dispatchRequest.isSuccess()) { if (size > 0) { log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); this.reputFromOffset += size; } else { doNext = false; if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", this.reputFromOffset); this.reputFromOffset += result.getSize() - readSize; } } } } } finally { result.release(); } } else { doNext = false; } } }
返回到这个方法org.apache.rocketmq.store.DefaultMessageStore#start这一行
// ha服务启动 this.haService.start();
public void start() throws Exception { lock = lockFile.getChannel().tryLock(0, 1, false); if (lock == null || lock.isShared() || !lock.isValid()) { throw new RuntimeException("Lock failed,MQ already started"); } lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes())); lockFile.getChannel().force(true); // 刷新消费队列启动 this.flushConsumeQueueService.start(); // 提交日志 this.commitLog.start(); // 存储状态服务启动 this.storeStatsService.start(); if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) { this.scheduleMessageService.start(); } if (this.getMessageStoreConfig().isDuplicationEnable()) { this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); } else { this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset()); } // 存储消息服务启动 this.reputMessageService.start(); // ha服务启动 this.haService.start(); this.createTempFile(); this.addScheduleTask(); this.shutdown = false; }
进入这个方法org.apache.rocketmq.store.ha.HAService#start
进入这个方法org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); Set<SelectionKey> selected = this.selector.selectedKeys(); if (selected != null) { for (SelectionKey k : selected) { if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); if (sc != null) { HAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress()); try { HAConnection conn = new HAConnection(HAService.this, sc); // ha connection启动 conn.start(); HAService.this.addConnection(conn); } catch (Exception e) { log.error("new HAConnection exception", e); sc.close(); } } } else { log.warn("Unexpected ops in select " + k.readyOps()); } } selected.clear(); } } catch (Exception e) { log.error(this.getServiceName() + " service has exception.", e); } } log.info(this.getServiceName() + " service end"); }
ha connection启动 conn.start();
public void start() { this.readSocketService.start(); this.writeSocketService.start(); }
进入这个方法org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run
@Override public void run() { HAConnection.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); // 处理读事件 boolean ok = this.processReadEvent(); if (!ok) { HAConnection.log.error("processReadEvent error"); break; } long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp; if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) { log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval); break; } } catch (Exception e) { HAConnection.log.error(this.getServiceName() + " service has exception.", e); break; } } this.makeStop(); writeSocketService.makeStop(); haService.removeConnection(HAConnection.this); HAConnection.this.haService.getConnectionCount().decrementAndGet(); SelectionKey sk = this.socketChannel.keyFor(this.selector); if (sk != null) { sk.cancel(); } try { this.selector.close(); this.socketChannel.close(); } catch (IOException e) { HAConnection.log.error("", e); } HAConnection.log.info(this.getServiceName() + " service end"); }
这一行
处理读事件 boolean ok = this.processReadEvent();
进入这个方法
org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run
返回到这个方法org.apache.rocketmq.store.ha.HAService#start这一行
this.groupTransferService.start();
这一行
this.haClient.start();
进入这个方法org.apache.rocketmq.store.ha.HAService.HAClient#run
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // 链接master if (this.connectMaster()) { if (this.isTimeToReportOffset()) { boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); if (!result) { // 关闭master this.closeMaster(); } } this.selector.select(1000); // 处理读事件 boolean ok = this.processReadEvent(); if (!ok) { this.closeMaster(); } if (!reportSlaveMaxOffsetPlus()) { continue; } long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig() .getHaHousekeepingInterval()) { log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress + "] expired, " + interval); this.closeMaster(); log.warn("HAClient, master not response some time, so close connection"); } } else { this.waitForRunning(1000 * 5); } } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); this.waitForRunning(1000 * 5); } } log.info(this.getServiceName() + " service end"); }
返回到这个方法org.apache.rocketmq.broker.BrokerController#start
这一行
if (this.remotingServer != null) { this.remotingServer.start(); }
进入这个方法org.apache.rocketmq.remoting.netty.NettyRemotingServer#start
@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), // 自定义线程池中的线程名字 new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyServerHandler() ); } }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler netty server解析
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } }
进入这个方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: // 请求消息处理 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: // 响应消息处理 processResponseCommand(ctx, cmd); break; default: break; } } }
进入这个方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { // 公平的处理请求 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { // 客户自定义的钩子实现类 RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); if (rpcHook != null) { // 这里mq提供了一些钩子方法能够扩展的地方,请求前处理逻辑能够在这里扩展 rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); } final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); if (rpcHook != null) { // 执行rocketmq请求的后置处理钩子方法 rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); } // 若是不是单线请求 if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } }
进入这个方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); // 从队列中获取responseFuture final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { // 若是响应对象有回调处理就处理回调函数 executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }
返回到这个方法org.apache.rocketmq.remoting.netty.NettyRemotingServer#start
// nett处理器启动 this.nettyEventExecutor.start();
进入这个方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.NettyEventExecutor#run
@Override public void run() { log.info(this.getServiceName() + " service started"); final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener(); while (!this.isStopped()) { try { // 事件保存在阻塞队列中 NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS); if (event != null && listener != null) { switch (event.getType()) { case IDLE: listener.onChannelIdle(event.getRemoteAddr(), event.getChannel()); break; case CLOSE: listener.onChannelClose(event.getRemoteAddr(), event.getChannel()); break; case CONNECT: listener.onChannelConnect(event.getRemoteAddr(), event.getChannel()); break; case EXCEPTION: listener.onChannelException(event.getRemoteAddr(), event.getChannel()); break; default: break; } } } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info(this.getServiceName() + " service end"); }
说到最后
本次解析表明我的观点,仅供参考。
关注本号
加群讨论