了解RabbitMQ

简介:RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。数据库

RabbitMQ的安装与配置百度都有,我就不介绍了,毕竟这里写起来也是比较麻烦的。数据结构

何时须要用到RabbitMQ呢?我从一个简单的例子来引入。函数

假设有这么一个功能,咱们要对接收到的压缩包进行解压并处理,那么咱们第一步想到的是否相似于这样:性能

public static void main(){
       
       public void Do(){
             while(true){
                  String guid = Receive();
                  Deal(guid);
             }
       }       

       public String Receive(){
      //接收文件并返回文件标识
             return guid;
       }

       public void Deal(String guid){
             //解压缩并处理
                 
       }  
}

这样咱们就能把基本的功能实现了,先接收文件而后处理文件,咋一看好像没什么问题,可是仔细想一想,这个程序是否是有点浪费,由于接收和处理两个方法每次执行只有一个方法在运行。那咱们继续改一下:ui

public static void main(){
       
       public void Do(){
             while(true){
                  String guid = Receive();
             }
             while(true){
                  Deal(guid);
             }
       }       

       public String Receive(){
      //接收文件并返回文件标识
             return guid;
       }

       public void Deal(String guid){
             //解压缩并处理
                 
       }  
}

这样问题又来了,怎么把guid传给deal呢,咱们继续改造,加入一种数据结构:spa

public static void main(){
       
       ArrayList list = new ArrayList();

       public void Do(){
             while(true){
                  String guid = Receive();
                  list.add(guid);
             }
             while(true){
                  if(list.size()>0){
                       String guid = list.get(0);
                       Deal(guid);
                       list.remove(0);
                  }
             }
       }       

       public String Receive(){
      //接收文件并返回文件标识
             return guid;
       }

       public void Deal(String guid){
             //解压缩并处理
                 
       }  
}

咱们实际操做中会有这样的状况,可能接收须要1秒钟,处理须要2秒钟,咱们用两个线程分别执行,那颇有可能有两个线程同时去判断list.size()>0,而后处理同一个对象,这是咱们不肯意看到的。线程

想一想大学时候学的队列,是否是很符合要求,先进先出,因此咱们稍微改下,把ArrayList换成QueueList:code

public static void main(){
       
       QueueList list = new QueueList();

       public void Do(){
             while(true){
                  String guid = Receive();
                  list.enqueue(guid);
             }
             while(true){
                  String guid = list.dequeue();
                  Deal(guid);     
             }
       }       

       public String Receive(){
      //接收文件并返回文件标识
             return guid;
       }

       public void Deal(String guid){
             //解压缩并处理
                 
       }  
}

那可能随着数据量愈来愈庞大,一个进程没法知足,咱们就要建立多个进程,甚至多台机器。那么问题又来了,队列如何共享呢?对象

既然须要全局的队列,那么咱们是否须要知足下面几点:blog

① 多个程序像链接数据库一个能够访问同一个队列

② 程序既能够enqueue又能够dequeue

③ 若是有新的数据入库,能够反过来通知程序去接收处理,而不是咱们程序主动扫描

④ 程序有容错功能,宕机、停电或重启等操做,能让数据保留下来(持久化)

⑤ 一个数据,不能被两个程序同时访问,得有锁定功能

⑥ 既然是一个独立软件,就不能只管理一个队列,应该能够管理多个

想一想咱们本身来实现,是否是挺恐怖的!

这时候消息队列的概念就被引入了,咱们已知的有微软的MessageQueue,开源的RabbitMQ,还有Apache的ActiveMQ

固然,若是须要在代码中引入的话,第一步确定是引入jar包。咱们来写一个简单的加入队列和取出队列功能

public static void main() throws  IOException,TimeoutException{
      ConnectionFactory connect = new ConnectionFactory();
      //rabbitMQ IP
      connect.setHost("192.168.215.331");
      //端口号
      connect.setPort(5672);
      //用户名
      connect.setUsername("lsd");
      //密码
      connect.setPassword("123456");
      String queueName = "TESTMQ";
      
      Connection connection = connect.newConnection();
      Channel channel = connection.createChannel();
      channel.queueDeclare(queueName,true,false,false,null);
      String msg = "Hello";
      channel.basicPublish("",queueName,null,msg.getBytes("UTF-8"));
      channel.close();
      connection.close();
}

咱们在上面的程序里面新建了一个叫作TESTMQ的队列,而且往队列里面放入了一个字符串Hello

public static void main() throws  IOException,TimeoutException{
      ConnectionFactory connect = new ConnectionFactory();
      //rabbitMQ IP
      connect.setHost("192.168.215.331");
      //端口号
      connect.setPort(5672);
      //用户名
      connect.setUsername("lsd");
      //密码
      connect.setPassword("123456");
      String queueName = "TESTMQ";
      Connection connection = connect.newConnection();
      Channel channel = connection.createChannel();
      channel.queueDeclare(queueName,true,false,false,null);

      channel.basicQos();
      QueueingConsumer consumer = new Queueingconsumer(channel);
      channel.basicConsume(queuqName,false,consumer);
    while(true){
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      byte[] byte = delivery.getBody();
      System.out.println(new String(byte));
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
}

 咱们在上面的程序里面从一个叫作TESTMQ的队列里面不停的取值(由于是死循环),很容易理解吧,第一个main函数能够不断执行,第二个main函数都能及时取到值,先讲到这儿,入门应该是够了

相关文章
相关标签/搜索