RocketMQ(八):消息发送

匠心零度 转载请注明原创出处,谢谢!git

RocketMQ网络部署图

  • NameServer:在系统中是作命名服务,更新和发现 broker服务。
  • Broker-Master:broker 消息主机服务器。
  • Broker-Slave: broker 消息从机服务器。
  • Producer: 消息生产者。
  • Consumer: 消息消费者。

说明: rocketmq系列都将会以rocketmq-4.1.0-incubating进行介绍。github

在阅读源码时作了必定的注释,公众号【匠心零度】回复:rocketmq,可得到基于rocketmq4.1.0加详细中文代码注释 。欢迎你们 star、fork !缓存

厮大说过消息中间件的本质消息中间件大道至简:一发一存一消费 ,今天主要来讨论下,就是RocketMQ网络部署图中用颜色标记的部分。bash

往期rocketmq系列文章

消息发送概述

上面的图大概就是producer发送message到broker的核心逻辑了。服务器

问题思考:网络

把broker相关信息缓存到客户端减小了与namesrv的交互,可是也下降了broker变化的实时性了,如何突然有一台broker不可用了会怎么样呢?(后续看看rocketmq的处理),为何producer发送会那么快呢?本质是因为netty的writeAndFlush?producer如何作到异步发送?同步发送?oneway发送的呢?若是发送失败会怎么处理呢?异步

消息发送通常流程分析

因为发送还涉及到定时发送,顺序发送,批量发送等状况,本篇考虑到篇幅问题就是通常的发送逻辑讲解,后面继续分享其余状况。ide

阅读本篇前应该重点阅读下:RocketMQ(二):RPC通信函数

如何在本地调试以前文章也分享过了,在此就不提了,发送的逻辑相对于存储以及消费来讲是最简单的(直接根据一条线不断的跟下去基本就差很少了),而存储最复杂,其次消费(这些过程可能一条线很差找,后续分享)。ui

同步发送写法

备注: 能够参考RocketMQ快速入门便可。

producer.start

/**
     * Start this producer instance.
     * </p>
     *
     * <strong>
     * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke * this method before sending or querying messages. * </strong> * </p> * * @throws MQClientException if there is any unexpected error. */ @Override public void start() throws MQClientException { this.defaultMQProducerImpl.start(); } 复制代码

主要作了下列事情(核心事情):

  • 一些配置检查。
  • 构建与namesrv通讯的netty客户端。
  • 默认每30s与namesrv交换获取broker相关信息。
  • 默认每30s去掉失效的broker信息以及发送心跳到全部broker上面。

构建Message对象

producer是以Message对象进行发送的,看看Message构造:

public Message() {
    }

    public Message(String topic, byte[] body) {
        this(topic, "", "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }

    public Message(String topic, String tags, byte[] body) {
        this(topic, tags, "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, byte[] body) {
        this(topic, tags, keys, 0, body, true);
    }
复制代码

备注: 主要就是topic、tags、以及body真实内容等。

send发送

SendResult sendResult = producer.send(msg);
复制代码

进行发送处理。下面咱们重点看看send如何处理。

发送send核心分析

发送的几种方式:同步 异步 oneway(应该选择哪一种,须要本身根据状况进行判断)

以同步发送为例子,默认超时时间为3s,

SendResult sendResult = producer.send(msg);
复制代码

这个就是发送的触发方法,咱们一直跟进去就好了,**第一初步感觉:**经过跟踪进去第一感受就是涉及到了JUC相关使用,大量运用享元模式(本质一个map进行缓存)以及netty使用。

核心逻辑:

代码就不大量复制了,须要的github里面获取基于rocketmq4.1.0加详细中文代码注释 。欢迎你们 star、fork !

  • 判断服务是否可用? 不可用直接结束流程。

  • 消息的验证:

  • 获取topic路由信息

    缓存中有就获取,没有就namesrv交互一次(也可能2次)因为topic信息在broker服务端不必定存在,若是不存在就用默认的(TBW102)。

封装请求头信息:

// Namesrv 根据Topic获取Broker Name、队列数(包含读队列与写队列)
 public static final int GET_ROUTEINTO_BY_TOPIC = 105;
复制代码

namesrv服务端接受到这个请求的处理状况。

最后获得的路由信息相似下面的:

  • 发送模式是sync 会有3次其余1次

    //发送模式是sync 会有3次其余1次
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    复制代码
  • 选择一个queue

    如何选择发送那个broker的那个queueid上面?(客户端本身负载),因为broker相关信息缓存在客户端里面,问题来了(因为30s会同步一次信息,那么在30s以内broker出现问题会怎么样呢? )rocketmq是这样处理的:sendLatencyFaultEnable开关是否打开

    1.打开--> 有多长时间内不可用状况

    2.不打开(默认)-->直接随机一个(若是带了lastBrokerName不为空 尽可能换不是这个broker的,若是都没有又是随机一个)

  • 调用sendKernelImpl发送消息 发送消息核心

    根据broker的name获取到ip地址,若是通道没有创建而且保存。

    设置设置UNIQ_id,里面保护客户端ip地址信息。

    发送的时候 会有钩子函数提供执行(禁止消息钩子 ,发送消息钩子(executeSendMessageHookBefore、executeSendMessageHookAfter)。

    构建SendMessageRequestHeader,包括生成消息时间戳,因此各各机器时间最好一致,(这样后期也能够查下broker接受消息花了多少时间)。

  • 根据发送消息模式,选择发送方式

    下面此次主要看同步发送状况。

    若是1状况执行nettywriteAndFlush发送成功者跳出来,到达3状况进行等等最多等待3s。这里何时唤醒呢? 实际上是在broker状况响应客户端的时候进行唤醒的:

    备注: 这里使用CountDownLatch异步转同步的。

    若是是2状况表示发送失败,直接唤醒3状况不进行阻塞了(最后抛异常表示发送失败)

  • 更新broker可用时间

  • retryAnotherBrokerWhenNotStoreOK状况判断

    若是设置为retryAnotherBrokerWhenNotStoreOK为true以后,在发送失败的时候,会选择换一个broker。

  • 以下异常continue,进行发送消息重试

客户端发送流程大概到这里就分析完成了。


若是读完以为有收获的话,欢迎点赞、关注、加公众号【匠心零度】,查阅更多精彩历史!!!

加入知识星球,一块儿探讨!

相关文章
相关标签/搜索