关于MQ,你必须知道的

OK那么消息队列MQ有什么套路呢?

  1. 使用消息队列场景和好处
  2. 使用消息队列会带来什么问题,有什么解决方案
  3. 如何使用MQ(以ActiveMQ为例的简单例子)

1.消息队列的应用场景和好处:web

  • 异步-流量削峰

  咱们先来看下传统的服务器接收处理请求的流程redis

  如上图,在不使用消息队列服务器的时候,用户的请求都直怼数据库,在高并发的状况下数据库压力剧增,不只使得响应速度变慢,还可能所以而挂掉数据库,致使用户页面直接报错,项目经理找上门,而后*#!%@!#** ......(PS:尽管是某服务挂了,但某宝的用户页面提示信息必定会甩锅给网络不通哦~)算法

  咱们再来看加入消息队列服务器以后的接收处理请求的流程会发生什么变化数据库

如上图,在使用消息队列以后,即便在高并发的状况下用户的请求数据发送给消息队列以后当即返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。因为消息队列服务器处理消息速度比数据库要快不少,所以响应速度(用户体验感) 获得大幅改善。apache

  所以咱们能够得出消息队列具备很好的流量削峰做用的功能——即经过异步处理,将短期高并发产生的事务消息存储在消息队列中,从而削去高峰期的并发事务。如在某些电商平台的一些秒杀活动中,合理使用消息队列能够抵御活动刚开始大量请求涌入对系统的冲击。 由于用户请求数据写入消息队列以后就当即返回给用户了,可是请求数据在后续的业务校验、写数据库等操做中可能失败。所以使用消息队列进行异步处理以后,须要适当修改业务流程进行配合,好比用户在提交订单以后,订单数据写入消息队列,不能当即返回用户订单提交成功,须要在消息队列的订单消费者进程真正处理完该订单以后,甚至出库后,再经过电子邮件或短信通知用户订单成功,以避免交易纠纷。这就相似咱们平时手机订火车票等。服务器

  • 异步-系统解耦

 我看也先来看看传统的系统数据传输模式网络

  如上图,主系统和其余系统的耦合性太强,都是直接调用,稍微有一点改动或者新增模块,双方都得改代码,过于麻烦session

  而后,咱们再来看看加入了消息队列以后,系统的结构会发生什么变化并发

  如上图,咱们知道若是模块之间不存在直接调用,那么新增模块或者修改模块就对其余模块影响较小,这样系统的可扩展性无疑更好一些。运维

消息队列使利用发布-订阅模式工做,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。 从上图能够看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不须要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,便可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。

 另外为了不消息队列服务器宕机形成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其余服务器发布消息。

 除发布订阅模式以外,消息队列还有其余的传输模式

  点对点模型

  基础模型中,只有一个发送者、一个接收者和一个分布式队列。

  生产者消费者模型

  若是发送者和接收者均可以有多个部署实例,甚至不一样的类型;可是共用同一个队列,这就变成了标准的生产者消费者模型。在该模型,三个角色通常称为生产(Producer)、分布式队列(Queue)、消费者(Consumer)。

    中途小结:消息队列对系统的并发处理的能力和扩展性有所提高

2.使用消息队列会带来什么问题:

  • 可用性下降: 在加入MQ以前,你不用考虑MQ服务器挂掉的状况,引入MQ以后你就须要去考虑了,可用性下降。
  • 复杂性提升: 加入MQ以后,你须要保证消息没有被重复消费、处理消息丢失的状况、保证消息传递的顺序性等问题。所以须要考虑的东西更多,系统复杂性增大。
  • 数据一致性: 消息队列带来的异步确实能够提升系统响应速度,可是,万一消息的真正消费者并无正确消费消息怎么办?这样就会致使数据不一致的状况了。

2.1有什么解决方案

  • 对于可用性问题

  引入消息队列后,系统的可用性降低。实际项目中发送MQ消息,若是不作集群,其中mq机器出了故障宕机了,那么mq消息就不能发送了,系统就崩溃了,因此咱们须要集群MQ,当其中一台MQ出了故障,其他的MQ机器能够接着继续运转,在生产中,没人使用单机的消息队列。若是有,那确定为了用而用(显得技术复杂一下,好忽悠多收点钱),对于这个问题,须要对MQ集群技术有比较深入的理解,各类消息中间件的集群方式不一样,下面以ActiveMq的集群为例(Zookeeper+ActiveMq),先看图

  这种应用场景叫作master/slave(主/备模式),在这种场景下,我有三台服务器(主和备),任何状况下,只有“主”在工做,“备”是在主出现故障时,接替“主”来提供服务。在zookeeper的支持下,这一过程是这样实现的,Zookeeper提供目录和节点的服务,当个人三台服务器启动时,会在zookeeper的指定目录下建立对应本身的临时节点(这个过程称为“注册”),所谓临时节点,是靠心跳(定时向zookeeper服务器发送数据包)维系,当主服务器出现故障(没法向zookeeper服务器发送数据包,zookeeper会删除改临时节点。服务器向zookeeper注册时,zookeeper会分配序列号,咱们认为序列号小的那个,就是“主”,序列号大的那个,就是“备”。

  当咱们的客户端(一般是web server)须要访问服务时,须要链接zookeeper,得到指定目录下的临时节点列表,也就是已经注册的服务器信息,得到序列号小的那台“主”服务器的地址,进行后续的访问操做。以达到“老是访问主服务器”的目的。当“主”服务器发生故障,zookeeper从指定目录下删除对应的临时节点,同时能够通知关心这一变化的全部客户端,高效且迅速的传播这一信息。当下个请求来的时候,仍是链接zookeeper,可是此时实际上是访问备用的MQ。

  对于如何配置集群,这里就不演示,自行网上搜教程,一大把的!

  • 对于复杂性问题
  1. 如何保证消息不被重复消费呢?

  要回答好这个问题,首先要知道为何消息会被重复消费,大多都是由于网络不通致使,确认信息没有传送到消息队列,致使消息队列不知道本身已经消费过该消息了,再次将该消息分发给其余的消费者。因此解决问题的方式有以下三种思路

①.若是消息是作数据库的插入操做,给这个消息作一个惟一主键,那么就算出现重复消费的状况,就会致使主键冲突,避免数据库出现脏数据。 ②.若是你拿到这个消息作redis的set的操做,不用解决,由于你不管set几回结果都是同样的,set操做原本就算幂等操做。 ③.若是上面两种状况还不行,准备一个第三服务方来作消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录便可。

    2.如何保证消息的可靠性传输呢?

其实这个问题是第一个问题的扩展,换而言之,咱们要保证可靠性传输,其实就是保证防止生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据而已

  其实这些问题早在中间件开发者已经考虑到了,也提供了一些可配置的文件给咱们自行设定相关参数,消息队列通常都会持久化到磁盘这个不用担忧,而后生产者数据丢失的话MQ的事务会回滚,能够尝试从新发送,消费者丢的的话通常都是采用了自动确认消息模式致使消费信息被删,只要修改成手动确认就好了,也就是说消费者消费完以后,调用一个MQ的确认方法就好了

  3.如何保证从消息队列里拿到的数据按顺序执行?

  经过算法,将须要保持前后顺序的消息放到同一个消息队列中,而后只用一个消费者去消费该队列。

  (1)rabbitmq:拆分多个queue,每一个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue可是对应一个consumer,而后这个consumer内部用内存队列作排队,而后分发给底层不一样的worker来处理

  (2)kafka:一个topic,一个partition,一个consumer,内部单线程消费,写N个内存queue,而后N个线程分别消费一个内存queue便可

  4.如何解决消息队列的延时以及过时失效问题?有几百万消息持续积压几小时,怎么解决?

这个问题是生产环境出现事故后的,考察你如何快速的解决问题,,消息队列的延迟和过时失效是消息队列的自我保护机制,目的是为了防止自己被挤爆,固然是能够关闭保护,好比当某个消息消费失败5次后,就把这个消息丢弃等,尽可能不要关掉保护机制,那么问题来了,那些被丢弃的消息难道就不要了吗?其实并非,咱们能够针对该业务,查询出来将丢失的那批数据,写个临时程序,一点一点的查出来,而后从新灌入mq里面去,把丢的数据给他补回来。

  5.数据是经过push仍是pull方式给到消费端,各自有什么弊端?

  • Push模型实时性好,可是由于状态维护等问题,难以应用到消息中间件的实践中,由于

  • 在Broker端须要维护Consumer的状态,很差适用于Broker去支持大量的Consumer的场景 Consumer的消费速度是不一致的,Broker进行推送难以处理不一样的Consumer的情况 Broker难以应对Consumer没法消费消息的状况,由于不知道Consumer的宕机是短暂的仍是永久的) 另外推送消息(量可能会很大)也会加剧Consumer的负载或者压垮Consumer。

    若是对应只有1个Consumer,用push比pull好。

  • Pull模式实现起来会相对简单一些,可是实时性取决于轮训的频率,在对实时性要求高的场景不适合使用。

3.如何使用MQ(以ActiveQM为例)

附上官网:http://activemq.apache.org/

附上启动服务访问地址:http://127.0.0.1:8161/admin/   用户名/密码 admin/admin

附上代码,jar包本身下 https://pan.baidu.com/s/1SUBoypW-w_--KeFj_HsOtg

发布订阅模式

生产者-发布

public class JMSProducer {

    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的链接用户名
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的链接密码
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的链接地址
    private static final int SENDNUM=10; // 发送的消息数量
    
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory; // 链接工厂
        Connection connection = null; // 链接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息生产者
        
        // 实例化链接工厂
        connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
        
        try {
            connection=connectionFactory.createConnection(); // 经过链接工厂获取链接
            connection.start(); // 启动链接
            session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立Session
            // destination=session.createQueue("FirstQueue1"); // 建立消息队列
            destination=session.createTopic("FirstTopic1");
            messageProducer=session.createProducer(destination); // 建立消息生产者
            sendMessage(session, messageProducer); // 发送消息
            session.commit();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally{
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**
     * 发送消息
     * @param session
     * @param messageProducer
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
        for(int i=0;i<JMSProducer.SENDNUM;i++){
            TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
            System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i);
            messageProducer.send(message);
        }
    }
}

  消费者-订阅

/**
 * 消息监听-订阅者一
 * @author Administrator
 *
 */
public class Listener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        // TODO Auto-generated method stub
        try {
            System.out.println("订阅者一收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}
public class JMSConsumer {

    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的链接用户名
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的链接密码
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的链接地址
    
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 链接工厂
        Connection connection = null; // 链接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息的消费者
        
        // 实例化链接工厂
        connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
                
        try {
            connection=connectionFactory.createConnection();  // 经过链接工厂获取链接
            connection.start(); // 启动链接
            session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session
            // destination=session.createQueue("FirstQueue1");  // 建立链接的消息队列
            destination=session.createTopic("FirstTopic1");
            messageConsumer=session.createConsumer(destination); // 建立消息消费者
            messageConsumer.setMessageListener(new Listener()); // 注册消息监听
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}

我认为一个优秀的分布式消息队列,应该具有如下的能力:高吞吐、低时延(因场景而异),传输透明,伸缩性强,有冗灾能力,一致性顺序投递,同步+异步的发送方式,完善的运维和监控工具和开源

欢迎你们关注个人我的公众号,里面不只有技术分享,还有各类行业趣事,让您的生活变的丰富多彩