消息队列rabitMq

rabbitmq

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序应用程序的通讯方法。应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。消息传递指的是程序之间经过在消息中发送数据进行通讯而不是经过直接调用彼此来通讯,直接调用一般是用于诸如远程过程调用的技术。排队指的是应用程序经过 队列来通讯。队列的使用除去了接收和发送应用程序同时执行的要求html

使用场景

在项目中,将一些无需即时返回且耗时的操做提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提升了系统的吞吐量。java

含义

RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议缓存

客户端

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] args) throws.IOException{
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  String message = "Hello World!";
  channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  System.out.println(" [x] Sent '" + message + "'");
  channel.close();
  connection.close();
  }
}

消费者端

public class RabbitMQRecv {
  public static void main(String avg[]) throws.IOException,java.lang.InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection
= factory.newConnection();     Channel channel = connection.createChannel();     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.
out.println(" [*] Waiting for messages. To exit press CTRL+C");     QueueingConsumer consumer = new QueueingConsumer(channel);     channel.basicConsume(QUEUE_NAME, true, consumer);
    
while (true) {       QueueingConsumer.Delivery delivery = consumer.nextDelivery();       String message = new String(delivery.getBody());       System.out.println(" [x] Received '" + message + "'");     }   } }

几个概念

Exchange:交换机,决定了消息路由规则;
Queue:消息队列;
Channel:进行消息读写的通道;
Bind:绑定了Queue和Exchange,意即为符合什么样路由规则的消息,将会放置入哪个 消息队列
 

RabbitMQ的结构图以下:

 

 

 

几个概念说明:
Broker:简单来讲就是消息队列服务器实体。
  Exchange:消息交换机,它指定消息按什么规则,路由到哪一个队列。
  Queue:消息队列载体,每一个消息都会被投入到一个或多个队列。
  Binding:绑定,它的做用就是把exchange和queue按照路由规则绑定起来。
  Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  vhost:虚拟主机,一个broker里能够开设多个vhost,用做不一样用户的权限分离。
  producer:消息生产者,就是投递消息的程序。
  consumer:消息消费者,就是接受消息的程序。
  channel:消息通道,在客户端的每一个链接里,可创建多个channel,每一个channel表明一个会话任务。
消息队列的使用过程大概以下:
(1)客户端链接到消息队列服务器,打开一个channel。
  (2)客户端声明一个exchange,并设置相关属性。
  (3)客户端声明一个queue,并设置相关属性。
  (4)客户端使用routing key,在exchange和queue之间创建好绑定关系。
  (5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,彻底根据key进行投递的叫 作Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫作 Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不须要key的,叫作 Fanout交换机,它采起广播模式,一个消息进来时,投递到与该交换机绑定的全部队列。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
  (1)exchange持久化,在声明时指定durable => 1
  (2)queue持久化,在声明时指定durable => 1
  (3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
若是exchange和queue都是持久化的,那么它们之间的binding也是持久化的。若是exchange和queue二者之间有一个持久化,一个非持久化,就不容许创建绑定。
 
 

什么是MQ?安全

       MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通讯方法。MQ是消费-生产者模型的一个典型的表明,一端往消息队列中不断写入消息,而另外一端则能够读取队列中的消息。服务器

      RabbitMQ是MQ的一种。下面详细介绍一下RabbitMQ的基本概念。app

一、队列、生产者、消费者负载均衡

      队列是RabbitMQ的内部对象,用于存储消息。生产者(下图中的P)生产消息并投递到队列中,消费者(下图中的C)能够从队列中获取消息并消费。异步

     

      多个消费者能够订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每一个消费者都收到全部的消息并处理。ide

     

二、Exchange、Binding函数

      刚才咱们看到生产者将消息投递到队列中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的状况是,生产者将消息发送到Exchange(交换器,下图中的X),再经过Binding将Exchange与Queue关联起来。

     

三、Exchange Type、Bingding key、routing key

      在绑定(Binding)Exchange与Queue的同时,通常会指定一个binding key。在绑定多个Queue到同一个Exchange的时候,这些Binding容许使用相同的binding key。

      生产者在将消息发送给Exchange的时候,通常会指定一个routing key,来指定这个消息的路由规则,生产者就能够在发送消息给Exchange时,经过指定routing key来决定消息流向哪里。

      RabbitMQ经常使用的Exchange Type有三种:fanout、direct、topic。

      fanout:把全部发送到该Exchange的消息投递到全部与它绑定的队列中。

      direct:把消息投递到那些binding key与routing key彻底匹配的队列中。

      topic:将消息路由到binding key与routing key模式匹配的队列中。

      附上一张RabbitMQ的结构图:

     

    

最后来具体解析一下几个问题:

一、能够自动建立队列,也能够手动建立队列,若是自动建立队列,那么是谁负责建立队列呢?是生产者?仍是消费者? 

      若是队列不存在,固然消费者不会收到任何的消息。可是若是队列不存在,那么生产者发送的消息就会丢失。因此,为了数据不丢失,消费者和生产者均可以建立队列。那么若是建立一个已经存在的队列呢?那么不会有任何的影响。须要注意的是没有任何的影响,也就是说第二次建立若是参数和第一次不同,那么该操做虽然成功,可是队列属性并不会改变。

      队列对于负载均衡的处理是完美的。对于多个消费者来讲,RabbitMQ使用轮询的方式均衡的发送给不一样的消费者。

二、RabbitMQ的消息确认机制

      默认状况下,若是消息已经被某个消费者正确的接收到了,那么该消息就会被从队列中移除。固然也可让同一个消息发送到不少的消费者。

      若是一个队列没有消费者,那么,若是这个队列有数据到达,那么这个数据会被缓存,不会被丢弃。当有消费者时,这个数据会被当即发送到这个消费者,这个数据被消费者正确收到时,这个数据就被从队列中删除。

     那么什么是正确收到呢?经过ack。每一个消息都要被acknowledged(确认,ack)。咱们能够显示的在程序中去ack,也能够自动的ack。若是有数据没有被ack,那么:

     RabbitMQ Server会把这个信息发送到下一个消费者。

     若是这个app有bug,忘记了ack,那么RabbitMQServer不会再发送数据给它,由于Server认为这个消费者处理能力有限。

    并且ack的机制能够起到限流的做用(Benefitto throttling):在消费者处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的均衡消费者的负载。

 

 二:代码示例

2.1:首先引入rabbitMQ jar包

 <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
 </dependency>

2.2:建立消费者Producer

/**
 * 消息生成者
 */
public class Producer {
    public final static String QUEUE_NAME="rabbitMQ.test";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ相关信息
        factory.setHost("localhost");
      //factory.setUsername("lp");
      //factory.setPassword("");
     // factory.setPort(2088);
        //建立一个新的链接
        Connection connection = factory.newConnection();
        //建立一个通道
        Channel channel = connection.createChannel();
        //  声明一个队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello RabbitMQ";
        //发送消息到队列中
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("Producer Send +'" + message + "'");
        //关闭通道和链接
        channel.close();
        connection.close();
    }
}

注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是不是独占队列(建立者可使用的私有队列,断开后自动删除)、第四个参数为当全部消费者客户端链接断开时是否自动删除队列、第五个参数为队列的其余参数

注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其余属性、第四个参数为发送信息的主体

2.3:建立消费者

 

复制代码
public class Customer {
    private final static String QUEUE_NAME = "rabbitMQ.test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("localhost");
        //建立一个新的链接
        Connection connection = factory.newConnection();
        //建立一个通道
        Channel channel = connection.createChannel();
        //声明要关注的队列
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        System.out.println("Customer Waiting Received messages");
        //DefaultConsumer类实现了Consumer接口,经过传入一个频道,
        // 告诉服务器咱们须要那个频道的消息,若是频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Customer Received '" + message + "'");
            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
复制代码

前面代码咱们能够看出和生成者同样的,后面的是获取生产者发送的信息,其中envelope主要存放生产者相关信息(好比交换机、路由key等)body是消息实体。

2.4:运行结果

生产者:

 

消费者:

 三:实现任务分发

工做队列

一个队列的优势就是很容易处理并行化的工做能力,可是若是咱们积累了大量的工做,咱们就须要更多的工做者来处理,这里就要采用分布机制了。

咱们新建立一个生产者NewTask

复制代码
public class NewTask {
    private static final String TASK_QUEUE_NAME="task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
   channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        //分发信息
        for (int i=0;i<10;i++){
            String message="Hello RabbitMQ"+i;
            channel.basicPublish("",TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
            System.out.println("NewTask send '"+message+"'");
        }
        channel.close();
        connection.close();
    }
}
复制代码

而后建立2个工做者Work1和Work2代码同样

复制代码
public class Work1 {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Worker1  Waiting for messages");

        //每次从队列获取的数量
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Worker1  Received '" + message + "'");
                try {
                    throw  new Exception();
                    //doWork(message);
                }catch (Exception e){
                    channel.abort();
                }finally {
                    System.out.println("Worker1 Done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoAck=false;
        //消息消费完成确认
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) {
        try {
            Thread.sleep(1000); // 暂停1秒钟
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}
复制代码

注:channel.basicQos(1);保证一次只分发一个 。autoAck是否自动回复,若是为true的话,每次生产者只要发送信息就会从内存中删除,那么若是消费者程序异常退出,那么就没法获取数据,咱们固然是不但愿出现这样的状况,因此才去手动回复,每当消费者收到并处理信息而后在通知生成者。最后从队列中删除这条信息。若是消费者异常退出,若是还有其余消费者,那么就会把队列中的消息发送给其余消费者,若是没有,等消费者启动时候再次发送。

 

 

参考:http://www.javashuo.com/article/p-vqbsgsxl-dt.html

相关文章
相关标签/搜索