消息队列 RocketMQ 是阿里巴巴集团自主研发的专业消息中间件,基于高可用分布式集群技术,提供消息订阅和发布、消息轨迹查询以及定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。 消息队列 RocketMQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具有海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。
2012年开源,2017年成为apache顶级项目。java
如下主要对消息队列 RocketMQ 涉及的专有名词及术语进行定义和解析。web
特点功能:spring
这三者是Rocketmq中最最基本的概念。Producer是消息的生产者;Consumer是消息的消费者。消息经过Topic进行传递。Topic存放的是消息的逻辑地址。具体来讲是Producer将消息发往具体的Topic。Consumer订阅Topic,主动拉取或被动接收消息。
实际上,topic还须要拆分出更多概念。系统部署架构以下图所示:sql
(1)NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步
(2) Broker部署相对复杂,Broker氛围Master与Slave,一个Master能够对应多个Slaver,可是一个Slaver只能对应一个Master,Master与Slaver的对应关系经过指定相同的BrokerName,不一样的BrokerId来定义,BrokerId为0表示Master,非0表示Slaver。Master能够部署多个。每一个Broker与NameServer集群中的全部节点创建长链接,定时注册Topic信息到全部的NameServer
(3)Producer与NameServer集群中的其中一个节点(随机选择)创建长链接,按期从NameServer取Topic路由信息,并向提供Topic服务的Master创建长链接,且定时向Master发送心跳。Produce彻底无状态,可集群部署
(4)Consumer与NameServer集群中的其中一个节点(随机选择)创建长链接,按期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slaver创建长链接,且定时向Master、Slaver发送心跳。Consumer便可从Master订阅消息,也能够从Slave订阅消息,订阅规则由Broker配置决定数据库
RocketMQ的消息存储是由consume queue和commit log配合完成的。apache
这个是真正存储消息的地方。RocketMQ全部生产者的消息都是往这一个地方存的,每台broker上的commitlog被本机全部的queue共享,不作任何区分。安全
这是一个逻辑队列。和上文中Topic下的messageQueue是一一对应的。消费者是直接和ConsumeQueue打交道。ConsumeQueue记录了下·消费位点,这个消费位点关联了commitlog的位置。因此即便ConsumeQueue出问题,只要commitlog还在,消息就没丢,能够恢复出来。还能够经过修改消费位点来重放或跳过一些消息,多线程
Spring整合rocketmq
Pom依赖架构
<!-- rocketmq依赖包 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>4.4.0</version> <type>pom</type> </dependency>
生产者并发
Xml链接配置
<!-- 这是生产者 --> <bean id="defaultMQProducer" class="org.apache.rocketmq.client.producer.DefaultMQProducer" init-method="start" destroy-method="shutdown"> <property name="producerGroup" value="ProducerGroud1" /> <property name="namesrvAddr" value="127.0.0.1:9876" /> <!-- 消息发送失败重试次数,默认为2,可能会形成消息重复 --> <property name="retryTimesWhenSendFailed" value="2"></property> <!-- 消息没有持久化成功是否发送到另一个broker,默认为false --> <property name="retryAnotherBrokerWhenNotStoreOK" value="true"></property> </bean>
RocketMQ 提供了三种方式发送消息:同步、异步和单向
•同步发送 同步发送指生产者发出数据后会在收到broker的response以后才发下一个数据包。
@Autowired private DefaultMQProducer defaultMQProducer; /* * 同步可靠传输,多用于重要的消息提醒、短信提醒、短信营销系统等 send方法被设置为CommunicationMode.SYNC * 同步发送表示,producer发送消息以后不会当即返回,会等待broker的response */ @RequestMapping("/sendMessagesSynchronously") public String sendMessagesSynchronously() throws Exception { for (int i = 0; i < 1; i++) { // 建立一个消息实例, 指定主题、标签、消息体 Message msg = new Message("Topict", "TagA", ("Hello RocketMQ " + i).getBytes()); // 把消息发送到brokers中的一个broker SendResult sendResult = defaultMQProducer.send(msg); System.out.println(sendResult); } return "同步发送成功"; }
•异步发送 异步发送指生产者发出数据后,不等broker的response,接着发送下个数据包。
/* * 异步可靠传输,多用于对响应时间敏感的业务场景中 send方法被设置为CommunicationMode.ASYNC */ @RequestMapping("/sendMessagesAsynchronously") public String sendMessagesAsynchronously() throws Exception { for (int i = 0; i < 2; i++) { final int index = i; // 建立一个消息实例, 指定主题、标签、消息体. Message msg = new Message("Topic2", "TagA", "OrderID1", "Hello world".getBytes()); defaultMQProducer2.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(index + " " + sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.println(index + " " + "error"); e.printStackTrace(); } }); } return "异步发送成功"; }
•单向发送 单向发送是指只负责发送消息而不等待roker的response且没有回调函数触发。
/* * 单向传输,多用在可靠稳定的业务中,例如日志收集 */ @RequestMapping("/sendMessagesinOnewayMode") public String sendMessagesinOnewayMode() throws Exception { for (int i = 0; i < 2; i++) { // 建立一个消息实例, 指定主题、标签、消息体. Message msg = new Message("Topic3", "TagA", ("Hello RocketMQ " + i).getBytes()); // 把消息发送到brokers中的一个broker defaultMQProducer.sendOneway(msg); } return "单向发送成功"; }
消费者
定义消费者,设置消费者组、NameServer的地址等信息,RocketMQ 提供了两种消费模式, PUSH 和 PULL,大多数场景使用的是PUSH模式,这两种模式分别对应的是 DefaultMQPushConsumer 类和DefaultMQPullConsumer 类,PUSH 模式实际上在内部仍是使用的 PULL 方式实现的,经过 PULL 不断地轮询 Broker 获取消息,在Push模式下,Consumer把轮询过程封装了,并注册了MessageListener监听器,取到消息后,唤醒MessageListener监听器中的consumeMessage()进行消费。
<!-- 配置监听者 --> <bean id="registerMessageListener" class="com.epoint.rocketmq.spring.Consumer"></bean> <!-- 配置消费者 --> <bean id="PushConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown"> <property name="consumerGroup" value="CID_1" /> <property name="namesrvAddr" value="127.0.0.1:9876" /> <property name="messageListener" ref="registerMessageListener" /> <property name="subscription"> <map> <!-- topic主题 --> <entry key="Topic"> <!-- 订阅TAG,消费者将收到TAGA或TAGB或TAGC的消息,TAG的设计基本已知足大部分需求,对于复杂的案例,可使用SQL92表达式过滤消息 可参考官方文档https://rocketmq.apache.org/docs/filter-by-sql92-example/ --> <!-- <value>TAGA || TAGB || TAGC</value> --> <value>*</value> </entry> </map> </property> </bean>
对于消费重试机制,能够选择以下展现的两种之一 默认消费重试16次,若均失败则会进入死信队列.
若但愿手动干预,能够选择重试三次,再也不重试,返回成功,将消息另外保存(consumeMessageBatchMaxSize需为默认的1,其实不建议修改次参数)。
顺序消费是要前者消费成功才能继续消费,因此没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其他消费,直到原消息不断重试成功为止才能继续消费。
public class Consumer implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { // 消费消息 for (MessageExt me : list) { System.out.print("msg=" + new String(me.getBody()) + "\n"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
分布式事务,本质上是对多个数据库的事务进行统一控制,按照控制力度能够分为:不控制、部分控制和彻底控制。不控制就是不引入分布式事务,部分控制就是各类变种的两阶段提交,包括上面提到的消息事务+最终一致性、TCC模式,而彻底控制就是彻底实现两阶段提交。部分控制的好处是并发量和性能很好,缺点是数据一致性减弱了,彻底控制则是牺牲了性能,保障了一致性,具体用哪一种方式,最终仍是取决于业务场景。
在RocketMQ4.3.0版本后,开放了事务消息这一特性,对于分布式事务而言,最常说的仍是二阶段提交协议,下一篇将详细叙述如何利用RocketMQ实现分布式事务消息。