为了实现分布式系统可扩展、可伸缩性的关键组件,须要具备高吞吐量、高可用等特色。咱们不少时候都会考虑将消息系统归入咱们的选择中;好比我一个登陆事件,有可能我登陆以后须要作不少东西,好比日志,好比发布消息,好比推送,再好比发送代金券等等;这些事件与登陆息息相关,可是本质上它们与登陆这个事件没有直接的关系,只是在登陆事件后,系统按照需求须要去初始化一些东西,或者去记录一些东西等等;若是把全部的东西都归入到登陆这个事件中(同一个事物中),那登陆的事件内处理的逻辑更多,会形成什么后果?登陆时间很长,让用户没法忍受,另外,假如登陆过程当中出现了未发现异常,那是否是致使用户直接没法登陆?为了解决这样的问题,咱们引入了消息系统,好比我这台机登陆事后,我将登陆的一些信息,经过远程方式发送到另一台机器上(或者同一台机),让它们去处理相应的后续逻辑实现;html
目的是:一、用户登陆更快,体验上更好,java
二、只要保证登陆部分完整,即使后续出错,并不影响用户正常使用,即容错性更强!算法
谈到消息系统,首先想到的第一个问题确定会是:负载均衡
消息的顺序性dom
原本很想说一下关于消息顺序性的一些问题,不过因为我也是借鉴了一些其余的帖子,以及官方的文档,因此这里就不会去赘述这些了,稍后我会分享一些很不错的连接,留给本身之后看,也但愿能够给一些恰好要入门rocketmq的网友提供一些资料;分布式
rocketmq是阿里云的一套开源产品,功能什么的就不赘述了,请自行去网站了解:https://help.aliyun.com/document_detail/29532.html?spm=5176.doc34411.6.104.EvZr21ide
rocketmq是一类二级消息分类的产品,一级为topic,二级为tag;网站
broker按照收到生产者发送的消息体,分析其中的topic,而后去找到相应的topic转发出去,在消费端,消费者根据收到的消息分析出tag的不一样去作不一样的逻辑处理;this
那么在这个时候,咱们就会好奇,为了保证消息的顺序执行的状况,RokectMQ是如何选择topic?为此,咱们先看看rokcetmq的源代码:阿里云
// 官方例子以下: public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 10000000; i++) try { { Message msg = new Message("TopicTest",// topic "TagA",// tag "OrderID188",// key ("Hello MetaQ").getBytes(RemotingHelper.DEFAULT_CHARSET));// body SendResult sendResult = producer.send(msg); //发送消息 System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } } // defalutMQProducer 类下封装的方法 @Override public SendResult send(Message msg, MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, mq); } // send的实现方法 @Override public SendResult send(Message message) { this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl()); com.alibaba.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message); try { com.alibaba.rocketmq.client.producer.SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ); message.setMsgID(sendResultRMQ.getMsgId()); SendResult sendResult = new SendResult(); sendResult.setTopic(sendResultRMQ.getMessageQueue().getTopic());//如何选择topic sendResult.setMessageId(sendResultRMQ.getMsgId()); return sendResult; } catch (Exception e) { log.error(String.format("Send message Exception, %s", message), e); throw checkProducerException(message.getTopic(), message.getMsgID(), e); } }
在官方例子中,咱们能够看到,在发送消息的时候,咱们并无去了解细致的发送消息时,那么MQ究竟是如何选择topic的?
可是能够从代码中看到,它确实有个MessageQueueSelector 接口,这个接口负责是选择topic,那么咱们就来看看它到底为咱们提供了那些实现方法吧(通常的消息都是轮询去寻找topic来实现负载均衡):
/** * 若是lastBrokerName不为null,则寻找与其不一样的MessageQueue(轮询负载均衡) */ public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName != null) { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size();//轮询 MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return null; } else { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); return this.messageQueueList.get(pos); } }
若是对于这种轮询方式的负载均衡不满意,并不能打达到咱们的需求,那么咱们又改如何去选择?
阿里云提供了三种方式来解决咱们的需求,若是再不能知足,那么就知道修改源码算法部分来达到本身的要求了。
/** * 使用哈希算法来选择队列,顺序消息一般都这样作<br> * * @author shijia.wxr<vintage.wang@gmail.com> * @since 2013-6-27 */ public class SelectMessageQueueByHash implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); if (value < 0) { value = Math.abs(value); } value = value % mqs.size(); return mqs.get(value); } } /** * 根据机房来选择发往哪一个队列,支付宝逻辑机房使用 * * @author shijia.wxr<vintage.wang@gmail.com> * @since 2013-7-25 */ public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { private Set<String> consumeridcs; @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // TODO Auto-generated method stub return null; } public Set<String> getConsumeridcs() { return consumeridcs; } public void setConsumeridcs(Set<String> consumeridcs) { this.consumeridcs = consumeridcs; } } /** * 发送消息,随机选择队列 * * @author shijia.wxr<vintage.wang@gmail.com> * @since 2013-7-25 */ public class SelectMessageQueueByRandoom implements MessageQueueSelector { private Random random = new Random(System.currentTimeMillis()); @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = random.nextInt(); if (value < 0) { value = Math.abs(value); } value = value % mqs.size(); return mqs.get(value); } }
能够看到,rocketmq为咱们提供了三个选择(除去轮询方法),那么若是咱们在很是关注消息顺序的时候,咱们能够选择经过哈希算法求值的方式来实现
SelectMessageQueueByHash
咱们每一个传递进入的对象都会被哈希算法计算出 一个哈希值,好比咱们传递的是订单号,那么无疑咱们能够保证相同的订单号能够传递给相同的topic去处理,那么只要再保证是一致的tag就能够保证顺序的一致性啦;
目的是:生产者 -- MQ服务端 -- 消费者 能够达到一一对应的关系
第二种是机房选择,算法是木有啦,应该是根据ip地址去区分,反正概念我不是很清晰,也没有去注意和了解;有了解的亲留个资料给我吧,连接就好,谢谢撒……
第三种是随机选择,也就是谁也不知道它到底会选择谁,这种效率其实不好,没有负载均衡,谁也不知道会不会堵塞起来,谁也不知道某个队列是否已经塞满。
有些问题,看起来很重要,但实际上咱们能够经过合理的设计或者将问题分解来规避。若是硬要把时间花在解决它们身上,其实是浪费的,效率低下的。从这个角度来看消息的顺序问题,咱们能够得出两个结论:
一、不关注乱序的应用实际大量存在
二、队列无序并不意味着消息无序
参考连接以下:
http://www.jianshu.com/p/453c6e7ff81c
http://blog.csdn.net/column/details/learningrocketmq.html?&page=1
路走多了,早晚有人会翻车……