1、专业术语java
消费生产者,负责产生消息,通常由业务系统负责产生消息
服务器
消息消费者,负责消费消息,通常是后台系统负责异步消费网络
Consumer的一种,应用一般向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象马上回调Listener接口方法数据结构
Consumer的一种,应用一般主动调用Consumer的拉消息方法,从Broker拉消息,主动权由应用控制异步
一类Producer的集合名称,这类Consumer一般发送一类消息,且发送逻辑一致。分布式
一类Consumer的集合名称,这类Consumer一般消费一类消息,且消费逻辑一致。ide
消息中转角色,负责存储消息,转发消息,通常也称为Server。在JMS规范中称为Provider。测试
一个消息被多个Consumer消费,即便这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每一个Consumer都消费一 次,广播消费中的Consumer Group概念能够认为在消息划分方面无心义ui
在CORBA Notification 规范中,消费方式都属于广播消费。spa
在JMS规范中,至关于JMS publish/subscribe model
一个Consumer Group 中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息,其中一个Consumer Group有3个实例(多是3个进程,或者3台机器),那么每一个实例只消费其中的3条消息。
消费消息的顺序要同发送消息的顺序一致,在RocketMq中,主要指的是局部顺序,即一类消息为知足顺序性,必须Producer单线程顺序发送,且发送到同一队列,这样Consumer就能够按照Producer发送的顺序去消费消息。
顺序消息的一种,正常状况下能够保证彻底的顺序消息,可是一旦发生通讯异常,Broker重启,因为队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。
若是业务能容忍在集群一次状况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
顺序消费的一种,不管正常异常状况都能保证顺序,可是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大下降。
若是服务器部署为同步双写模式,此缺陷可经过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用(依赖同步双写,主备自动切换,自动切换功能还没有实现)
在RocketMq中,全部消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每一个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,因此任务是长度无限,另外队列中只保存最近几天的数据,以前的数据会按照过时时间来删除。
2、代码示例
生产者:
package com.alibaba.rocketmq.example.quickstart; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * @author : Jixiaohu * @Date : 2018-04-19. * @Time : 9:20. * @Description : */ public class produce { public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("jxh_quickstart_produce"); producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); producer.start(); try { for (int i = 0; i < 100; i++) { Message msg = new Message("TopicQuickStart", "TagA", ("Hello RoctetMq : " +i ).getBytes()); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (RemotingException e) { e.printStackTrace(); Thread.sleep(1000); } producer.shutdown(); } }
生产者生产100条消息:
消费者:
package com.alibaba.rocketmq.example.quickstart; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * @author : Jixiaohu * @Date : 2018-04-19. * @Time : 9:20. * @Description : */ public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jxh_quickstart_produce"); consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费仍是队列尾部开始消费<br> * 若是非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicQuickStart", "*"); //不配置默认一条 consumer.setConsumeMessageBatchMaxSize(10); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.println("拉取消息条数 " + msgs.size()); try { for (MessageExt msg : msgs) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started"); } }
这边设置了每次消费条数,我这边先启动Consumer订阅ropic,而后启动produce,看一下打印结果:
produce发送了100条数据,下面看一下,Consumer消费数据的状况
你会发现,为何每次获取的消息都是1条,那上面设置的每次获取最大的消息数目“10”,是否是不起做用了?
实际上是这样的,
咱们正常的流程通常都是先启动Consumer端,而后再启动Producer端。Consumer端都是一条一条的消费的。可是有时候会出现先启动Producer端,而后再启动Consumer端这种状况,那这个时候就是会批量消费了,这个参数就会有做用了。
3、消息的重试,
消息的重试大体分为三种状况,
①:Produce发送消息到MQ上,发送失败。
看一下produce的代码是怎么实现的,这边看一个大概的状况
public class produce { public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("jxh_quickstart_produce"); //消息发送至mq失败后的重试次数 producer.setRetryTimesWhenSendFailed(10); producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); producer.start(); try { for (int i = 0; i < 100; i++) { Message msg = new Message("TopicQuickStart", "TagA", ("Hello RoctetMq : " + i).getBytes()); // SendResult sendResult = producer.send(msg); //增长一个超时参数,单位为毫秒 SendResult sendResult = producer.send(msg, 1000); System.out.println(sendResult); } } catch (RemotingException e) { e.printStackTrace(); Thread.sleep(1000); } producer.shutdown(); } }
②MQ推送消息至Consumer超时失败(网络波动)timeout。
这种状况,timeout,MQ会无限循环,直到把消息推送至Consumer,MQ没有接收到RECONSUME_LATER或CONSUME_SUCCESS
③Consumer处理消息后,返回RECONSUME_LATER,MQ也会按照策略发送消息 exception。
消息重试的测试是 1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h
RocketMQ为咱们提供了这么屡次数的失败重试,可是在实际中也许咱们并不须要这么多重试,好比重试3次,尚未成功,咱们但愿把这条消息存储起来并采用另外一种方式处理,并且但愿RocketMQ不要再重试呢,由于重试解决不了问题了!这该如何作呢?
public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jxh_quickstart_produce"); consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费仍是队列尾部开始消费<br> * 若是非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicQuickStart", "*"); //不配置默认一条 consumer.setConsumeMessageBatchMaxSize(10); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.println("拉取消息条数 " + msgs.size()); try { // for (MessageExt msg : msgs) { MessageExt msg = msgs.get(0); String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags + "msgs:" + msgs); //注意,要先启动Consumer,在进行发送消息(也就是先订阅服务,再发送) if ("Hello RoctetMq : 4".equals(msgBody)) { System.out.println("===========失败消息开始========"); System.out.println(msgBody); System.out.println("===========失败消息结束========"); //异常 int a = 1 / 0; } // } } catch (Exception e) { e.printStackTrace(); if (msgs.get(0).getReconsumeTimes() == 3) { // 该条消息能够存储到DB或者LOG日志中,或其余处理方式 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started"); } }
查看下打印结果:
这边能看到重试次数。