RabbitMQ是由erlang语言开发的一个基于AMQP(Advanced Message Queuing Protocol)协议的企业级消息队列中间件。可实现队列,订阅/发布,路由,通配符等工做模式。html
安装erlang语言运行环境
https://erlang.org/download/otp_win64_23.2.exe
下载后直接下一步便可linux
安装RabbitMQ
https://www.rabbitmq.com/install-windows.html#installer
直接点击安装下一步便可按章程序员
安装RabbitMQ的Web管理平台shell
RabbitMQ的管理平台是经过插件的形式使用,须要手动启用管理平台
在Windows下,RabbitMQ默认被安装到C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.14 下。
打开sbin ,在cmd或者powershell中执行
rabbitmq-plugins.bat enable rabbitmq_management
数据库
安装完成后,浏览器打开 http://localhost:15672/#/ 便可看到RabbitMQ的管理界面。输入默认帐号密码 guest 成功登陆。windows
发送消息的端浏览器
获取消息并处理的端网络
一个终端链接。每个Connection均可以在RabbitMQ后台看到异步
Channel是创建在Connection上的一个虚拟通讯管道。通常状况下,往消息队列中写入多条消息,为了避免每条消息都创建一个TCP链接,因此RabbitMQ的作法是多条消息能够公用一个Connection,大大提升MQ的负载能力。分布式
Exchange是一个虚拟交换机。每一条消息都必需要经过交换机才能能进入对应的队列,能够理解为网络设备中的交换机,是一个意思。
Queue是一个存储消息的内部对象,全部的Rabbit MQ消息都存储在Queue中。生产者所生产的消息会存储在Queue中,消费者获取的消息也是从Queue中获取。
dotnet add package RabbitMQ.Client
const string QUEUENAME = "HELLO_MQ"; //建立链接对象工厂 var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, //RabbitMQ默认的端口 }; while (true) { using var conn = factory.CreateConnection(); var chanel = conn.CreateModel(); chanel.QueueDeclare(QUEUENAME, true, false, false); Console.WriteLine("输入生产内容:"); var input = Console.ReadLine(); chanel.BasicPublish("", QUEUENAME, null, Encoding.Default.GetBytes("hello rabbitmq:" + input)); }
在循环中,输入一个值,按下enter,便可推送一条消息到队列。
也能够直接在RabbitMQ的管理后台查看
能够看到咱们发送的消息已经被RabbitMQ存储在Queue中了。只等某个幸运的消费者前来消费。
const string QUEUENAME = "HELLO_MQ"; var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, }; var conn = factory.CreateConnection(); var chanel = conn.CreateModel(); chanel.QueueDeclare(QUEUENAME, true, false, false); EventingBasicConsumer consumer = new EventingBasicConsumer(chanel); consumer.Received += (a, e) => { Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray())); chanel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息 }; chanel.BasicConsume(QUEUENAME, false, consumer); Console.WriteLine("启动成功"); Console.ReadLine();
启动成功后,consumer的Received方法,会收到一条来自MQ的消息,
若是处理完成后,不调用chennel的BasicAck方法,那么这条消息依然会存在,下次有消费者出现,会再次推送给消费者。
简单的RabbitMQ Hello World到这里就算完成了。接下来就是稍微高级一点的应用
工做队列模式的意思就是一个生产者对应多个消费者。RabbitMQ会使用轮询去给每一个消费者发送消息。
发布订阅模式是属于比较用多的一种。
发布订阅,是由交换机发布消息给多个队列。多个队列再对应多个消费者。
发布订阅模式对应的交换机类型的fanout。
A
const string QUEUENAME = "HELLO_MQ_B"; const string TESTEXCHANGE = "TESTEXCHANGE"; var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, }; var conn = factory.CreateConnection(); var channel = conn.CreateModel(); //定义队列 channel.QueueDeclare(QUEUENAME, true, false, false); //定义交换机 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); //绑定队列到交换机 channel.QueueBind(QUEUENAME, TESTEXCHANGE, ""); var consumer = new EventingBasicConsumer(channel); consumer.Received += (a, e) => { Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray())); channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息 }; channel.BasicConsume(QUEUENAME, false, consumer); Console.WriteLine("启动成功"); Console.ReadLine();
B
const string QUEUENAME = "HELLO_MQ"; const string TESTEXCHANGE = "TESTEXCHANGE"; var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, }; var conn = factory.CreateConnection(); var channel = conn.CreateModel(); //定义队列 channel.QueueDeclare(QUEUENAME, true, false, false); //定义交换机 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); //绑定队列到交换机 channel.QueueBind(QUEUENAME, TESTEXCHANGE, ""); var consumer = new EventingBasicConsumer(channel); consumer.Received += (a, e) => { Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray())); channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息 }; channel.BasicConsume(QUEUENAME, false, consumer); Console.WriteLine("启动成功"); Console.ReadLine();
const string QUEUENAME = "HELLO_MQ"; const string QUEUENAME_B = "HELLO_MQ_B"; const string TESTEXCHANGE = "TESTEXCHANGE"; //建立链接对象工厂 var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, //RabbitMQ默认的端口 }; using var conn = factory.CreateConnection(); while (true) { var channel = conn.CreateModel(); //定义交换机 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); Console.WriteLine("输入生产内容:"); var input = Console.ReadLine(); channel.BasicPublish(TESTEXCHANGE,"", null, Encoding.Default.GetBytes("hello rabbitmq:" + input)); }
在生产者运行成功后,RabbitMQ后台会出现一个交换机,点击交换机会看到交换机下绑定了两个队列
从生产者发送消息到队列,两个消费者会同时收到消息
routing模式对应的交换机类型是direct,和发布订阅模式的区别在于:routing模式下,能够指定一个routingkey,用于区分消息
生产者
var channel = conn.CreateModel(); //定义交换机 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false); //绑定队列到交换机 Console.WriteLine("输入生产内容:"); var input = Console.ReadLine(); channel.BasicPublish(TESTEXCHANGE, "INFO", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
消费者 A
//定义队列 channel.QueueDeclare(QUEUENAME, true, false, false); //定义交换机 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false); //绑定队列到交换机 channel.QueueBind(QUEUENAME, TESTEXCHANGE, "INFO");
消费者 B
//定义队列 channel.QueueDeclare(QUEUENAME, true, false, false); //定义交换机 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false); //绑定队列到交换机 channel.QueueBind(QUEUENAME, TESTEXCHANGE, "ERROR");
绑定成功后,发送消息,消费者A能够收到消息,消费者B没法收到消息。
若是遇到指定routingKey生产一条消息,结果 AB消费者都收到的状况。建议在RabbitMQ后台的交换机下看一下绑定的Queue是否重复绑定了多个routingKey.
在通配符模式下,RabbitMQ使用模糊匹配来决定把消息推送给哪一个生产者。通配符有两个符号来匹配routingKey
其余的操做基本和routing模式同样。
header模式是把routingkey放到header中.取消掉了routingKey。并使用一个字典传递 K、V的方式来匹配。
好比同时要给用户发送邮件和短信,可直接经过header的键值对来匹配绑定的值,把消息传递给发短信和邮件的生产者.
成都南门这边有招BS方向高级.NET程序员的公司吗? 有的话,请私聊我。或者加我QQ:862640563
博客地址:https://www.cnblogs.com/boxrice/ 转载请注明出处