【转】RabbitMQ基础——和——持久化机制

这里原来有一句话,触犯啦天条,被阉割!!!!html

首先不去讨论个人日志组件怎么样。由于有些日志须要走网络,有的又不须要走网路,也是有性能与业务场景的多般变化在其中,就把他抛开,咱们只谈消息RabbitMQ。java

那么什么是RabbitMQ,它是用来解决什么问题的,性能如何,又怎么用?我会在下面一一阐述,若有错误,不到之处,还望你们不吝赐教。算法

RabbitMQ简介

必须一提的是rabbitmq是由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能、健壮以及可伸缩性出名的Erlang写成(所以也是继承了这些优势)。安全

百度百科对RabbitMQ阐述也很是明确,建议去看下,还有amqp协议。网络

RabbitMQ官网:http://www.rabbitmq.com/ 若是你要下载安装,那么必须先把Erlang语言装上。负载均衡

RabbitMQ的.net客户端,能够在nuget中输入rabbitmq轻松得到。dom

RabbitMQ与其余消息队列的对比,早有仙人给写出来。 Message Queue Shootoutpost

这篇文章中的测试案例为:1百万条1k的消息,每秒种的收发状况以下图。性能

 

若是你安装好啦,rabbitmq,他会提供一个操做监控页面,页面以下,他几乎提供啦,对rabbitmq的全部操做,与监控,因此,你装上后,本身多看看,多操做下。测试

 

RabbitMQ中的一些名词阐述与消息从投递到消费的整个过程

从上图的标题中能够看到一些陌生的英文单词,让咱们感受一无所知,更无从操做,那么我给你们弄啦一个图片你们能够看下,或许对您理解这些新鲜的单词有所帮助。

 

看过这些名词,以后,或许你还毫无头绪,那么我把消息从生产到消费的整个流程给你们说一下,或许会更深刻一点,其中Exchange,与Queue都是能够设置相关属性,队列的持久化,交换器类型制定。

 

Note:首先这个过程走分三个部分,一、客户端(生产消息队列),二、RabbitMQ服务端(负责路由规则的绑定与消息的分发),三、客户端(消费消息队列中的消息)

 

 

Note:由图能够看出,一个消息能够走一次网络却被分发到不一样的消息队列中,而后被多个的客户端消费,那么这个过程就是RabbitMQ的核心机制,RabbitMQ的路由类型与消费模式。

RabbitMQ中Exchange的类型

类型有4种,direct,fanout,topic,headers。其中headers不经常使用,本篇不作介绍,其余三种类型,会作详细介绍。

那么这些类型是什么意思呢?就是Exchange与队列进行绑定后,消息根据exchang的类型,按照不一样的绑定规则分发消息到消息队列中,能够是一个消息被分发给多个消息队列,也能够是一个消息分发到一个消息队列。具体请看下文。

介绍之初还要说下RoutingKey,这是个什么玩意呢?他是exchange与消息队列绑定中的一个标识。有些路由类型会按照标识对应消息队列,有些路由类型忽略routingkey。具体看下文。

一、Exchange类型direct

他是根据交换器名称与routingkey来找队列的。

 

Note:消息从client发出,传送给交换器ChangeA,RoutingKey为routingkey.ZLH,那么无论你发送给Queue1,仍是Queue2一个消息都会保存在Queue1,Queue2,Queue3,三个队列中。这就是交换器的direct类型的路由规则。只要找到路由器与routingkey绑定的队列,那么他有多少队列,他就分发给多少队列。

二、Exchange类型fanout

这个类型忽略Routingkey,他为广播模式。

 

Note:消息从客户端发出,只要queue与exchange有绑定,那么他无论你的Routingkey是什么他都会将消息分发给全部与该exchang绑定的队列中。

三、Exchange类型topic

这个类型的路由规则若是你掌握啦,那是至关的好用,与灵活。他是根据RoutingKey的设置,来作匹配的,其中这里还有两个通配符为:

*,表明任意的一个词。例如topic.zlh.*,他可以匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

#,表明任意多个词。例如topic.#,他可以匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

 

Note:这个图看上去很乱,可是他是根据匹配符作匹配的,这里我建议你本身作下消息队列的具体操做。

具体操做以下

复制代码
 public static void Producer(int value)
        {
            try
            {
                var qName = "lhtest1";
                var exchangeName = "fanoutchange1";
                var exchangeType = "fanout";//topic、fanout
                var routingKey = "*";
                var uri = new Uri("amqp://192.168.10.121:5672/");
                var factory = new ConnectionFactory
                {
                    UserName = "123",
                    Password = "123",
                    RequestedHeartbeat = 0,
                    Endpoint = new AmqpTcpEndpoint(uri)
                };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //设置交换器的类型
                        channel.ExchangeDeclare(exchangeName, exchangeType);
                        //声明一个队列,设置队列是否持久化,排他性,与自动删除
                        channel.QueueDeclare(qName, true, false, false, null);
                        //绑定消息队列,交换器,routingkey
                        channel.QueueBind(qName, exchangeName, routingKey);
                        var properties = channel.CreateBasicProperties();
                        //队列持久化
                        properties.Persistent = true;
                        var m = new QMessage(DateTime.Now, value+"");
                        var body = Encoding.UTF8.GetBytes(DoJson.ModelToJson<QMessage>(m));
                        //发送信息
                        channel.BasicPublish(exchangeName, routingKey, properties, body);
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
复制代码

消息队列的消费与消息确认Ack

一、消息队列的消费

Note:若是一个消息队列中有大量消息等待操做时,咱们能够用多个客户端来处理消息,这里的分发机制是采用负载均衡算法中的轮询。第一个消息给A,下一个消息给B,下下一个消息给A,下下下一个消息给B......以此类推。

二、为啦保证消息的安全性,保证此消息被正确处理后才能在服务端的消息队列中删除。那么rabbitmq提供啦ack应答机制,来实现这一功能。

ack应答有两种方式:一、自动应答,二、手动应答。具体实现以下。

复制代码
 public static void Consumer()
        {
            try
            {
                var qName = "lhtest1";
                var exchangeName = "fanoutchange1";
                var exchangeType = "fanout";//topic、fanout
                var routingKey = "*";
                var uri = new Uri("amqp://192.168.10.121:5672/");
                var factory = new ConnectionFactory
                {
                    UserName = "123",
                    Password = "123",
                    RequestedHeartbeat = 0,
                    Endpoint = new AmqpTcpEndpoint(uri)
                };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType);
                        channel.QueueDeclare(qName, true, false, false, null);
                        channel.QueueBind(qName, exchangeName, routingKey);
                        //定义这个队列的消费者
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                        //false为手动应答,true为自动应答
                        channel.BasicConsume(qName, false, consumer);
                        while (true)
                        {
                            BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();                           
                            byte[] bytes = ea.Body;
                            var messageStr = Encoding.UTF8.GetString(bytes);
                            var message = DoJson.JsonToModel<QMessage>(messageStr);
                            Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title);
                            //若是是自动应答,下下面这句代码不用写啦。
                            if ((Convert.ToInt32(message.Title) % 2) == 1)
                            {
                                channel.BasicAck(ea.DeliveryTag, false);
                            }
                        }
                    }
                }
            }

 

RabbitMQ持久化机制

核心代码:

        channel.queueDeclare(queue_name, durable, false, false, null); //声明消息队列,且为可持久化的
         String message="Hello world"+Math.random();
         //将队列设置为持久化以后,还须要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
         channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

消息何时须要持久化?

根据 官方博文 的介绍,RabbitMQ在两种状况下会将消息写入磁盘:

 

  1. 消息自己在publish的时候就要求消息写入磁盘;
  2. 内存紧张,须要将部份内存中的消息转移到磁盘;

 

消息何时会刷到磁盘?

 

  1. 写入文件前会有一个Buffer,大小为1M(1048576),数据在写入文件时,首先会写入到这个Buffer,若是Buffer已满,则会将Buffer写入到文件(未必刷到磁盘);
  2. 有个固定的刷盘时间:25ms,也就是无论Buffer满不满,每隔25ms,Buffer里的数据及未刷新到磁盘的文件内容一定会刷到磁盘;
  3. 每次消息写入后,若是没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0来实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操做。

 

消息在磁盘文件中的格式

消息保存于$MNESIA/msg_store_persistent/x.rdq文件中,其中x为数字编号,从1开始,每一个文件最大为16M(16777216),超过这个大小会生成新的文件,文件编号加1。消息以如下格式存在于文件中:

 

<<Size:64, MsgId:16/binary, MsgBody>>

 

MsgId为RabbitMQ经过rabbit_guid:gen()每个消息生成的GUID,MsgBody会包含消息对应的exchange,routing_keys,消息的内容,消息对应的协议版本,消息内容格式(二进制仍是其它)等等。

文件什么时候删除?

当全部文件中的垃圾消息(已经被删除的消息)比例大于阈值(GARBAGE_FRACTION = 0.5)时,会触发文件合并操做(至少有三个文件存在的状况下),以提升磁盘利用率。

publish消息时写入内容,ack消息时删除内容(更新该文件的有用数据大小),当一个文件的有用数据等于0时,删除该文件。

消息索引何时须要持久化?

索引的持久化与消息的持久化相似,也是在两种状况下须要写入到磁盘中:要么自己须要持久化,要么由于内存紧张,须要释放部份内存。

消息索引何时会刷到磁盘?

 

  1. 有个固定的刷盘时间:25ms,索引文件内容一定会刷到磁盘;
  2. 每次消息(及索引)写入后,若是没有后续写入请求,则会直接将已写入的索引刷到磁盘,实现上与消息的timeout刷盘一致。

RabbitMQ(二)队列与消息的持久化

 

当有多个消费者同时收取消息,且每一个消费者在接收消息的同时,还要作其它的事情,且会消耗很长的时间,在此过程当中可能会出现一些意外,好比消息接收到一半的时候,一个消费者宕掉了,这时候就要使用消息接收确认机制,可让其它的消费者再次执行刚才宕掉的消费者没有完成的事情。另外,在默认状况下,咱们建立的消息队列以及存放在队列里面的消息,都是非持久化的,也就是说当RabbitMQ宕掉了或者是重启了,建立的消息队列以及消息都不会保存,为了解决这种状况,保证消息传输的可靠性,咱们可使用RabbitMQ提供的消息队列的持久化机制。

 

 

 生产者:

复制代码
 1 import com.rabbitmq.client.ConnectionFactory;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.MessageProperties;
 5 public class ClientSend1 {
 6     public static final String queue_name="my_queue";
 7     public static final boolean durable=true; //消息队列持久化
 8     public static void main(String[] args)
 9     throws java.io.IOException{
10         ConnectionFactory factory=new ConnectionFactory(); //建立链接工厂
11         factory.setHost("localhost");
12         factory.setVirtualHost("my_mq");
13         factory.setUsername("zhxia");
14         factory.setPassword("123456");
15         Connection connection=factory.newConnection(); //建立链接
16         Channel channel=connection.createChannel();//建立信道
17         channel.queueDeclare(queue_name, durable, false, false, null); //声明消息队列,且为可持久化的
18         String message="Hello world"+Math.random();
19         //将队列设置为持久化以后,还须要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
20         channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
21         System.out.println("Send message:"+message);
22         channel.close();
23         connection.close();
24     }
25 
26 }
复制代码

 

说明:

行17 和行20 须要同时设置,也就是将队列设置为持久化以后,还须要将发送的消息也要设置为持久化才能保证队列和消息一直存在

 消费者:

复制代码
 1 import com.rabbitmq.client.ConnectionFactory;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.QueueingConsumer;
 5 public class ClientReceive1 {
 6     public static final String queue_name="my_queue";
 7     public static final boolean autoAck=false;
 8     public static final boolean durable=true;
 9     public static void main(String[] args)
10     throws java.io.IOException,java.lang.InterruptedException{
11         ConnectionFactory factory=new ConnectionFactory();
12         factory.setHost("localhost");
13         factory.setVirtualHost("my_mq");
14         factory.setUsername("zhxia");
15         factory.setPassword("123456");
16         Connection connection=factory.newConnection();
17         Channel channel=connection.createChannel();
18         channel.queueDeclare(queue_name, durable, false, false, null);
19         System.out.println("Wait for message");
20         channel.basicQos(1); //消息分发处理
21         QueueingConsumer consumer=new QueueingConsumer(channel);
22         channel.basicConsume(queue_name, autoAck, consumer);
23         while(true){
24             Thread.sleep(500);
25             QueueingConsumer.Delivery deliver=consumer.nextDelivery();
26             String message=new String(deliver.getBody());
27             System.out.println("Message received:"+message);
28             channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
29         }
30     }
31 }
复制代码

 说明:

行22: 设置RabbitMQ调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完而且已经对刚才处理的消息进行确认以后, 才发送下一条消息,防止消费者太过于忙碌。以下图所示:

 

相关文章
相关标签/搜索