一文入门rocketmq(扫盲版-附示例demo)

一、什么是Rocketmq

消息队列 RocketMQ 是阿里巴巴集团自主研发的专业消息中间件,基于高可用分布式集群技术,提供消息订阅和发布、消息轨迹查询以及定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。 消息队列 RocketMQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具有海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。
2012年开源,2017年成为apache顶级项目。java

二、名词解释

如下主要对消息队列 RocketMQ 涉及的专有名词及术语进行定义和解析。web

  • Topic
    消息主题,一级消息类型,经过 Topic 对消息进行分类。
  • Message
    消息,消息队列中信息传递的载体。
  • Message ID
    消息的全局惟一标识,由消息队列 RocketMQ 系统自动生成,惟一标识某条消息。
  • Message Key
    消息的业务标识,由消息生产者(Producer)设置,惟一标识某个业务逻辑。
  • Tag
    消息标签,二级消息类型,用来进一步区分某个 Topic 下的消息分类。
  • Producer
    消息生产者,也称为消息发布者,负责生产并发送消息。
  • Producer 实例
    Producer 的一个对象实例,不一样的 Producer 实例能够运行在不一样进程内或者不一样机器上。Producer 实例线程安全,可在同一进程内多线程之间共享。
  • Consumer
    消息消费者,也称为消息订阅者,负责接收并消费消息。
  • Consumer 实例
    Consumer 的一个对象实例,不一样的 Consumer 实例能够运行在不一样进程内或者不一样机器上。一个 Consumer 实例内配置线程池消费消息。
  • Group
    一类 Producer 或 Consumer,这类 Producer 或 Consumer 一般生产或消费同一类消息,且消息发布或订阅的逻辑一致。
  • Group ID
    Group 的标识。

特点功能:spring

  • 事务消息:实现相似 X/Open XA 的分布事务功能,以达到事务最终一致性状态。
  • 定时(延时)消息:容许消息生产者指定消息进行定时(延时)投递,最长支持 40 天。
  • 大消息:支持最大 4 MB 消息。
  • 消息轨迹:经过消息轨迹,能清晰定位消息从发布者发出,经由消息队列 RocketMQ 服务端,投递给消息订阅者的完整链路,方便定位排查问题。
  • 广播消费:容许同一个 Group ID 所标识的全部 Consumer 都各自消费某条消息一次。
  • 顺序消息:容许消息消费者按照消息发送的顺序对消息进行消费。
  • 重置消费进度:根据时间重置消费进度,容许用户进行消息回溯或者丢弃堆积消息。
  • 死信队列:将没法正常消费的消息储存到特殊的死信队列供后续处理。
  • 消息过滤:消费者能够根据消息标签(Tag)对消息进行过滤,确保消费者最终只接收被过滤后的消息类型。消息过滤在消息队列 RocketMQ 的服务端完成。

三、Rocketmq的概念模型

这三者是Rocketmq中最最基本的概念。Producer是消息的生产者;Consumer是消息的消费者。消息经过Topic进行传递。Topic存放的是消息的逻辑地址。具体来讲是Producer将消息发往具体的Topic。Consumer订阅Topic,主动拉取或被动接收消息。
实际上,topic还须要拆分出更多概念。系统部署架构以下图所示:sql

(1)NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步
(2) Broker部署相对复杂,Broker氛围Master与Slave,一个Master能够对应多个Slaver,可是一个Slaver只能对应一个Master,Master与Slaver的对应关系经过指定相同的BrokerName,不一样的BrokerId来定义,BrokerId为0表示Master,非0表示Slaver。Master能够部署多个。每一个Broker与NameServer集群中的全部节点创建长链接,定时注册Topic信息到全部的NameServer
(3)Producer与NameServer集群中的其中一个节点(随机选择)创建长链接,按期从NameServer取Topic路由信息,并向提供Topic服务的Master创建长链接,且定时向Master发送心跳。Produce彻底无状态,可集群部署
(4)Consumer与NameServer集群中的其中一个节点(随机选择)创建长链接,按期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slaver创建长链接,且定时向Master、Slaver发送心跳。Consumer便可从Master订阅消息,也能够从Slave订阅消息,订阅规则由Broker配置决定数据库

四、Rocketmq储存特色

RocketMQ的消息存储是由consume queue和commit log配合完成的。apache

4.一、Commit Log

这个是真正存储消息的地方。RocketMQ全部生产者的消息都是往这一个地方存的,每台broker上的commitlog被本机全部的queue共享,不作任何区分。安全

4.二、Consume Queue

这是一个逻辑队列。和上文中Topic下的messageQueue是一一对应的。消费者是直接和ConsumeQueue打交道。ConsumeQueue记录了下·消费位点,这个消费位点关联了commitlog的位置。因此即便ConsumeQueue出问题,只要commitlog还在,消息就没丢,能够恢复出来。还能够经过修改消费位点来重放或跳过一些消息,多线程

五、基础应用示例demo

Spring整合rocketmq
Pom依赖架构

<!-- rocketmq依赖包 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>4.4.0</version>
            <type>pom</type>
        </dependency>

生产者并发

Xml链接配置

<!-- 这是生产者 -->
	<bean id="defaultMQProducer" class="org.apache.rocketmq.client.producer.DefaultMQProducer" init-method="start" destroy-method="shutdown">
		<property name="producerGroup" value="ProducerGroud1" />
		<property name="namesrvAddr" value="127.0.0.1:9876" />
		<!-- 消息发送失败重试次数,默认为2,可能会形成消息重复 -->
		<property name="retryTimesWhenSendFailed" value="2"></property>
		<!-- 消息没有持久化成功是否发送到另一个broker,默认为false -->
		<property name="retryAnotherBrokerWhenNotStoreOK" value="true"></property>
	</bean>

RocketMQ 提供了三种方式发送消息:同步、异步和单向
•同步发送 同步发送指生产者发出数据后会在收到broker的response以后才发下一个数据包。

@Autowired
private DefaultMQProducer defaultMQProducer;
    /* * 同步可靠传输,多用于重要的消息提醒、短信提醒、短信营销系统等 send方法被设置为CommunicationMode.SYNC * 同步发送表示,producer发送消息以后不会当即返回,会等待broker的response */
 @RequestMapping("/sendMessagesSynchronously")
    public String sendMessagesSynchronously() throws Exception { 
 
  
        for (int i = 0; i < 1; i++) { 
 
  
            // 建立一个消息实例, 指定主题、标签、消息体
            Message msg = new Message("Topict", "TagA", ("Hello RocketMQ " + i).getBytes());
            // 把消息发送到brokers中的一个broker
            SendResult sendResult = defaultMQProducer.send(msg);
            System.out.println(sendResult);
        }
        return "同步发送成功";
    }

•异步发送 异步发送指生产者发出数据后,不等broker的response,接着发送下个数据包。

/* * 异步可靠传输,多用于对响应时间敏感的业务场景中 send方法被设置为CommunicationMode.ASYNC */
    @RequestMapping("/sendMessagesAsynchronously")
    public String sendMessagesAsynchronously() throws Exception { 
 
  
        for (int i = 0; i < 2; i++) { 
 
  
            final int index = i;
            // 建立一个消息实例, 指定主题、标签、消息体.
            Message msg = new Message("Topic2", "TagA", "OrderID1", "Hello world".getBytes());
            defaultMQProducer2.send(msg, new SendCallback()
            { 
 
  
                @Override
                public void onSuccess(SendResult sendResult) { 
 
  
                    System.out.println(index + " " + sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) { 
 
  
                    System.out.println(index + " " + "error");
                    e.printStackTrace();
                }
            });
        }
        return "异步发送成功";
    }

•单向发送 单向发送是指只负责发送消息而不等待roker的response且没有回调函数触发。

/* * 单向传输,多用在可靠稳定的业务中,例如日志收集 */
    @RequestMapping("/sendMessagesinOnewayMode")
    public String sendMessagesinOnewayMode() throws Exception { 
 
  
        for (int i = 0; i < 2; i++) { 
 
  
            // 建立一个消息实例, 指定主题、标签、消息体.
            Message msg = new Message("Topic3", "TagA", ("Hello RocketMQ " + i).getBytes());
            // 把消息发送到brokers中的一个broker
            defaultMQProducer.sendOneway(msg);
        }
        return "单向发送成功";
}

消费者
定义消费者,设置消费者组、NameServer的地址等信息,RocketMQ 提供了两种消费模式, PUSH 和 PULL,大多数场景使用的是PUSH模式,这两种模式分别对应的是 DefaultMQPushConsumer 类和DefaultMQPullConsumer 类,PUSH 模式实际上在内部仍是使用的 PULL 方式实现的,经过 PULL 不断地轮询 Broker 获取消息,在Push模式下,Consumer把轮询过程封装了,并注册了MessageListener监听器,取到消息后,唤醒MessageListener监听器中的consumeMessage()进行消费。

<!-- 配置监听者 -->
	<bean id="registerMessageListener" class="com.epoint.rocketmq.spring.Consumer"></bean>
	<!-- 配置消费者 -->
 	<bean id="PushConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown">
		<property name="consumerGroup" value="CID_1" />
		<property name="namesrvAddr" value="127.0.0.1:9876" />
		<property name="messageListener" ref="registerMessageListener" />
		<property name="subscription">
			<map>
				<!-- topic主题 -->
				<entry key="Topic">
					<!-- 订阅TAG,消费者将收到TAGA或TAGB或TAGC的消息,TAG的设计基本已知足大部分需求,对于复杂的案例,可使用SQL92表达式过滤消息 可参考官方文档https://rocketmq.apache.org/docs/filter-by-sql92-example/ -->
					<!-- <value>TAGA || TAGB || TAGC</value> -->
					<value>*</value>
				</entry>
			</map>
		</property> 
	</bean>

对于消费重试机制,能够选择以下展现的两种之一 默认消费重试16次,若均失败则会进入死信队列.

若但愿手动干预,能够选择重试三次,再也不重试,返回成功,将消息另外保存(consumeMessageBatchMaxSize需为默认的1,其实不建议修改次参数)。

顺序消费是要前者消费成功才能继续消费,因此没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其他消费,直到原消息不断重试成功为止才能继续消费。

public class Consumer implements MessageListenerConcurrently
{ 
 
  
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
           ConsumeConcurrentlyContext consumeConcurrentlyContext) { 
 
  

       // 消费消息
       for (MessageExt me : list) { 
 
  
           System.out.print("msg=" + new String(me.getBody()) + "\n");
       }
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
}

六、总结

分布式事务,本质上是对多个数据库的事务进行统一控制,按照控制力度能够分为:不控制、部分控制和彻底控制。不控制就是不引入分布式事务,部分控制就是各类变种的两阶段提交,包括上面提到的消息事务+最终一致性、TCC模式,而彻底控制就是彻底实现两阶段提交。部分控制的好处是并发量和性能很好,缺点是数据一致性减弱了,彻底控制则是牺牲了性能,保障了一致性,具体用哪一种方式,最终仍是取决于业务场景。

七、后续

在RocketMQ4.3.0版本后,开放了事务消息这一特性,对于分布式事务而言,最常说的仍是二阶段提交协议,下一篇将详细叙述如何利用RocketMQ实现分布式事务消息。