小编是菜鸟一枚,最近想试试MQ相关的技术,因此本身看了下RabbitMQ官网,试着写下本身的理解与操做的过程。html
刚开始的第一篇,原理只介绍 生产者、消费者、队列,至于其余的内容,会在后续中陆续补齐。windows
可能不少人有疑惑:MQ究竟是什么?哪些场景下要使用MQ?
前段时间安装了RabbitMQ,如今就记录下本身的学习心得吧。
首先看段程序:数组
class Program { static void Main(string[] args) { new Thread(Write).Start(); new Thread(Write).Start(); new Thread(Write).Start(); new Thread(Write).Start(); } public static void WriteLog(int i) { using (FileStream f = new FileStream(@"d:\\test.txt", FileMode.Append)) { using (StreamWriter sw = new StreamWriter(f, Encoding.Default)) { sw.Write(i); } } } public static void Write() { for (int i = 0; i < 10000; i++) { WriteLog(i); } } }
仅仅从代码上看,没有以为任何问题对吧?编译也是经过的,可是执行时,出现一个问题:服务器
固然,这仅仅是一个小的案例,相似这种多线程写文件形成的问题, 就应该使用MQ了。多线程
MQ的使用场景大概包括解耦,提升峰值处理能力,送达和排序保证,缓冲等。socket
消息队列技术是分布式应用间交换信息的一种技术。分布式
消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走。ide
经过消息队列,应用程序可独立地执行--它们不须要知道彼此的位置、或在继续执行前不须要等待接收程序接收此消息。post
MQ主要做用是接受和转发消息。你能够想一想在生活中的一种场景:当你把信件的投进邮筒,邮递员确定最终会将信件送给收件人。咱们能够把MQ比做 邮局和邮递员。学习
MQ和邮局的主要区别是,它不处理消息,可是,它会接受数据、存储消息数据、转发消息。
生产者:
消息发送者,在MQ中被称为生产者(producer),一个发送消息的应用也被叫作生产者,用P表示
生产者“生产”出消息后,最终由谁消费呢?等待接受消息的应用程序,咱们称之为消费者(Consuming ),用C表示
消息只能存储在队列(queue )中。尽管消息在rabbitMQ和应用程序间流通,可是队列倒是存在于RabbitMQ内部。
一个队列不受任何限制,它能够存储你想要存储的消息量,它本质上是一个无限的缓冲区。
多个生产者能够向同一个队列发送消息,多个消费者能够尝试从同一个消息队列中接收数据。
一个队列像下面这样(上面是它的队列名称)
生产者、消费者、中间件没必要在一台机器上,实际应用中也是绝大多数不在一块儿的。咱们能够用一张图表示RabbitMQ的构造:
注:此图片摘自于百度百科RabbitMQ。
多线程写入,产生消息的也就是一个程序(一个生产者P),消费消息的也是一个消息,它的模型应该是:
程序包管理控制台命令:
PM> Install-Package RabbitMQ.Client
首先,建立一个 connection 经过socket链接 去和服务器链接起来(须要传目的服务器的IP、用户名、密码等)。
接着 建立一个 channel ,这是大部分的要作的事情所在。
要发送消息,咱们必须声明一个队列,,而后咱们能够向队列发布消息。
执行一次BasicPublish方法,推送一个消息。
class Program { static void Main(string[] args) { new Thread(Write).Start(); new Thread(Write).Start(); new Thread(Write).Start(); } public static void Write() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null); for (int i = 0; i < 8000; i++) { string message = i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body); Console.WriteLine("Program Sent {0}", message); } } } }
声明的队列,在服务器中若是不存在了,会自动建立。而消息的内容是字节数组,在使用时,注意编码问题。
当队列里有消息时,消费者要随时可以从队列里获取消息,因此我须要一直运行它,让它监听消息。
就像咱们打篮球进行传球,须要事先确认要传给的那个队友位置同样,生产者要发送消息,必定要事先知道消费消息的程序的对列是哪一个。因此,在运行生产者程序前,须要先启动消费者程序。
由此,声明对列,就应该在消费者程序中完成。
class Program { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost ="/"}; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); ExcuateWriteFile(message); Console.WriteLine(" Receiver Received {0}", message); }; channel.BasicConsume(queue: "writeLog", noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } public static void ExcuateWriteFile(string i) { using (FileStream fs = new FileStream(@"d:\\test.txt", FileMode.Append)) { using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode)) { sw.Write(i); } } } }
先执行 消费者程序,让它一直保持监听。
执行时VS报错:
“RabbitMQ.Client.Exceptions.BrokerUnreachableException”类型的未经处理的异常在 RabbitMQ.Client.dll 中发生 其余信息: None of the specified endpoints were reachable。
进入查看详细的内部异常:
innerEception:{"The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=530, text=\"NOT_ALLOWED - access to vhost '/' refused for user 'eric'\", classId=10, methodId=40, cause="}
此时,咱们打开在http://localhost:15672/#/users 能够看到eric 下 的Can access virtual hosts 为 NoAccess
解决办法:
rabbitmqctl控制台输入
rabbitmqctl set_permissions -p / userName "." "." ".*"
再次执行时,能够看到:
而后运行 生产者程序。
咱们先开着 Receive ,当生产者运行时
消费者的自动触发执行 :
直到全部的 指定的 queue 里面的消息彻底消费完为止。(此时消费者程序仍然在监听中)
对于须要安装和设置用户的同窗,请参考 windows下 安装 rabbitMQ 及操做经常使用命令
本文参考: