RocketMQ是一款分布式、队列模型的消息中间件。
特征及实现原理:
特色:
1. 支持严格的消息顺序;
2. 支持Topic与Queue两种模式;
3. 支持事物;
4. 亿级消息堆积能力;
5. 比较友好的分布式特性;
6. 同时支持Push与Pull方式消费消息;
分布式消息系统做为实现分布式系统可扩展、可伸缩性的关键组件,须要具备高吞吐量、高可用等特色。而谈到消息系统的设计,就回避不了两个问题:
1. 消息的顺序问题
2. 消息的重复问题
世界上解决一个计算机问题最简单的方法:“刚好”不须要解决它!--沈询
解决这两个问题的方式刚好是不解决,业务端经过设计就能够规避掉这些问题。
1. 消息顺序:
通常消息是经过轮询全部队列来发送的(负载均衡策略),顺序消息能够根据业务,好比说订单号相同的消息发送到同一个队列。因此RocketMQ的消息顺序由Producer保证消息在一个队列上,Consumer保证消费同一个队列。
2. 重复消息
形成消息的重复的根本缘由是:网络不可达。只要经过网络交换数据,就没法避免这个问题。因此解决这个问题的办法就是不解决。消费端处理消息重复的问题便可。
a) 消费端处理消息的业务逻辑保持幂等性
b) 保证每条消息都有惟一编号且保证消息处理成功与去重表的日志同时出现
3. 事物消息
RocketMQ将事务拆分红小事务异步执行的方式来完成,消息发送者先发送消息,再执行本地事务,再确根据本地事物执行结果认消息发送成功或者消息回滚;若是消息发送成功,而消息确认发送失败,RocketMQ会按期扫描事务,并找发送者进行确认,由发送者肯定是回滚仍是继续发送,消息进入消息系统后,若是消息消费失败或者超时,则一直重发确保消息被消费,若是消息真的消费失败,RocketMQ一样刚好不解决,从而避免提升系统的复杂度。
物理结构:
1. Name Server。提供topic的路由信息,Producer和Consumer都会经过Name Server回去路由信息到Broker,路由信息数据存储在内存中,broker会定时发送路由信息给Name Server的每一台机器,来尽心更新,Name Server是无状态的,能够横向扩展。
2. Broker。消息的中转者,负责存储和转发消息。Broker分为master和slave,一个master能够对应多个slave,BrokerId=0表示master,其余的为slave,Broker长连到Name Server,定时发送Topic信息到全部Name Server。
3. Producer。Producer与Name Server集群中的其中一个节点(随机选择)创建长链接,按期从Name Server取Topic路由信息,并向提供Topic 服务的Master创建长链接,且定时向Master发送心跳。Producer 彻底无状态,可集群部署。
4. Consumer。Consumer与Name Server集群中的其中一个节点(随机选择)创建长链接,按期从Name Server取Topic 路由信息,并向提供Topic服务的Master、Slave创建长链接,且定时向Master、Slave发送心跳。Consumer既能够从Master订阅消息,也能够从Slave订阅消息,订阅规则由Broker配置决定。
工做方式:java
Producer向Topic的队列轮流发送消息,Consumer若是作广播消费,则一个consumer实例消费这个Topic对应的全部队列;若是作集群消费,则多个Consumer实例平均消费这个Topic对应的队列集合。
部署:
安装包:https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz
设计文档:http://alibaba.github.io/RocketMQ-docs/document/design/RocketMQ_design.pdf
配置Java环境:
export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
启动Name Server: 到bin目录下 nohup mqnamesrv &
启动Broker: nohup mqbroker -n "192.168.1.111:9876" &
sh mqadmin topicList -n 192.168.1.111::9876 //查看topic列表
sh mqadmin topicStatus -n 192.168.1.111::9876 -t Topic1 //查看Topic1详情
sh mqadmin consumerProgress -n 192.168.1.111:9876 //查看消费组
broker配置:
在conf目录下有三个文件夹,对应broker配置,分别是2m-noslave, 2m-2s-async, 2m-2s-sync,分别对应2 master无slave配置,2 master 2 slave异步复制,2 master 2 slave同步双写。
使用默认配置:sh mqbroker -n “192.168.1.111:9876"
生成模板配置:sh mqbroker -m > broker.p
使用配置启动:sh mqbroker -c broker.p
使用:
Producer:git
DefaultMQProducer producer = new DefaultMQProducer("rmq-group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("rmq-instance"); producer.setVipChannelEnabled(false); // 必须设为false不然链接broker10909端口 producer.start(); try { for (int i = 0; i < 3; i++) { Message msg = new Message("TopicA-test”, "TagA”, "OrderId" + i, ("Body" + i).getBytes()); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); }
Consumer:github
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setVipChannelEnabled(false); // 必须设为false不然链接broker10909端口 consumer.setInstanceName("rmq-instance"); consumer.subscribe("TopicA-test", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getKeys() + " " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started.");