AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通讯。RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,因此安装RabbitMq服务器端前须要先安装Erlang。如下记录在windows 7环境下的学习笔记。html
Erlang下载地址:https://www.erlang.org/downloads正则表达式
RabbitMQ服务端下载地址:https://www.rabbitmq.com/install-windows.htmlwindows
安装好Erlang和RabbitMQ以后都须要新建系统变量,计算机-->属性-->高级系统设置-->环境变量-->系统变量新建服务器
建系统变量时值为安装目录,个人安装目录:网络
Erlang:C:\Program Files\erl10.4app
RabbitMQ:C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15学习
新建系统变量后,还须要在已有系统变量PATH中增长Erlang和RabbitMQ的配置(注:是追加,PATH原有的值不要删除),以分号分隔。测试
Erlang:%ERLANG_HOME%\binfetch
RabbitMQ:%RABBITMQ_SERVER%\sbinspa
在cmd中运行以下命令:
"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
默认会生成一个帐号guest,密码也是guest。
查看已有用户及用户的角色命令:
rabbitmqctl.bat list_users
RabbitMQ安装后是默认启动服务的,若要手动启动、中止服务,执行以下命令:
启动服务:
net start rabbitmq
中止服务:
net stop rabbitmq
默认地址是本地端口15672,地址:http://127.0.0.1:15672,用guest帐号登陆。
消息(message)被发布者(publisher)发送给交换机(exchange),交换机经常被比喻成邮局或者邮箱。而后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
Default exchange(默认交换机):一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每一个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
Direct exchange(直连交换机):根据消息携带的路由键(routing key)将消息投递给对应队列的。
Fanout exchange(扇型交换机):将消息路由给绑定到它身上的全部队列,而不理会绑定的路由键。若是N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这全部的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
Topic exchange(主题交换机):经过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机常常用来实现各类分发/订阅模式及其变种。主题交换机一般用来实现消息的多播路由(multicast routing)。
routing_key格式是以点号“."分割的字符表。对于routing_key,有两个特殊字符(在正则表达式里叫元字符):
*表明任意一个单词
#表明0个或者多个单词
因为有"*"和"#",Topic exchange 很是强大而且能够转化为其余的exchange:
若是binding_key是"#",它会接收全部的Message,无论routing_key是什么,就像是Fanout exchange。
若是"*"和"#"都没有被使用,那么topic exchange就变成了Direct exchange。
Headers exchange(头交换机):有时消息的路由操做会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键创建路由规则。经过判断消息头的值可否与指定的绑定相匹配来确立路由规则。
建立2个项目,1个发送端Send,1个接收端Receive,经过Nuget安装RabbitMQ.Client。
发送端代码:
var message ="Hello Word "; var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); for (int i = 0; i < 10; i++) { var body = Encoding.UTF8.GetBytes(message + i); channel.BasicPublish(exchange: "",//使用默认交换机 routingKey: "hello", basicProperties: null, //new BasicProperties() { DeliveryMode = 2 }, body: body); } }
接收端代码:
var factory = new ConnectionFactory() { HostName = "localhost" }; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//限制流量,向消费者一条条发送消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var directory = AppContext.BaseDirectory + "/Log"; if (!Directory.Exists(directory)) { Directory.CreateDirectory(directory); } using (FileStream fs = System.IO.File.Open(directory + $"/{DateTime.Now:yyyyMMdd}{ea.ConsumerTag}.txt", FileMode.Append, FileAccess.Write, FileShare.ReadWrite)) { var sr = new StreamWriter(fs); sr.WriteLine($"[{DateTime.Now:yyyyMMddHHmmssfff}]"); sr.WriteLine(ea.ConsumerTag + ":" + message); sr.Flush(); sr.Close(); Thread.Sleep(100);//可复制一份代码,将等待的时间设置成200,测试多消费者 } channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手动消息确认 }; channel.BasicConsume(queue: "hello", autoAck: false,//消息确认设置成手动 consumer: consumer);
更多的内容能够看如下文章:
https://blog.csdn.net/anzhsoft2008/column/info/rabbitmq
http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html