RabbitMQ能够被比做一个邮局,当你向邮局寄一封信时,邮局会保证将这封信送达你写的收件人,而RabbitMQ与邮局最主要的区别是,RabbitMQ并不真的处理信件,它处理的是二进制的数据块,它除了接收“信件”,他还要负责处理数据(好比保存数据)、转发数据(实际上送信是邮递员的职责)等。docker
在RabbitMQ中有如下几个概念:异步
一个生产者仅仅负责向RabbitMQ中发送消息,除此以外它不会完成其余工做。函数
队列由RabbitMQ自身提供,它的做用就像是邮箱和邮局的关系同样。虽然看似RabbitMQ和你的应用直接通讯,可是实际上消息是被存储到队列中的,实际上,队列就是一个消息缓冲区,受宿主环境内存和磁盘空间的限制。多个生产者能够将消息发送到一个队列,而多个消费者能够从一个队列中接收消息。测试
一个消费者仅仅负责接收来自生产者的消息。spa
这里须要注意如下两点:.net
咱们须要创建两个.net core项目,分别是发布端、接收端,它们由消息队列进行链接,消息队列会保存消息,直到消息被发送给接收端,如图所示。3d
建立项目后,两个项目均需安装RabbitMQ.Client的Nuget包,并restore。rest
在正式开始编写发送端代码以前,咱们须要安装RabbitMQ,这里我在Docker上安装RabbitMQ,所用系统为CentOS7。
首先创新新的文件夹目录/docker/rabbitmq/data,用做RabbitMQ容器的数据卷挂载。code
而后运行下面的语句,若是本地没有镜像,Docker会自动去Dockerhub上拉取镜像:blog
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq --volume /rabbitmq/data:/var/lib/rabbitmq rabbitmq:management
镜像下载完毕后能够看到容器已经运行,其中,15671用于外网访问,5672用于内网链接。
访问http://ip:15671,不出意外你能够进入到RabbitMQ的可视化管理界面。
下面能够开始编写生产者端的代码,以下所示。
using System; using System.Text; using RabbitMQ.Client; namespace Send { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "148.70.210.208" }; //链接RabbitMQ using (var connection = factory.CreateConnection()) { //建立一个信道 using (var channel = connection.CreateModel()) { //声明一个Hello队列,信道就与Hello队列链接起来了 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World!"; //消息体 var body = Encoding.UTF8.GetBytes(message); //经过信道,将消息发送到队列 //须要指明队列的名称,才能转发到特定队列 channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent [0]", message); } Console.WriteLine("Press [enter] to exit."); Console.ReadLine(); } } } }
程序首先经过Socket与RabbitMQ进行了链接,链接成功后,建立了一个信道,并经过这个信道声明了一个名叫"hello"的队列,这样,队列就和这个信道绑定起来,在发送数据时,只须要在routingKey中指明所用的队列名,消息就会经过direct exchange的模式被发送到指明名称的队列中,这就好像你在网上买了商品,而后你告诉卖家,必定要发某个快递同样,这样,你的商品就会经过这个快递公司邮寄给你。
direct exchange模式如图所示。
整个流程能够以下进行归纳,首先创建发布端和RabbitMQ进行Socket链接,而且声明Socket链接中的信道,经过这个信道声明特定名称的队列,这样,数据就能够经过这个信道发送到RabbitMQ的队列中,等待被转发到消费端,这与官网给出的流程图一模一样。
下面咱们就能够开始编写消费端的程序,对于消费者端来讲,它一直在进行监听,咱们须要一直运行咱们的监听程序,一旦收到消息,就把消息经过控制台进行打印。
打开咱们的Recieve项目,代码以下所示。
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace Receive { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "your ip address" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"Received message:{message}"); }; channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); Console.WriteLine("Press [enter] to exit"); Console.ReadLine(); } } } } }
一开始的代码与发送端一致,创建链接、建立信道、声明队列,须要注意这里声明的队列名须要与发送端绑定的队列名一致。这里之因此从新声明队列,是为了保证当接收端程序比发送端先启动时,队列依然是存在的,可是须要保证名称同样。
以上准备工做完成后,接收端能够开始异步的接收队列中的数据了。咱们使用EventBasicConsumer.Received事件来绑定接收完成的回调函数。即:当消费端成功从"hello"队列中接收到数据后,就会调用这个匿名的lambda表达式,从消息体中获取数据,将数据转码成字符串格式而且打印到控制台中,至此,一条消息的处理就完毕了。
须要注意,在消费端收到信息后,须要给客户端确认,即返回autoAck,若是没有返回,客户端会默认消费者没有收到,消息会存储在队列中,直到被消费。这里设置autoAck:true,则表示消费者一旦收到消息,则马上自动的向客户端回应,这样,消息就会从队列中删除。
运行代码能够看到,不管先运行消费端仍是生产者端的代码,消费端都能收到消息:
若是先运行生产者端程序,消息会存储在消息队列中,当运行消费者端程序后,保存在消息队列中的消息将会被消费,进而从队列中删除。
例如,咱们先不运行消费者端程序,只运行生产者端,这是咱们刷新RabbitMQ的可视化界面。
能够看到有一条消息处于Ready状态,可是还未被消费,因此被存储,咱们关闭生产者端程序,再次运行,能够看到队列中未被消费的消息变为两条。
这时,咱们运行消费者端程序,运行成功后能够看到,消费者端一次收到了两条消息,而且队列中的消息被清除。
因而可知,对于RabbitMQ的消息队列,生产者和消费者端是彻底分离的。 下一讲咱们将继续探索RabbitMQ的工做队列。