1、概念html
1. 中间件:位于系统之间的服务服务器
2. 消息中间件:消息队列MQ,用于接收消息、存储消息、转发消息的中间件架构
3. Rocket MQ: 分布式的消息中间件,生产者、消费者、队列均可以分布式分布式
4. 基于Netty开发ide
2、RocketMQ使用spa
1. 在服务器上安装Rocket MQ.net
2. 启动rocket mq,即name server,启动以后监听端口,等待broker\producer\consumer链接code
3. 启动broker, 设置对应的name server,broker用于收取和存储消息server
4. 手动/自动建立Topichtm
5. 消费者代码
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { //声明并初始化一个consumer //须要一个consumer group名字做为构造方法的参数,这里为consumer1 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1"); //一样也要设置NameServer地址 consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //这里设置的是一个consumer的消费策略 //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息 //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)所有消费一遍 //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时之前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //设置consumer所订阅的Topic和Tag,*表明所有的Tag consumer.subscribe("TopicTest", "*"); //设置一个Listener,主要进行消息的逻辑处理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); //返回消费状态 //CONSUME_SUCCESS 消费成功 //RECONSUME_LATER 消费失败,须要稍后从新消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //调用start()方法启动consumer consumer.start(); System.out.println("Consumer Started."); } }
6. 生产者代码
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //声明并初始化一个producer //须要一个producer group名字做为构造方法的参数,这里为producer1 DefaultMQProducer producer = new DefaultMQProducer("producer1"); //设置NameServer地址,此处应改成实际NameServer地址,多个地址之间用;分隔 //NameServer的地址必须有,可是也能够经过环境变量的方式设置,不必定非得写死在代码里 producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //调用start()方法启动一个producer实例 producer.start(); //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值 for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); //调用producer的send()方法发送消息 //这里调用的是同步的方式,因此会有返回结果 SendResult sendResult = producer.send(msg); //打印返回结果,能够看到消息发送的状态以及一些相关信息 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } //发送完消息以后,调用shutdown()方法关闭producer producer.shutdown(); } }
3、架构和名词
1. NameServer:名称服务器,为Producer和Consumer提供路由信息,用于管理Broker节点信息,记录Broker与Topic的对应关系
2. ConsumerGroup:消费同一类消息的多个 consumer 实例组成一个消费者组
3. Topic:消息的逻辑分类,物理实现上,一个Topic由多个队列组成
4. Message:消息,指定topic,有消息内容
5. Tag:标签,是对Topic的进一步细化,能够用来过滤
6. Broker:消息服务器,就是MQ,分为Master和Slave节点,每一个Broker与全部NameServer集群中全部节点创建链接,定时注册Topic到全部的NameServer
7. Producer与一个NameServer创建长链接,按期从NameServer获取Topic信息,向Broker发送消息
8. Consumer与一个NameServer创建长链接,按期从NameServer获取Topic信息,从Broker消费消息
4、特性
1. 发布/订阅,点对点(P2P)
2. 消息优先级:Rocket MQ没有特地支持消息优先级,但能够配置优先级不一样的两个队列
3. 消息顺序:Rocket MQ严格保证消息顺序,先进先出
4. 消息过滤:生产端和消费端均可以过滤,各有优缺点
5. 消息持久化:Rocket MQ以文件形式持久化
6. 消息可靠性:避免消息丢失,须要生产者、消费者和MQ队列都保证
7. 消息延迟:Rocket MQ使用长轮询pull方式,保证明时
8. 消息堆积:由于须要削峰填谷,须要支持消息堆积,亿级别的消息堆积能力
9. 消息重试:消费失败后,从新再消费一次
10. 每一个消息必须投递一次
11. 不运行重复的消息,须要业务保证幂等
12. 队列大小,按期删除数据
13. 定时消息
14. 事务机制
5、消费者的消费模式
1. 集群消费:一条消息只会被投递到一个consumer group下面的一个实例
2. 广播消费:一条消息会被投递到一个consumer group下面的全部实例
6、消费者获取消息的模式
1. 推送模式:能及时消费
2. 拉取模式:能够主动控制拉取时机
7、Rocket MQ和其余消息队列比较(ActiveMQ, RabbitMQ, ZeroMQ, Kafka),为何选择Rocket MQ?
1. 严格的顺序消息
2. 亿级消息堆积能力
3. Pull/Push消费模式
4. 历经屡次天猫双十一海量消息考验
参考:
http://www.javashuo.com/article/p-ektqdoib-ke.html
http://www.javashuo.com/article/p-qfeldccx-kg.html
https://blog.csdn.net/tototuzuoquan/article/details/78325192