RocketMQ 源码解析:请求处理——检查事务状态

说在前面

请求处理:检查事务状态

 

源码解析

进入这个方法,检查事务的状态:org.apache.rocketmq.client.impl.ClientRemotingProcessor.checkTransactionState(ChannelHandlerContext, RemotingCommand)

public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final CheckTransactionStateRequestHeader requestHeader =(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());final MessageExt messageExt = MessageDecoder.decode(byteBuffer);if (messageExt != null) {String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {messageExt.setTransactionId(transactionId);}final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);if (group != null) {//                按组选择producerMQProducerInner producer = this.mqClientFactory.selectProducer(group);if (producer != null) {final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());producer.checkTransactionState(addr, messageExt, requestHeader);} else {log.debug("checkTransactionState, pick producer by group[{}] failed", group);}} else {log.warn("checkTransactionState, pick producer group failed");}} else {log.warn("checkTransactionState, decode message failed");}return null;    }

进入这个方法,按组选择producer:org.apache.rocketmq.client.impl.factory.MQClientInstance.selectProducer(String)

public MQProducerInner selectProducer(final String group) {return this.producerTable.get(group);org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkTransactionState(String, MessageExt, CheckTransactionStateRequestHeader)    }

往上返回到这个方法:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkTransactionState(String, MessageExt, CheckTransactionStateRequestHeader)






@Overridepublic void checkTransactionState(final String addr, final MessageExt msg,final CheckTransactionStateRequestHeader header) {Runnable request = new Runnable() {private final String brokerAddr = addr;private final MessageExt message = msg;private final CheckTransactionStateRequestHeader checkRequestHeader = header;private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();@Overridepublic void run() {//                获取事务监听器TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();if (transactionCheckListener != null) {LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable exception = null;try {//                        检查本地事务状态=》localTransactionState = transactionCheckListener.checkLocalTransaction(message);} catch (Throwable e) {log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);exception = e;}//                    处理事务状态=》this.processTransactionState(localTransactionState,group,exception);} else {log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);}}//private void processTransactionState(final LocalTransactionState localTransactionState,final String producerGroup,final Throwable exception) {final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());thisHeader.setProducerGroup(producerGroup);thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());thisHeader.setFromTransactionCheck(true);String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (uniqueKey == null) {uniqueKey = message.getMsgId();}thisHeader.setMsgId(uniqueKey);thisHeader.setTransactionId(checkRequestHeader.getTransactionId());switch (localTransactionState) {case COMMIT_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case 
ROLLBACK_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);log.warn("when broker check, client rollback this transaction, {}", thisHeader);break;case UNKNOW:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);log.warn("when broker check, client does not know this transaction state, {}", thisHeader);break;default:break;}String remark = null;if (exception != null) {remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);}try {//                    单向结束事务=》DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);} catch (Exception e) {log.error("endTransactionOneway exception", e);}}};this.checkExecutor.submit(request);    }

进入这个方法,获取事务监听器:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkListener()

/**
 * Returns the transaction listener of the wrapped producer.
 *
 * @return the listener configured on the {@code TransactionMQProducer}, or {@code null}
 *         when the wrapped producer is not a {@code TransactionMQProducer}
 */
@Override
public TransactionListener checkListener() {
    if (!(this.defaultMQProducer instanceof TransactionMQProducer)) {
        return null;
    }
    return ((TransactionMQProducer) defaultMQProducer).getTransactionListener();
}

进入这个方法,检查本地事务状态:org.apache.rocketmq.example.transaction.TransactionListenerImpl.checkLocalTransaction(MessageExt)

/**
 * Maps the status recorded in {@code localTrans} for the message's transaction id to a
 * {@code LocalTransactionState}.
 *
 * <p>0 maps to UNKNOW, 1 to COMMIT_MESSAGE and 2 to ROLLBACK_MESSAGE. A missing entry or
 * any other value yields COMMIT_MESSAGE.
 *
 * @param msg the message whose transaction is being checked
 * @return the state derived from the recorded status
 */
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    final Integer recorded = localTrans.get(msg.getTransactionId());
    if (recorded == null) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    final int code = recorded;
    if (code == 0) {
        return LocalTransactionState.UNKNOW;
    }
    if (code == 2) {
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    // code == 1 and every unrecognised value commit.
    return LocalTransactionState.COMMIT_MESSAGE;
}

往上返回到这个方法,处理事务状态:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkTransactionState(...).new Runnable() {...}.processTransactionState(LocalTransactionState, String, Throwable)




private void processTransactionState(final LocalTransactionState localTransactionState,final String producerGroup,final Throwable exception) {final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());thisHeader.setProducerGroup(producerGroup);thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());thisHeader.setFromTransactionCheck(true);String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (uniqueKey == null) {uniqueKey = message.getMsgId();}thisHeader.setMsgId(uniqueKey);thisHeader.setTransactionId(checkRequestHeader.getTransactionId());switch (localTransactionState) {case COMMIT_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);log.warn("when broker check, client rollback this transaction, {}", thisHeader);break;case UNKNOW:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);log.warn("when broker check, client does not know this transaction state, {}", thisHeader);break;default:break;}String remark = null;if (exception != null) {remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);}try {//                    单向结束事务=》DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);} catch (Exception e) {log.error("endTransactionOneway exception", e);}}};this.checkExecutor.submit(request);    }

往上返回到这个方法,单向结束事务:org.apache.rocketmq.client.impl.MQClientAPIImpl.endTransactionOneway(String, EndTransactionRequestHeader, String, long)

public void endTransactionOneway(final String addr,final EndTransactionRequestHeader requestHeader,final String remark,final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);request.setRemark(remark);//        单途请求=》this.remotingClient.invokeOneway(addr, request, timeoutMillis);    }

进入这个方法,发送单向请求:org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeOneway(String, RemotingCommand, long)

@Overridepublic void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {//        获取channel=》final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {if (this.rpcHook != null) {//                    执行请求执行前的钩子方法this.rpcHook.doBeforeRequest(addr, request);}//               执行单线请求 =》this.invokeOnewayImpl(channel, request, timeoutMillis);} catch (RemotingSendRequestException e) {log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);//                异常关闭channel=》this.closeChannel(addr, channel);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}    }

进入这个方法, 获取channel,org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateChannel(String)


private Channel getAndCreateChannel(final String addr) throws InterruptedException {if (null == addr) {//           获取和namesrv通讯的channel =》return getAndCreateNameserverChannel();}ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}//        根据地址建立channel=》return this.createChannel(addr);    }

进入这个方法,获取和namesrv通讯的channel,org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel()




private Channel getAndCreateNameserverChannel() throws InterruptedException {String addr = this.namesrvAddrChoosed.get();if (addr != null) {ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}}//        从namesrvAddrChoosed中查找namesrv,若是不存在同步轮询的方式从namesrvAddrList中取final List<String> addrList = this.namesrvAddrList.get();if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {addr = this.namesrvAddrChoosed.get();if (addr != null) {ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}}if (addrList != null && !addrList.isEmpty()) {for (int i = 0; i < addrList.size(); i++) {int index = this.namesrvIndex.incrementAndGet();index = Math.abs(index);index = index % addrList.size();String newAddr = addrList.get(index);this.namesrvAddrChoosed.set(newAddr);log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);//                        同步建立渠道=》Channel channelNew = this.createChannel(newAddr);if (channelNew != null) {return channelNew;}}}} catch (Exception e) {log.error("getAndCreateNameserverChannel: create name server channel exception", e);} finally {this.lockNamesrvChannel.unlock();}} else {log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);}return null;    }

进入这个方法,同步创建channel:org.apache.rocketmq.remoting.netty.NettyRemotingClient.createChannel(String)





private Channel createChannel(final String addr) throws InterruptedException {ChannelWrapper cw = this.channelTables.get(addr);//        代码走到这里,这里的逻辑正常状况下是走不到的,为了代码严谨性if (cw != null && cw.isOK()) {cw.getChannel().close();channelTables.remove(addr);}if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {boolean createNewConnection;//                为了代码严谨性,这里又作了一次判断cw = this.channelTables.get(addr);if (cw != null) {if (cw.isOK()) {cw.getChannel().close();this.channelTables.remove(addr);createNewConnection = true;//                        若是channel还在用,不让建立} else if (!cw.getChannelFuture().isDone()) {createNewConnection = false;} else {this.channelTables.remove(addr);createNewConnection = true;}} else {createNewConnection = true;}if (createNewConnection) {//                    从新创建链接ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);cw = new ChannelWrapper(channelFuture);//                    创建的channel也放到本次缓存中this.channelTables.put(addr, cw);}} catch (Exception e) {log.error("createChannel: create channel exception", e);} finally {this.lockChannelTables.unlock();}} else {log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);}if (cw != null) {ChannelFuture channelFuture = cw.getChannelFuture();if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {//                对channel再次判断if (cw.isOK()) {log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());return cw.getChannel();} else {log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());}} else {log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),channelFuture.toString());}}return null;    }

往上返回到这个方法,根据地址创建channel:org.apache.rocketmq.remoting.netty.NettyRemotingClient.createChannel(String),前面已经介绍过了。

 

往上返回到这个方法,执行单向请求:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeOnewayImpl(Channel, RemotingCommand, long),前面已经介绍过了。

 

往上返回到这个方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor.checkTransactionState(ChannelHandlerContext, RemotingCommand)结束。

 

说在最后

本次解析仅代表个人观点,仅供参考。

 

加入技术微信群

钉钉技术群

相关文章
相关标签/搜索