这里原来有一句话,触犯啦天条,被阉割!!!!html
首先不去讨论个人日志组件怎么样。由于有些日志须要走网络,有的又不须要走网路,也是有性能与业务场景的多般变化在其中,就把他抛开,咱们只谈消息RabbitMQ。java
那么什么是RabbitMQ,它是用来解决什么问题的,性能如何,又怎么用?我会在下面一一阐述,若有错误,不到之处,还望你们不吝赐教。算法
必须一提的是rabbitmq是由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能、健壮以及可伸缩性出名的Erlang写成(所以也是继承了这些优势)。安全
百度百科对RabbitMQ阐述也很是明确,建议去看下,还有amqp协议。网络
RabbitMQ官网:http://www.rabbitmq.com/ 若是你要下载安装,那么必须先把Erlang语言装上。负载均衡
RabbitMQ的.net客户端,能够在nuget中输入rabbitmq轻松得到。dom
RabbitMQ与其余消息队列的对比,早有仙人给写出来。 Message Queue Shootoutpost
这篇文章中的测试案例为:1百万条1k的消息,每秒种的收发状况以下图。性能
若是你安装好啦,rabbitmq,他会提供一个操做监控页面,页面以下,他几乎提供啦,对rabbitmq的全部操做,与监控,因此,你装上后,本身多看看,多操做下。测试
从上图的标题中能够看到一些陌生的英文单词,让咱们感受一无所知,更无从操做,那么我给你们弄啦一个图片你们能够看下,或许对您理解这些新鲜的单词有所帮助。
看过这些名词,以后,或许你还毫无头绪,那么我把消息从生产到消费的整个流程给你们说一下,或许会更深刻一点,其中Exchange,与Queue都是能够设置相关属性,队列的持久化,交换器类型制定。
Note:首先这个过程走分三个部分,一、客户端(生产消息队列),二、RabbitMQ服务端(负责路由规则的绑定与消息的分发),三、客户端(消费消息队列中的消息)
Note:由图能够看出,一个消息能够走一次网络却被分发到不一样的消息队列中,而后被多个的客户端消费,那么这个过程就是RabbitMQ的核心机制,RabbitMQ的路由类型与消费模式。
类型有4种,direct,fanout,topic,headers。其中headers不经常使用,本篇不作介绍,其余三种类型,会作详细介绍。
那么这些类型是什么意思呢?就是Exchange与队列进行绑定后,消息根据exchang的类型,按照不一样的绑定规则分发消息到消息队列中,能够是一个消息被分发给多个消息队列,也能够是一个消息分发到一个消息队列。具体请看下文。
介绍之初还要说下RoutingKey,这是个什么玩意呢?他是exchange与消息队列绑定中的一个标识。有些路由类型会按照标识对应消息队列,有些路由类型忽略routingkey。具体看下文。
一、Exchange类型direct
他是根据交换器名称与routingkey来找队列的。
Note:消息从client发出,传送给交换器ChangeA,RoutingKey为routingkey.ZLH,那么无论你发送给Queue1,仍是Queue2一个消息都会保存在Queue1,Queue2,Queue3,三个队列中。这就是交换器的direct类型的路由规则。只要找到路由器与routingkey绑定的队列,那么他有多少队列,他就分发给多少队列。
二、Exchange类型fanout
这个类型忽略Routingkey,他为广播模式。
Note:消息从客户端发出,只要queue与exchange有绑定,那么他无论你的Routingkey是什么他都会将消息分发给全部与该exchang绑定的队列中。
三、Exchange类型topic
这个类型的路由规则若是你掌握啦,那是至关的好用,与灵活。他是根据RoutingKey的设置,来作匹配的,其中这里还有两个通配符为:
*,表明任意的一个词。例如topic.zlh.*,他可以匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
#,表明任意多个词。例如topic.#,他可以匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
Note:这个图看上去很乱,可是他是根据匹配符作匹配的,这里我建议你本身作下消息队列的具体操做。
具体操做以下
public static void Producer(int value) { try { var qName = "lhtest1"; var exchangeName = "fanoutchange1"; var exchangeType = "fanout";//topic、fanout var routingKey = "*"; var uri = new Uri("amqp://192.168.10.121:5672/"); var factory = new ConnectionFactory { UserName = "123", Password = "123", RequestedHeartbeat = 0, Endpoint = new AmqpTcpEndpoint(uri) }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //设置交换器的类型 channel.ExchangeDeclare(exchangeName, exchangeType); //声明一个队列,设置队列是否持久化,排他性,与自动删除 channel.QueueDeclare(qName, true, false, false, null); //绑定消息队列,交换器,routingkey channel.QueueBind(qName, exchangeName, routingKey); var properties = channel.CreateBasicProperties(); //队列持久化 properties.Persistent = true; var m = new QMessage(DateTime.Now, value+""); var body = Encoding.UTF8.GetBytes(DoJson.ModelToJson<QMessage>(m)); //发送信息 channel.BasicPublish(exchangeName, routingKey, properties, body); } } } catch (Exception ex) { Console.WriteLine(ex.Message); } }
一、消息队列的消费
Note:若是一个消息队列中有大量消息等待操做时,咱们能够用多个客户端来处理消息,这里的分发机制是采用负载均衡算法中的轮询。第一个消息给A,下一个消息给B,下下一个消息给A,下下下一个消息给B......以此类推。
二、为啦保证消息的安全性,保证此消息被正确处理后才能在服务端的消息队列中删除。那么rabbitmq提供啦ack应答机制,来实现这一功能。
ack应答有两种方式:一、自动应答,二、手动应答。具体实现以下。
public static void Consumer() { try { var qName = "lhtest1"; var exchangeName = "fanoutchange1"; var exchangeType = "fanout";//topic、fanout var routingKey = "*"; var uri = new Uri("amqp://192.168.10.121:5672/"); var factory = new ConnectionFactory { UserName = "123", Password = "123", RequestedHeartbeat = 0, Endpoint = new AmqpTcpEndpoint(uri) }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchangeName, exchangeType); channel.QueueDeclare(qName, true, false, false, null); channel.QueueBind(qName, exchangeName, routingKey); //定义这个队列的消费者 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); //false为手动应答,true为自动应答 channel.BasicConsume(qName, false, consumer); while (true) { BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); byte[] bytes = ea.Body; var messageStr = Encoding.UTF8.GetString(bytes); var message = DoJson.JsonToModel<QMessage>(messageStr); Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title); //若是是自动应答,下下面这句代码不用写啦。 if ((Convert.ToInt32(message.Title) % 2) == 1) { channel.BasicAck(ea.DeliveryTag, false); } } } } }
核心代码:
channel.queueDeclare(queue_name, durable, false, false, null); //声明消息队列,且为可持久化的
String message="Hello world"+Math.random();
//将队列设置为持久化以后,还须要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
根据 官方博文 的介绍,RabbitMQ在两种状况下会将消息写入磁盘:
消息保存于$MNESIA/msg_store_persistent/x.rdq文件中,其中x为数字编号,从1开始,每一个文件最大为16M(16777216),超过这个大小会生成新的文件,文件编号加1。消息以如下格式存在于文件中:
<<Size:64, MsgId:16/binary, MsgBody>>
MsgId为RabbitMQ经过rabbit_guid:gen()每个消息生成的GUID,MsgBody会包含消息对应的exchange,routing_keys,消息的内容,消息对应的协议版本,消息内容格式(二进制仍是其它)等等。
当全部文件中的垃圾消息(已经被删除的消息)比例大于阈值(GARBAGE_FRACTION = 0.5)时,会触发文件合并操做(至少有三个文件存在的状况下),以提升磁盘利用率。
publish消息时写入内容,ack消息时删除内容(更新该文件的有用数据大小),当一个文件的有用数据等于0时,删除该文件。
索引的持久化与消息的持久化相似,也是在两种状况下须要写入到磁盘中:要么自己须要持久化,要么由于内存紧张,须要释放部份内存。
当有多个消费者同时收取消息,且每一个消费者在接收消息的同时,还要作其它的事情,且会消耗很长的时间,在此过程当中可能会出现一些意外,好比消息接收到一半的时候,一个消费者宕掉了,这时候就要使用消息接收确认机制,可让其它的消费者再次执行刚才宕掉的消费者没有完成的事情。另外,在默认状况下,咱们建立的消息队列以及存放在队列里面的消息,都是非持久化的,也就是说当RabbitMQ宕掉了或者是重启了,建立的消息队列以及消息都不会保存,为了解决这种状况,保证消息传输的可靠性,咱们可使用RabbitMQ提供的消息队列的持久化机制。
生产者:
说明:
行17 和行20 须要同时设置,也就是将队列设置为持久化以后,还须要将发送的消息也要设置为持久化才能保证队列和消息一直存在
消费者:
说明:
行22: 设置RabbitMQ调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完而且已经对刚才处理的消息进行确认以后, 才发送下一条消息,防止消费者太过于忙碌。以下图所示: