如何优雅的使用RabbitMQ

目录git

RabbitMQ无疑是目前最流行的消息队列之一,对各类语言环境的支持也很丰富,做为一个.NET developer有必要学习和了解这一工具。消息队列的使用场景大概有3种:github

一、系统集成,分布式系统的设计。各类子系统经过消息来对接,这种解决方案也逐步发展成一种架构风格,即“经过消息传递的架构”。数据库

二、当系统中的同步处理方式严重影响了吞吐量,好比日志记录。假如须要记录系统中全部的用户行为日志,若是经过同步的方式记录日志势必会影响系统的响应速度,当咱们将日志消息发送到消息队列,记录日志的子系统就会经过异步的方式去消费日志消息。服务器

三、系统的高可用性,好比电商的秒杀场景。当某一时刻应用服务器或数据库服务器收到大量请求,将会出现系统宕机。若是可以将请求转发到消息队列,再由服务器去消费这些消息将会使得请求变得平稳,提升系统的可用性。架构

若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。异步

1、开始使用RabbitMQasync

RabbitMQ官网提供了详细的安装步骤,另外官网还提供了RabbitMQ在六种场景的使用教程。其中教程一、三、6将覆盖99%的使用场景,因此正常来讲只须要搞清楚这3个教程便可快速上手。分布式

2、简单分析函数

咱们以官方提供的教程1作个简单梳理:该教程展现了Producer如何向一个消息队列(message queue)发送一个消息(message),消息消费者(Consumer)收到该消息后消费该消息。微服务

一、producer端:

 

var factory = new ConnectionFactory() { HostName = "localhost" };
 using (var connection = factory.CreateConnection())
 {
 while (Console.ReadLine() != null)
 {
 using (var channel = connection.CreateModel())
 {
 //建立一个名叫"hello"的消息队列
 channel.QueueDeclare(queue: "hello",
 durable: false,
 exclusive: false,
 autoDelete: false,
 arguments: null);
 
 var message = "Hello World!";
 var body = Encoding.UTF8.GetBytes(message);
 
 //向该消息队列发送消息message
 channel.BasicPublish(exchange: "",
 routingKey: "hello",
 basicProperties: null,
 body: body);
 Console.WriteLine(" [x] Sent {0}", message);
 }
 }
 }

该段代码很是简单,几乎到了没法精简的地步:建立了一个信道(channel)->建立一个队列->向该队列发送消息。

二、Consumer端

 

var factory = new ConnectionFactory() { HostName = "localhost" };
 using (var connection = factory.CreateConnection())
 {
 using (var channel = connection.CreateModel())
 {
 //建立一个名为"hello"的队列,防止producer端没有建立该队列
 channel.QueueDeclare(queue: "hello",
 durable: false,
 exclusive: false,
 autoDelete: false,
 arguments: null);
 
 //回调,当consumer收到消息后会执行该函数
 var consumer = new EventingBasicConsumer(channel);
 consumer.Received += (model, ea) =>
 {
 var body = ea.Body;
 var message = Encoding.UTF8.GetString(body);
 Console.WriteLine(" [x] Received {0}", message);
 };
 
 //消费队列"hello"中的消息
 channel.BasicConsume(queue: "hello",
 noAck: true,
 consumer: consumer);
 
 Console.WriteLine(" Press [enter] to exit.");
 Console.ReadLine();
 }
 }

该段代码能够理解为:建立信道->建立队列->定义回调函数->消费消息。

该实例描述了Send/Receive模式,能够简单理解为1(producer) VS 1(consumer)的场景;

如何优雅的使用RabbitMQ

 

实例3则描述了Publish/Subscriber模式,即1(producer) VS 多个(consumer);

如何优雅的使用RabbitMQ

 

在以上两个示例中,producer只须要发送消息便可,并不关心consumer的返回结果。实例6则描述了一个RPC调用场景,producer发送消息后还要接收consumer的返回结果,这一场景看起来跟使用消息队列的目的有点相悖。由于使用消息队列的目的之一就是要异步,可是这一场景彷佛又将异步变成了同步,不过这一场景也颇有用,好比一个用户操做产生了一个消息,应用服务收到该消息后执行了一些逻辑并使得数据库发生了变化,UI会一直等待应用服务的返回结果才刷新页面。

如何优雅的使用RabbitMQ

 

若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。

3、 发现抽象

我桌子上放着一本RabbitMQ in Action,另外官网提供的文档也很详细,我感受在一个月内我就能精通RabbitMQ,到时候简历上又能够写上“精通…”,感受有点小得意呢... ,可是我知道这并非使用RabbitMQ的最佳方式。

咱们知道合理的抽象能够帮咱们隐藏掉一些技术细节,让咱们将重心放在核心业务上,好比一我的问你:“大雁塔如何走?”你的回答多是“小寨往东,一直走两站,右手边”,若是你回答:“右转45度,向前走100米,再转90度…”,对方就会迷失在这些细节中。

消息队列的使用过程当中实际隐藏着一种抽象——服务总线(Service Bus)。

咱们在回头看第一个例子,这个例子隐含的业务是:ClientA发送一个指令,ClientB收到该指令后作出反应。若是是这样,咱们为何要关心如何建立channel,如何建立一个queue? 我仅仅是要发送一个消息而已。另外这个例子写的其实不够健壮:

没有重试机制:若是ClientB第一次没有执行成功如何对该消息处理?

没有错误处理机制:若是ClientB在重试了N次以后仍是异常如何处理该消息?

没有熔断机制;

如何对ClientA作一个schedule(计划安排),好比定时发送等;

没有消息审计机制;

没法对消息的各个状态作追踪;

事物处理等。

服务总线正是这种场景的抽象,而且为咱们提供了这些机制,让咱们赶快来看个究竟吧。

4、初识MassTransit

MassTransit是.NET平台下的一款开源免费的ESB产品,官网:http://masstransit-project.com/,GitHub 700 star,500 Fork,相似的产品还有NServiceBus,之因此要选用MassTransit是由于他要比NServiceBus轻量级,另外在MassTransit开发之初就选用了RabbitMQ做为消息传输组建;同时我想拿他跟NServiceBus作个比较,看看他们到底有哪些侧重点。

一、新建控制台应用程序:Masstransit.RabbitMQ.GreetingClient

使用MassTransit能够从Nuget中安装:

Install-Package MassTransit.RabbitMQ

二、建立服务总线,发送一个命令

static void Main(string[] args)
{
 Console.WriteLine("Press 'Enter' to send a message.To exit, Ctrl + C");
 
 var bus = BusCreator.CreateBus();
 var sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}{RabbitMqConstants.GreetingQueue}");
 
 while (Console.ReadLine()!=null)
 {
 Task.Run(() => SendCommand(bus, sendToUri)).Wait();
 }
 
 Console.ReadLine();
}
 
private static async void SendCommand(IBusControl bus,Uri sendToUri)
{
 var endPoint =await bus.GetSendEndpoint(sendToUri);
 var command = new GreetingCommand()
 {
 Id = Guid.NewGuid(),
 DateTime = DateTime.Now
 };
 
 await endPoint.Send(command);
 
 Console.WriteLine($"send command:id={command.Id},{command.DateTime}"); 
}

这一段代码隐藏了众多关于消息队列的细节,将咱们的注意力集中在发送消息上,同时ServiceBus提供的API也更接近业务,咱们虽然发送的是一个消息,可是在这种场景下体现出来是一个命令,Send(command)这一API描述了咱们的意图。

若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。

三、服务端接收这一命令

新建一个命令台控制程序:Masstransit.RabbitMQ.GreetingServer

 

var bus = BusCreator.CreateBus((cfg, host) =>
{
 cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingQueue, e =>
 {
 e.Consumer<GreetingConsumer>();
 
 });
});

这一代码能够理解为服务端在监听消息,咱们在服务端注册了一个名为“GreetingConsumer”的消费者,GreetingConsumer的定义:

 

public class GreetingConsumer :IConsumer<GreetingCommand>
{
 public async Task Consume(ConsumeContext<GreetingCommand> context)
 {
 
 await Console.Out.WriteLineAsync($"receive greeting commmand: {context.Message.Id},{context.Message.DateTime}");
 }
}

该consumer能够消费类型为GreetingCommand的消息。这一实例几乎隐藏了有关RabbitMQ的技术细节,将代码中心放在了业务中,将这两个控制台应用跑起来试试:

如何优雅的使用RabbitMQ

 

5、实现Publish/Subscribe模式

发布/订阅模式使得基于消息传递的软件架构成为可能,这一能力表现为ClientA发送消息X,ClientB和ClientC均可以订阅消息X。

一、咱们在上面的例子中改造一下,当GreetingConsumer收到GreetingCommand后发送一个GreetingEvent:

 

var greetingEvent = new GreetingEvent()
 {
 Id = context.Message.Id,
 DateTime = DateTime.Now
 };
 
 await context.Publish(greetingEvent);

二、新建控制台程序Masstransit.RabbitMQ.GreetingEvent.SubscriberA用来订阅GreetingEvent消息:

 

var bus = BusCreator.CreateBus((cfg, host) =>
 {
 cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingEventSubscriberAQueue, e =>
 {
 e.Consumer<GreetingEventConsumer>();
 });
 });
 
 bus.Start();

定义GreetingEventConsumer:

若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。

public class GreetingEventConsumer:IConsumer<Greeting.Message.GreetingEvent>
 {
 public async Task Consume(ConsumeContext<Greeting.Message.GreetingEvent> context)
 {
 await Console.Out.WriteLineAsync($"receive greeting event: id {context.Message.Id}");
 }
 }

这一代码跟Masstransit.RabbitMQ.GreetingServer接受一个命令几乎如出一辙,惟一的区别在于:

在Send/Receive模式中Client首先要得到对方(Server)的终结点(endpoint),直接向该终结点发送命令。Server方监听本身的终结点并消费命令。

而Publish/Subscribe模式中Client publish一个事件,SubscriberA在本身的终结点(endpointA)监听事件,SubscriberB在本身的终结点(endpointB)监听事件。

三、根据上面的分析再定义一个Masstransit.RabbitMQ.GreetingEvent.SubscriberB

四、将4个控制台应用程序跑起来看看

如何优雅的使用RabbitMQ

 

6、实现RPC模式

这一模式在Masstransit中被称做Request/Response模式,经过IRequestClient<IRequest, IResponse> 接口来实现相关操做。一个相关的例子在官方的github。

结束语:本篇文章分析了如何使用Masstransit来抽象业务,避免直接使用具体的消息队列,固然本文提到的众多服务总线机制,如“重试、熔断等”并无在该文中出现,须要你们进一步去了解该项目。

若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。

经过对Masstransit的一些试用和NServiceBus的对比,Masstransit在实际项目中很容易上手而且免费,各类API定义的也很是清晰,可是官方的文档有点过于简单,实际使用中还须要去作深刻的研究。做为.NET平台下为数很少的ESB开源产品,其关注程度仍是不够,期待你们为开源项目作出贡献。

相关文章
相关标签/搜索