消息队列做为分布式系统中的重要组件,经常使用的有MSMQ,RabbitMq,Kafa,ActiveMQ,RocketMQ。至于各类消息队列的优缺点比较,在这里就不作扩展了,网上资源不少。html
更多内容可参考 消息队列及常见消息队列介绍。我在这里选用的是RabbitMq。git
官网地址:http://www.rabbitmq.comgithub
安装和配置:Windows下RabbitMq安装及配置数据库
RabbitMQ是一款基于AMQP(高级消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、 Erlang等。在RabbitMq中首先要弄清楚的概念是 交换机、队列、绑定。基本的消息通信步骤就是首先定义ExChange,而后定义队列,而后绑定交换机和队列。json
须要明确的一点儿是,发布者在发送消息是,并非把消息直接发送到队列中,而是发送到Exchang,而后由交互机根据定义的消息匹配规则,在将消息发送到队列中。api
Exchange有四种消息消息分发规则:direct,topic,fanout,header。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器彻底一致,但性能差不少,目前几乎用不到了。app
详细的概念介绍推荐查看:消息队列之RabbitMq框架
Easynetq是一个简单易用的Rabbitmq Net客户端。同时支持 NetFramework和NetCore。GitHub地址。它是针对RabbitMq Net客户端的进一步封装。关于EasyNetQ的简单使用推荐教程:EasyNetQ的介绍。异步
本文主要介绍基于EasyNeq的高级API的使用。EasyNetQ的做者在核心的IBus接口中尽可能避免暴露AMQP中的交换机、队列、绑定这些概念,使用者即便不去了解这些概念,也能完成消息的发送接收。这至关简洁,但某些状况下,基于应用场景的须要,咱们须要自定义交换机、队列、绑定这些信息,EasyNetQ容许你这么作,这些都是经过IAdvanceBus接口实现。async
这里为了演示,首先新建一个项目,包括一个发布者,两个接收者,一个公共的类库
安装EasyNetQ: NuGet>Install-Package EasyNetQ
在Common项目里面是针对Easynetq的使用封装,主要目录以下
在RabbitMq文件夹下,是针对消息发送接收的简单封装。
首先来看下RabbitMqManage,主要的发送和订阅操做都在这个类中。其中ISend接口定义了发送消息的规范,SendMessageManage是ISend的实现。IMessageConsume接口定义订阅规范。
MesArg 和PushMsg分别是订阅和发送需用到的参数类。RabbitMQManage是暴露在外的操做类。
首先看发送的代码
public enum SendEnum { 订阅模式 = 1, 推送模式 = 2, 主题路由模式 = 3 } public class PushMsg { /// <summary> /// 发送的数据 /// </summary> public object sendMsg { get; set; } /// <summary> /// 消息推送的模式 /// 如今支持:订阅模式,推送模式,主题路由模式 /// </summary> public SendEnum sendEnum { get; set; } /// <summary> /// 管道名称 /// </summary> public string exchangeName { get; set; } /// <summary> /// 路由名称 /// </summary> public string routeName { get; set; } } internal interface ISend { Task SendMsgAsync(PushMsg pushMsg, IBus bus); void SendMsg(PushMsg pushMsg, IBus bus); } internal class SendMessageMange : ISend { public async Task SendMsgAsync(PushMsg pushMsg, IBus bus) { //一对一推送 var message = new Message<object>(pushMsg.sendMsg); IExchange ex = null; //判断推送模式 if (pushMsg.sendEnum == SendEnum.推送模式) { ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct); } if (pushMsg.sendEnum == SendEnum.订阅模式) { //广播订阅模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout); } if (pushMsg.sendEnum == SendEnum.主题路由模式) { //主题路由模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic); } await bus.Advanced.PublishAsync(ex, pushMsg.routeName.ToSafeString(""), false, message) .ContinueWith(task => { if (!task.IsCompleted && task.IsFaulted)//消息投递失败 { //记录投递失败的消息信息 } }); } public void SendMsg(PushMsg pushMsg, IBus bus) { //一对一推送 var message = new Message<object>(pushMsg.sendMsg); IExchange ex = null; //判断推送模式 if (pushMsg.sendEnum == SendEnum.推送模式) { ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct); } if (pushMsg.sendEnum == SendEnum.订阅模式) { //广播订阅模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout); } if (pushMsg.sendEnum == SendEnum.主题路由模式) { //主题路由模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic); } bus.Advanced.Publish(ex, pushMsg.routeName.ToSafeString(""), false, message); } }
在EasyNetQ中对于异步发送消息的时候,消息是否送达Broker只须要查看异步发送方法最终执行成功仍是失败,成功就表示消息送达,若是失败能够将失败后的消息存入数据库中,而后用后台线程轮询
数据库表,将失败后的消息进行从新 发送。这种方式还能够进一步变成消息表,就是先将要发送的消息存入消息表中,而后后台线程轮询消息表来进行消息发送。通常这种方式被普遍用于分布式事务中,
将本地数据库操做和消息表写入放入同一个本地事务中,来保证消息发送和本地数据操做的同步成功,由于个人系统中,分布式事务的涉及不多,因此就没这样去作,只是简单的在异步发送的时候监控下
是否发送失败,而后针对失败的消息作一个从新发送的机制。这里,推荐大佬的NetCore分布式事务解决方案 CAP GitHub地址。
接着看一下消息订阅接收涉及的代码
public class MesArgs { /// <summary> /// 消息推送的模式 /// 如今支持:订阅模式,推送模式,主题路由模式 /// </summary> public SendEnum sendEnum { get; set; } /// <summary> /// 管道名称 /// </summary> public string exchangeName { get; set; } /// <summary> /// 对列名称 /// </summary> public string rabbitQueeName { get; set; } /// <summary> /// 路由名称 /// </summary> public string routeName { get; set; } } public interface IMessageConsume { void Consume(string message); }
在订阅中我定义了一个接口,最终业务代码中,全部的消息订阅类,都须要继续此接口
最后,咱们来看下对外使用的操做类
public class RabbitMQManage { private volatile static IBus bus = null; private static readonly object lockHelper = new object(); /// <summary> /// 建立服务总线 /// </summary> /// <param name="config"></param> /// <returns></returns> public static IBus CreateEventBus() { //获取RabbitMq的链接地址 //SystemJsonConfigManage 是我简单封装的一个json操做类,用于针对json文件的读写操做 var config = SystemJsonConfigManage.GetInstance().AppSettings["MeessageService"]; if (string.IsNullOrEmpty(config)) throw new Exception("消息地址未配置"); if (bus == null && !string.IsNullOrEmpty(config)) { lock (lockHelper) { if (bus == null) bus = RabbitHutch.CreateBus(config); } } return bus; } /// <summary> /// 释放服务总线 /// </summary> public static void DisposeBus() { bus?.Dispose(); } /// <summary> /// 消息同步投递 /// </summary> /// <param name="pushMsg"></param> /// <returns></returns> public static bool PushMessage(PushMsg pushMsg) { bool b = true; try { if (bus == null) CreateEventBus(); new SendMessageMange().SendMsg(pushMsg, bus); b = true; } catch (Exception ex) { b = false; } return b; } /// <summary> /// 消息异步投递 /// </summary> /// <param name="pushMsg"></param> public static async Task PushMessageAsync(PushMsg pushMsg) { try { if (bus == null) CreateEventBus(); await new SendMessageMange().SendMsgAsync(pushMsg, bus); } catch (Exception ex) { throw ex; } } /// <summary> /// 消息订阅 /// </summary> public static void Subscribe<TConsum>(MesArgs args) where TConsum : IMessageConsume,new() { if (bus == null) CreateEventBus(); if (string.IsNullOrEmpty(args.exchangeName)) return; Expression<Action<TConsum>> methodCall; IExchange ex = null; //判断推送模式 if (args.sendEnum == SendEnum.推送模式) { ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Direct); } if (args.sendEnum == SendEnum.订阅模式) { //广播订阅模式 ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Fanout); } if (args.sendEnum == SendEnum.主题路由模式) { //主题路由模式 ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Topic); } IQueue qu; if (string.IsNullOrEmpty(args.rabbitQueeName)) { qu = bus.Advanced.QueueDeclare(); } else qu = bus.Advanced.QueueDeclare(args.rabbitQueeName); bus.Advanced.Bind(ex, qu, args.routeName.ToSafeString("")); bus.Advanced.Consume(qu, (body, properties, info) => Task.Factory.StartNew(() => { try { lock (lockHelper) { var message = Encoding.UTF8.GetString(body); //处理消息 methodCall = job => job.Consume(message); methodCall.Compile()(new TConsum()); } } catch (Exception e) { throw e; } })); } }
这里面主要封装了消息的发送和订阅,以及IBus单例的建立。在后续的消息发送和订阅主要就经过此处来实现。咱们看到一开始的类目结构中还有一个RaExMessageHandleJob类,这个类就是一个后台
循环任务,用来监测数据库中是否保存了发送失败的消息,若是有,则将消息取出,尝试从新发送。在此就不作多的介绍,你们能够根据本身的实际需求来实现。
如今来看一下消息发布者的代码
主要的发送代码都在Send类中,其中appsettings.json里面配置了Rabbitmq的链接地址,TestDto只是一个为了方便演示的参数类。
下面看一下Program里面的代码
很简单的一个发送消息调用。
而后来看一下Send类中的代码
public class Send { /// <summary> /// 发送消息 /// </summary> public static void SendMessage() { //须要注意一点儿,若是发送的时候,在该管道下找不到相匹配的队列框架将默认丢弃该消息 //推送模式 //推送模式下,需指定管道名称和路由键值名称 //消息只会被发送到和指定路由键值彻底匹配的队列中 var directdto = new PushMsg() { sendMsg = new TestDto() { Var1 = "这是推送模式" }, exchangeName = "message.directdemo", routeName= "routekey", sendEnum =SendEnum.推送模式 }; //同步发送 ,返回true或fasle true 发送成功,消息已存储到Rabbitmq中,false表示发送失败 var b= RabbitMQManage.PushMessage(directdto); //异步发送,若是失败,失败的消息会被写入数据库,会有后台线程轮询数据库进行从新发送 //RabbitMQManage.PushMessageAsync(directlist); //订阅模式 //订阅模式只须要指定管道名称 //消息会被发送到该管道下的全部队列中 var fanoutdto = new PushMsg() { sendMsg = new TestDto() { Var1 = "这是订阅模式" }, exchangeName = "message.fanoutdemo", sendEnum = SendEnum.订阅模式 }; //同步发送 var fb = RabbitMQManage.PushMessage(fanoutdto); //异步发送 //RabbitMQManage.PushMessageAsync(fanoutdto); //主题路由模式 //路由模式下需指定 管道名称和路由值 //消息会被发送到该管道下,和路由值匹配的队列中去 var routedto = new PushMsg() { sendMsg = new TestDto() { Var1 = "这是主题路由模式1", }, exchangeName = "message.topicdemo", routeName="a.log", sendEnum=SendEnum.主题路由模式 }; var routedto2 = new PushMsg() { sendMsg = new TestDto() { Var1 = "这是主题路由模式2", }, exchangeName = "message.topicdemo", routeName = "a.log.a.b", sendEnum = SendEnum.主题路由模式 }; //同步发送 var rb = RabbitMQManage.PushMessage(routedto); var rb2 = RabbitMQManage.PushMessage(routedto2); //异步发送 //RabbitMQManage.PushMessageAsync(routedto); } }
首先来看下消费者端的目录结构
其中appsettings.json中配置Rabbitmq的链接信息,Program中只是简单调用消息订阅
主要的消息订阅代码都在MessageManage文件夹下,MessageManService用于定义消息订阅类型
public class MessageManService { public static void Subsribe() { Task.Run(() => { //概念 一个管道下面能够绑定多个队列。 //发送消息 是指将消息发送到管道中,而后由rabbitmq根据发送规则在将消息具体的转发到对应到管道下面的队列中 //消费消息 是指消费者(即服务)从管道下面的队列中获取消息 //同一个队列 能够有多个消费者(即不一样的服务,均可以链接到同一个队列去获取消息) //但注意 当一个队列有多个消费者的时候,消息会被依次分发到不一样的消费者中。好比第一条消息给第一个消费者,第二条消息给第二个消费者(框架内部有一个公平分发的机制) //推送模式时 需指定管道名称和路由值 //队列名称可本身指定 //注意 ,管道名称和路由名称必定要和发送方的管道名称和路由名称一致 //不管这个管道下面挂靠有多少个队列,只有路由名称和此处指定的路由名称彻底一致的队列,才会收到这条消息。 var dirarg = new MesArgs() { sendEnum = SendEnum.推送模式, exchangeName = "message.directdemo", rabbitQueeName = "meesage.directmessagequene", routeName = "routekey" }; RabbitMQManage.Subscribe<DirectMessageConsume>(dirarg); //订阅模式时需指定管道名称,而且管道名称要和发送方管道名称一致 //队列名称可本身指定 //全部这个管道下面的队列,都将收到该条消息 var fanoutrg = new MesArgs() { sendEnum = SendEnum.订阅模式, exchangeName = "message.fanoutdemo", rabbitQueeName = "meesage.fanoutmessagequene" }; RabbitMQManage.Subscribe<FanoutMessageConsume>(fanoutrg); //路由模式时需指定管道名称,路由关键字而且管道名称,路由关键字要和发送方的一致 //队列名称可本身指定 //消息将被发送到管道下面的能匹配路由关键字的队列中 //也就是说 路由模式时,有多少队列能收到消息,取决于该队列的路由关键字是否匹配,只要匹配就能收到消息 //符号“#”匹配一个或多个词,符号“*”匹配很少很多一个词 var topicrg = new MesArgs() { sendEnum = SendEnum.主题路由模式, exchangeName = "message.topicdemo", rabbitQueeName = "message.topicmessagequene", routeName = "#.log.#" }; RabbitMQManage.Subscribe<TopicMessageConsume>(topicrg); }); } }
Consume文件夹下主要定义了消息的业务处理
//推送模式过来的消息 public class DirectMessageConsume : IMessageConsume { //消息的处理方法中最好不要进行try catch操做 //若是发送异常,EasyNetQ会自动将消息放入错误队列中 //若是在Consume方法体中捕获了异常而且没有抛出,会默认消息处理成功 //消息的幂等性需业务方自行处理,也就是说同一条消息可能会接收到两次 //(好比说第一次正在处理消息的时候服务挂掉,服务重启后这条消息又会从新推送过来) public void Consume(string message) { var dto = JsonConvert.DeserializeObject<TestDto>(message); Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3); } } //广播模式过来的消息 public class FanoutMessageConsume : IMessageConsume { //消息的处理方法中最好不要进行try catch操做 //若是发送异常,EasyNetQ会自动将消息放入错误队列中 //若是在Consume方法体中捕获了异常而且没有抛出,会默认消息处理成功 //消息的幂等性需业务方自行处理,也就是说同一条消息可能会接收到两次 //(好比说第一次正在处理消息的时候服务挂掉,服务重启后这条消息又会从新推送过来) public void Consume(string message) { var dto = JsonConvert.DeserializeObject<TestDto>(message); Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3); } } //主题路由模式过来的消息 public class TopicMessageConsume : IMessageConsume { //消息的处理方法中最好不要进行try catch操做 //若是发送异常,EasyNetQ会自动将消息放入错误队列中 //若是在Consume方法体中捕获了异常而且没有抛出,会默认消息处理成功 //消息的幂等性需业务方自行处理,也就是说同一条消息可能会接收到两次 //(好比说第一次正在处理消息的时候服务挂掉,服务重启后这条消息又会从新推送过来) public void Consume(string message) { var dto = JsonConvert.DeserializeObject<TestDto>(message); Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3); } }
能够看到,全部的类都集成自咱们定义的接口IMessageConsume。
在EasyNetQ中若是须要消费者确认功能,则须要在Rabbitmq的链接配置中设置publisherConfirms=true,这将会开启自动确认。在使用高级api定义交换机和队列时能够本身定义多种参数,好比消息是否持久化,消息最大长度等等,具体你们能够去看官方文档,上面有详细介绍。Easynetq会自动去捕获消费异常的消息并将其放入到错误队列中,并且官方提供了从新发送错误队列中消息的方法,固然你也能够本身去监视错误列队,对异常消息进行处理。EasyNetQ里面做者针对消息的发布确认和消费确认都作了封装。在EasyNetQ中发布消息的时候若是选用的同步发送,只要没有抛出异常,咱们就能够认为任务消息已经正确到达Broker,而异步发送的话须要咱们本身去监视Task是否成功 。若是开启了自动确认,并不须要咱们在消息处理的方法体中手动返回ack信息,只要消息被 正确处理就会自动ack。虽然RabbitMq中也有事务消息,但因为性能比较差,并不推荐使用。其实,只要咱们能明确消息是否发布成功和消费成功,就将会很容易在这个基础上扩展出分布式事务的处理。