RabbitMQ - Start Up

  • 开始以前

  rabbitmq是一个被普遍使用的消息队列,它是由erlang编写的,根据AMQP协议设计实现的。html

  AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
  RabbitMQ是一个开源的AMQP实现,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。java

  下载rabbitMq数据库

  http://www.rabbitmq.com/download.htmlapi

  erlang环境安全

  http://www.erlang.org/服务器

  在启动rabbitMq以前须要安装erlang环境,而且erlang环境和rabbitMq版本要匹配异步

 

  • AMQP

  ​AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不一样的开发语言等条件的限制。socket

  AMQ Model分布式

   

  Message Queue

​   消息队列会将消息存储到内存或者磁盘中,并将这些消息按照必定顺序转发给一个或者多个消费者,每一个消息队列都是独立隔离的,相互不影响。ide

​   消息队列具备不一样的属性:私有,共享,持久化,临时,客户端定义 或者服务端定义等,能够基于实际需求选择对应的类型,以 RabbitMQ 队列特性为例:

   共享持久化消息队列:将发送的消息存储到磁盘,而后将消息转发给订阅该队列的全部消费者;

   私有临时消息队列:RabbitMQ 支持 rpc 调用,再调用过程当中消费者都会临时生成一个消息队列,只有当前消费者可见,且由服务端生成,调用完就会销毁队列。

  Exchange

   交换机收到生产者投递的消息,基于路由规则及队列绑定关系匹配到投递对应的交换机或者队列进行分发,交换机不存储消息,只作转发。

   AMQP定义了许多标准交换类型,基本涵盖了消息传递所需的路由类型,通常 AMQP 服务器都会提供默认的交换机基于交换机类型命名,AMQP 的应用程序也能够建立本身的交换机用于绑定指定的消息队列发布消息。

   RabbitMQ

 

  • 概念

  channel

    channel是定义每个生产者或者消费者与Mq的链接,Mq的各类事务都是以channel为基原本扩展的,它相似于一个socket链接。

 

  exchanger

    交换器。在rabbitMq设计的内部,是经过exchanger来决定消息怎么分发到queue上的。定义了交换器须要把队列绑定到交换器上,并设置规则,这样交换器会将消息分发到对应的queue上。一般的交换器有四种:direct,fanout,topic,header。

  routekey

    路由键,能够看作是对交换器的一种补充。给消息设置一个路由键,队列会拿到带有它感兴趣的路由键消息。routing key设定的长度限制为255 bytes。

  queue

    队列。做为消费者须要关心的元素,它承载着消息的存放与管理功能。全部的消息最终会投递到队列上,而队列上的消费者会取走它们并执行消费者的任务。消费者能够拒收消息,并告诉队列把它从新入队或者丢弃,队列能够是channel独占的,也能够是共享的。而且队列提供了持久化的功能。

  • 开始搬砖

  引入amqp客户端

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.1</version>
        </dependency>

  amqp客户端提供的api都是按照以上的概念设计的。

  首先咱们须要创建与消息队列的链接,并拿到这个链接。

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(Properties.host);
            //建立一个新的链接
            Connection connection = factory.newConnection();

 

  而后咱们拿到一个channel。

            //建立一个通道
            Channel channel = connection.createChannel();

  接着咱们就在这个channel上作各类操做了。

  这时候须要用到咱们上述介绍的概念了,绑定交换器,设置路由键,而且把队列绑定到交换器上。

  以下几种交换器

  direct

 

   此类交换器是直接往队列发送的一种交换器,从名字上很容易理解。

            //  声明一个队列
            channel.exchangeDeclare("basic","direct");
            channel.queueBind(queueName, "basic",Properties.routeKey);

  在rabbitMq管理页面上能够看到绑定的状况。

  fanout

  扇形交换器,相似于一对多发送的交换器。它将详细发送到具备相同特征的queue上,是一种广播特性的交换器。

 

 

            //扇形交换器 广播
            channel.exchangeDeclare(exchangeName,"fanout");
            channel.queueBind(queueName1, exchangeName,routeKey);

   在管理页面上现实了设置的队列。

 

  topic

  主题交换器,将消息分发到订阅了某个主题的queue里。设置主题的关键是在于路由键是否符合必定的通配。

  

        // 声明转发器  
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");    
        //定义绑定键     
        String[] routing_keys = new String[] { "kernal.info", "cron.warning",    
                "auth.info", "kernel.critical" }; 
        for (String routing_key : routing_keys)    
        {     
            channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg.getBytes());    
        } 

  在作消息发送的时候,围绕着basicPublisher在作;作消息消费的时候,是围绕着basicConsumer来作。

  

    /**
     * Publish a message.
     *
     * Publishing to a non-existent exchange will result in a channel-level
     * protocol exception, which closes the channel.
     *
     * Invocations of <code>Channel#basicPublish</code> will eventually block if a
     * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
     *
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    /**
     * Start a non-nolocal, non-exclusive consumer, with
     * a server-generated consumerTag.
     * @param queue the name of the queue
     * @param autoAck true if the server should consider messages
     * acknowledged once delivered; false if the server should expect
     * explicit acknowledgements
     * @param callback an interface to the consumer object
     * @return the consumerTag generated by the server
     * @throws java.io.IOException if an error is encountered
     * @see com.rabbitmq.client.AMQP.Basic.Consume
     * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
     * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
     */
    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

  生产者关注的是exchanger和路由键,消费者关注的是queue。

    //基础生产者
    public void basicPublish(String queueName,String message){
        try{
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(Properties.host);
            //建立一个新的链接
            Connection connection = factory.newConnection();
            //建立一个通道
            Channel channel = connection.createChannel();
            channel.confirmSelect();
            //  声明一个队列
            channel.exchangeDeclare("basic","direct");
            channel.queueBind(queueName, "basic",Properties.routeKey);
            //添加一个监听器
            channel.addConfirmListener(new ConfirmListener() {
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Producer Send :"+deliveryTag+", ack, multiple :"+multiple);
                }

                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Producer Send :"+deliveryTag+", nack");
                }
            });
            //发送消息到队列中
            for(int i=0;i<5;i++){
                channel.basicPublish("basic",Properties.routeKey, null, message.getBytes("UTF-8"));
                boolean isok = channel.waitForConfirms();
                if(isok){
                    System.out.println("Producer Send Message :" + message);
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    //基础消费者
    public void basicConsumer(String queueName){
        try{
            // 建立链接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ地址
            factory.setHost(Properties.host);
            //建立一个新的链接
            Connection connection = factory.newConnection();
            //建立一个通道
            Channel channel = connection.createChannel();
            channel.confirmSelect();
            //声明要关注的队列
            channel.queueBind(queueName,"basic",Properties.routeKey);
            System.out.println("Customer Waiting Received messages");
            //实现一个消费者逻辑
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Message tag:"+envelope.getDeliveryTag()+",Customer Received '" + message + "'");
                    //手动设置
                    if(envelope.getDeliveryTag() % 2 == 0){
                        this.getChannel().basicAck(envelope.getDeliveryTag(),false);
                    }else {
                        this.getChannel().basicReject(envelope.getDeliveryTag(),false);
                    }
                }
            };
            //自动回复队列应答 -- RabbitMQ中的消息确认机制
            channel.basicConsume(queueName, false, consumer);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
  • 消息确认

  rabbitMq的消息确认,不管生产者和消费者,消息确认的对象都是broker自己。生产者确保消息准确投递到broker server,server返回了ack则表示消息已投递。消费者则是签收方,broker server确认消息投递成功后,则在内存中移除这个消息。

  为了确保消息准确的投递,rabbitMq提供了两种消息确认机制。

  confirmSelect

  若是生产者将channel设置成confirm模式,全部在该信道上面发布的消息都将会被指派一个惟一的ID(deliverId),一旦消息被投递到全部匹配的队列以后,broker就会发送一个确认给生产者(包含消息的惟一ID),这就使得生产者知道消息已经正确到达目的队列了,若是消息和队列是可持久化的,那么确认消息会在将消息写入磁盘以后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也能够设置basic.ack的multiple域,表示到这个序列号以前的全部消息都已经获得了处理;

       confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就能够在等信道返回确认的同时继续发送下一条消息,当消息最终获得确认以后,生产者应用即可以经过回调方法来处理该确认消息,若是RabbitMQ由于自身内部错误致使消息丢失,就会发送一条nack消息,生产者应用程序一样能够在回调方法中处理该nack消息;

  生产者能够对这两种响应作出监听处理。

            //添加一个监听器
            channel.addConfirmListener(new ConfirmListener() {
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Producer Send :"+deliveryTag+", ack, multiple :"+multiple);
                }

                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Producer Send :"+deliveryTag+", nack");
                }
            });

  开启这种模式,只须要在channel中调用confirmSelect。

            channel.confirmSelect();

  txSelect

  事务模式是一种增长可靠性的手段,它必然会牺牲性能,与数据库事务的理解差很少。在事务范围内,保证消息投递成功。

try {
    channel.txSelect(); // 声明事务
    // 发送消息
    channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    channel.txCommit(); // 提交事务
} catch (Exception e) {
    channel.txRollback();
}

  rollback以后,等因而丢弃了消息。

            //发送消息到队列中
            for(int i=0;i<5;i++){
                try{
                    channel.txSelect();
                    channel.basicPublish("basic",Properties.routeKey, null, message.getBytes("UTF-8"));
                    if(i % 3 == 0){
                        throw new Exception();
                    }
                    channel.txCommit();
                }catch (Exception e){
                    e.printStackTrace();
                    channel.txRollback();
                }
            }

  改变了代码以后,失败的两条并未进入Ready。

  • 结尾

   因为时间问题,没有对各类消息确认机制进行抓包分析,性能分析。以及了解rabbitMq的cluster模式,高可用模式等,只是基本上了解了它的api以及概念原理。对于深刻学习有几个方向,一是集群扩展横向扩展方案,二是exchanger集群同步方案,三是消息堆积处理问题等,由于没有在实际工做中使用,不少场景都不曾考虑到。消息队列最典型的应用就是使用来解耦事务,作成基于MQ的2PC分布式事务,以及实现最终一致方案。在将来会在这方面尝试。

相关文章
相关标签/搜索