windows环境安装:
http://www.javashuo.com/article/p-hwqddnpd-gp.html
.NET Core 使用RabbitMQ
http://www.javashuo.com/article/p-yplqyfft-o.htmlwindows
安装
"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin\rabbitmq-plugins.bat" enable rabbitmq_managementmarkdown
net stop RabbitMQ && net start RabbitMQpost
建立用户,密码,绑定角色网站
查看已有用户及用户的角色:
rabbitmqctl.bat list_usersspa
新增一个用户:
rabbitmqctl.bat add_user username password
示例:
rabbitmqctl.bat add_user tangsansan 1234563d
rabbitmqctl.bat set_user_tags username administratorcode
示例:
rabbitmqctl.bat set_user_tags tangsansan administratorserver
基本用法
引入:RabbitMQ.Clienthtm
消费者
//建立链接工厂 var factory = new ConnectionFactory() { UserName = "tangsansan",//用户名 Password = "123456",//密码 HostName = "localhost"//rabbitmq ip }; //建立链接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定义一个队列 channel.QueueDeclare("hello", false, false, false, null); Console.WriteLine("\nRabbitMQ链接成功,请输入消息,输入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //发布消息 channel.BasicPublish("", "hello", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close();
生产者
//建立链接工厂 var factory = new ConnectionFactory() { UserName = "tangsansan",//用户名 Password = "123456",//密码 HostName = "localhost"//rabbitmq ip }; //建立链接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定义一个队列 channel.QueueDeclare("hello", false, false, false, null); Console.WriteLine("\nRabbitMQ链接成功,请输入消息,输入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //发布消息 channel.BasicPublish("", "hello", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close();
启动了一个生产者,两个消费者,能够看见两个消费者都能收到消息,消息投递到哪一个消费者是由RabbitMQ决定的。
RabbitMQ消费失败的处理
RabbitMQ采用消息应答机制,即消费者收到一个消息以后,须要发送一个应答,而后RabbitMQ才会将这个消息从队列中删除,若是消费者
在消费过程当中出现异常,断开链接切没有发送应答,那么RabbitMQ会将这个消息从新投递
//接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"收到消息: {message}"); Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执"); Thread.Sleep(10000); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"已发送回执[{ea.DeliveryTag}]"); };
使用RabbitMQ的Exchange
前面咱们能够看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的状况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)
Direct Exchange
string exchangeName = "TestChange"; string queueName = "hello"; string routeKey = "helloRouteKey"; //建立链接工厂 var factory = new ConnectionFactory() { UserName = "tangsansan",//用户名 Password = "123456",//密码 HostName = "localhost"//rabbitmq ip }; //建立链接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定义一个Direct类型交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null); //定义一个队列 channel.QueueDeclare(queueName, false, false, false, null); //将队列绑定到交换机 channel.QueueBind(queueName, exchangeName, routeKey, null); Console.WriteLine($"\nRabbitMQ链接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},\n\n请输入消息,输入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //发布消息 channel.BasicPublish(exchangeName, routeKey, null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close();
Fanout Exchange
全部发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的全部Queue上。
Fanout Exchange 不须要处理RouteKey 。只须要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的全部队列上。相似子网广播,每台子网内的主机都得到了一份复制的消息。
因此,Fanout Exchange 转发消息是最快的。
static void Main(string[] args) { string exchangeName = "TestFanoutChange"; string queueName1 = "hello1"; string queueName2 = "hello"; string routeKey = ""; //建立链接工厂 ConnectionFactory factory = new ConnectionFactory { UserName = "tangsansan",//用户名 Password = "123456",//密码 HostName = "localhost"//rabbitmq ip }; //建立链接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定义一个Direct类型交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null); //定义队列1 channel.QueueDeclare(queueName1, false, false, false, null); //定义队列2 channel.QueueDeclare(queueName2, false, false, false, null); //将队列绑定到交换机 channel.QueueBind(queueName1, exchangeName, routeKey, null); channel.QueueBind(queueName2, exchangeName, routeKey, null); //生成两个队列的消费者 ConsumerGenerator(queueName1); ConsumerGenerator(queueName2); Console.WriteLine($"\nRabbitMQ链接成功,\n\n请输入消息,输入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //发布消息 channel.BasicPublish(exchangeName, routeKey, null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); } /// <summary> /// 根据队列名称生成消费者 /// </summary> /// <param name="queueName"></param> static void ConsumerGenerator(string queueName) { //建立链接工厂 ConnectionFactory factory = new ConnectionFactory { UserName = "tangsansan",//用户名 Password = "123456",//密码 HostName = "localhost"//rabbitmq ip }; //建立链接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //事件基本消费者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"Queue:{queueName}收到消息: {message}"); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); }; //启动消费者 设置为手动应答消息 channel.BasicConsume(queueName, false, consumer); Console.WriteLine($"Queue:{queueName},消费者已启动"); }
Topic Exchange
全部发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,
Exchange 将路由进行模糊匹配。可使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配很少很多一个词。所以“XiaoChen.#”可以匹配到“XiaoChen.pets.cat”,可是“XiaoChen.” 只会匹配到“XiaoChen.money”。
因此,Topic Exchange 使用很是灵活。
static void Main(string[] args) { string exchangeName = "TestTopicChange"; string queueName = "hello"; string routeKey = "TestRouteKey.*"; //建立链接工厂 ConnectionFactory factory = new ConnectionFactory { UserName = "tangsansan",//用户名 Password = "123456",//密码 HostName = "localhost"//rabbitmq ip }; //建立链接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定义一个Direct类型交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null); //定义队列1 channel.QueueDeclare(queueName, false, false, false, null); //将队列绑定到交换机 channel.QueueBind(queueName, exchangeName, routeKey, null); Console.WriteLine($"\nRabbitMQ链接成功,\n\n请输入消息,输入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //发布消息 channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); }
问题:
None of the specified endpoints were reachable
这个异常在建立链接时抛出(CreateConnection()),缘由通常是ConnectionFactory参数设置不对,好比HostName、UserName、Password
未设置VirtualHost的权限
设置方法:RabbitmqWeb管理网站-->Admin