本示例经过对服务订阅的封装、隐藏细节实现、统一配置、自动重连、异常处理等各个方面来打造一个简单易用的 RabbitMQ 工厂;本文适合适合有必定 RabbitMQ 使用经验的读者阅读,若是你尚未实际使用过 RabbitMQ,也没有关系,由于本文的代码都是基于直接运行的实例,经过简单的修改 RabbitMQ 便可运行。git
首先,建立一个 .netcore 控制台项目,建立 Helper、Service、Utils 文件夹,分别用于存放通道管理、服务订阅、公共组件。github
public class MQConfig { /// <summary> /// 访问消息队列的用户名 /// </summary> public string UserName { get; set; } /// <summary> /// 访问消息队列的密码 /// </summary> public string Password { get; set; } /// <summary> /// 消息队列的主机地址 /// </summary> public string HostName { get; set; } /// <summary> /// 消息队列的主机开放的端口 /// </summary> public int Port { get; set; } }
public class MessageBody { public EventingBasicConsumer Consumer { get; set; } public BasicDeliverEventArgs BasicDeliver { get; set; } /// <summary> /// 0成功 /// </summary> public int Code { get; set; } public string Content { get; set; } public string ErrorMessage { get; set; } public bool Error { get; set; } public Exception Exception { get; set; } }
public class MQChannel { public string ExchangeTypeName { get; set; } public string ExchangeName { get; set; } public string QueueName { get; set; } public string RoutekeyName { get; set; } public IConnection Connection { get; set; } public EventingBasicConsumer Consumer { get; set; } /// <summary> /// 外部订阅消费者通知委托 /// </summary> public Action<MessageBody> OnReceivedCallback { get; set; } public MQChannel(string exchangeType, string exchange, string queue, string routekey) { this.ExchangeTypeName = exchangeType; this.ExchangeName = exchange; this.QueueName = queue; this.RoutekeyName = routekey; } /// <summary> /// 向当前队列发送消息 /// </summary> /// <param name="content"></param> public void Publish(string content) { byte[] body = MQConnection.UTF8.GetBytes(content); IBasicProperties prop = new BasicProperties(); prop.DeliveryMode = 1; Consumer.Model.BasicPublish(this.ExchangeName, this.RoutekeyName, false, prop, body); } internal void Receive(object sender, BasicDeliverEventArgs e) { MessageBody body = new MessageBody(); try { string content = MQConnection.UTF8.GetString(e.Body); body.Content = content; body.Consumer = (EventingBasicConsumer)sender; body.BasicDeliver = e; } catch (Exception ex) { body.ErrorMessage = $"订阅-出错{ex.Message}"; body.Exception = ex; body.Error = true; body.Code = 500; } OnReceivedCallback?.Invoke(body); } /// <summary> /// 设置消息处理完成标志 /// </summary> /// <param name="consumer"></param> /// <param name="deliveryTag"></param> /// <param name="multiple"></param> public void SetBasicAck(EventingBasicConsumer consumer, ulong deliveryTag, bool multiple) { consumer.Model.BasicAck(deliveryTag, multiple); } /// <summary> /// 关闭消息队列的链接 /// </summary> public void Stop() { if (this.Connection != null && this.Connection.IsOpen) { this.Connection.Close(); this.Connection.Dispose(); } } }
首先是在构造函数内对当前通道的属性进行设置,其次提供了 Publish 和 OnReceivedCallback 的委托,当通道接收到消息的时候,会进入方法 Receive 中,在 Receive 中,通过封装成 MessageBody 对象,并调用委托 OnReceivedCallback ,将,解析好的消息传递到外边订阅者的业务中。最终在 MQChannel 中还提供了消息确认的操做方法 SetBasicAck,供业务系统手动调用。web
public class MQChannelManager { public MQConnection MQConn { get; set; } public MQChannelManager(MQConnection conn) { this.MQConn = conn; } /// <summary> /// 建立消息通道 /// </summary> /// <param name="cfg"></param> public MQChannel CreateReceiveChannel(string exchangeType, string exchange, string queue, string routekey) { IModel model = this.CreateModel(exchangeType, exchange, queue, routekey); model.BasicQos(0, 1, false); EventingBasicConsumer consumer = this.CreateConsumer(model, queue); MQChannel channel = new MQChannel(exchangeType, exchange, queue, routekey) { Connection = this.MQConn.Connection, Consumer = consumer }; consumer.Received += channel.Receive; return channel; } /// <summary> /// 建立一个通道,包含交换机/队列/路由,并创建绑定关系 /// </summary> /// <param name="type">交换机类型</param> /// <param name="exchange">交换机名称</param> /// <param name="queue">队列名称</param> /// <param name="routeKey">路由名称</param> /// <returns></returns> private IModel CreateModel(string type, string exchange, string queue, string routeKey, IDictionary<string, object> arguments = null) { type = string.IsNullOrEmpty(type) ? "default" : type; IModel model = this.MQConn.Connection.CreateModel(); model.BasicQos(0, 1, false); model.QueueDeclare(queue, true, false, false, arguments); model.QueueBind(queue, exchange, routeKey); return model; } /// <summary> /// 接收消息到队列中 /// </summary> /// <param name="model">消息通道</param> /// <param name="queue">队列名称</param> /// <param name="callback">订阅消息的回调事件</param> /// <returns></returns> private EventingBasicConsumer CreateConsumer(IModel model, string queue) { EventingBasicConsumer consumer = new EventingBasicConsumer(model); model.BasicConsume(queue, false, consumer); return consumer; } }
public MQChannelManager(MQConnection conn) { this.MQConn = conn; }
public class MQConnection { private string vhost = string.Empty; private IConnection connection = null; private MQConfig config = null; /// <summary> /// 构造无 utf8 标记的编码转换器 /// </summary> public static UTF8Encoding UTF8 { get; set; } = new UTF8Encoding(false); public MQConnection(MQConfig config, string vhost) { this.config = config; this.vhost = vhost; } public IConnection Connection { get { if (connection == null) { ConnectionFactory factory = new ConnectionFactory { AutomaticRecoveryEnabled = true, UserName = this.config.UserName, Password = this.config.Password, HostName = this.config.HostName, VirtualHost = this.vhost, Port = this.config.Port }; connection = factory.CreateConnection(); } return connection; } } }
设想一下,有这样的一个业务场景,通道管理和服务管理都是相同的操做,若是这些基础操做都在一个地方定义,且有一个默认的实现,那么后来者就不须要去关注这些技术细节,直接继承基础类后,传入相应的消息配置便可完成
消息订阅和发布操做。网络
public interface IService { /// <summary> /// 建立通道 /// </summary> /// <param name="queue">队列名称</param> /// <param name="routeKey">路由名称</param> /// <param name="exchangeType">交换机类型</param> /// <returns></returns> MQChannel CreateChannel(string queue, string routeKey, string exchangeType); /// <summary> /// 开启订阅 /// </summary> void Start(); /// <summary> /// 中止订阅 /// </summary> void Stop(); /// <summary> /// 通道列表 /// </summary> List<MQChannel> Channels { get; set; } /// <summary> /// 消息队列中定义的虚拟机 /// </summary> string vHost { get; } /// <summary> /// 消息队列中定义的交换机 /// </summary> string Exchange { get; } }
public abstract class MQServiceBase : IService { internal bool started = false; internal MQServiceBase(MQConfig config) { this.Config = config; } public MQChannel CreateChannel(string queue, string routeKey, string exchangeType) { MQConnection conn = new MQConnection(this.Config, this.vHost); MQChannelManager cm = new MQChannelManager(conn); MQChannel channel = cm.CreateReceiveChannel(exchangeType, this.Exchange, queue, routeKey); return channel; } /// <summary> /// 启动订阅 /// </summary> public void Start() { if (started) { return; } MQConnection conn = new MQConnection(this.Config, this.vHost); MQChannelManager manager = new MQChannelManager(conn); foreach (var item in this.Queues) { MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey); channel.OnReceivedCallback = item.OnReceived; this.Channels.Add(channel); } started = true; } /// <summary> /// 中止订阅 /// </summary> public void Stop() { foreach (var c in this.Channels) { c.Stop(); } this.Channels.Clear(); started = false; } /// <summary> /// 接收消息 /// </summary> /// <param name="message"></param> public abstract void OnReceived(MessageBody message); public List<MQChannel> Channels { get; set; } = new List<MQChannel>(); /// <summary> /// 消息队列配置 /// </summary> public MQConfig Config { get; set; } /// <summary> /// 消息队列中定义的虚拟机 /// </summary> public abstract string vHost { get; } /// <summary> /// 消息队列中定义的交换机 /// </summary> public abstract string Exchange { get; } /// <summary> /// 定义的队列列表 /// </summary> public List<QueueInfo> Queues { get; } = new List<QueueInfo>(); }
上面的抽象类,原封不动的实现接口契约,代码很是简单,在 Start 方法中,建立通道和启动消息订阅;同时,将通道加入属性 Channels 中,方便后面的自检服务使用;在 Start 方法中ide
/// <summary> /// 启动订阅 /// </summary> public void Start() { if (started) { return; } MQConnection conn = new MQConnection(this.Config, this.vHost); MQChannelManager manager = new MQChannelManager(conn); foreach (var item in this.Queues) { MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey); channel.OnReceivedCallback = item.OnReceived; this.Channels.Add(channel); } started = true; }
使用 MQChannelManager 建立了一个通道,并将通道的回调委托 OnReceivedCallback 设置为 item.OnReceived 方法,该方法将有子类实现;在将当前订阅服务通道建立完成后,标记服务状态 started 为 true,防止重复启动;同时,在该抽象类中,不实现契约的 OnReceived(MessageBody message);强制基础业务服务类去自我实现,由于各类业务的特殊性,这块对消息的处理不能再基础服务中完成函数
接下来要介绍的是服务监控管理类,该类内部定义一个简单的定时器功能,不间断的对 RabbitMQ 的通信进行侦听,一旦发现有断开的链接,就自动建立一个新的通道,并移除旧的通道;同时,提供 Start/Stop 两个方法,以供程序 启动/中止 的时候对测试
public class MQServcieManager { public int Timer_tick { get; set; } = 10 * 1000; private Timer timer = null; public Action<MessageLevel, string, Exception> OnAction = null; public MQServcieManager() { timer = new Timer(OnInterval, "", Timer_tick, Timer_tick); } /// <summary> /// 自检,配合 RabbitMQ 内部自动重连机制 /// </summary> /// <param name="sender"></param> private void OnInterval(object sender) { int error = 0, reconnect = 0; OnAction?.Invoke(MessageLevel.Information, $"{DateTime.Now} 正在执行自检", null); foreach (var item in this.Services) { for (int i = 0; i < item.Channels.Count; i++) { var c = item.Channels[i]; if (c.Connection == null || !c.Connection.IsOpen) { error++; OnAction?.Invoke(MessageLevel.Information, $"{c.ExchangeName} {c.QueueName} {c.RoutekeyName} 从新建立订阅", null); try { c.Stop(); var channel = item.CreateChannel(c.QueueName, c.RoutekeyName, c.ExchangeTypeName); item.Channels.Remove(c); item.Channels.Add(channel); OnAction?.Invoke(MessageLevel.Information, $"{c.ExchangeName} {c.QueueName} {c.RoutekeyName} 从新建立完成", null); reconnect++; } catch (Exception ex) { OnAction?.Invoke(MessageLevel.Information, ex.Message, ex); } } } } OnAction?.Invoke(MessageLevel.Information, $"{DateTime.Now} 自检完成,错误数:{error},重连成功数:{reconnect}", null); } public void Start() { foreach (var item in this.Services) { try { item.Start(); } catch (Exception e) { OnAction?.Invoke(MessageLevel.Error, $"启动服务出错 | {e.Message}", e); } } } public void Stop() { try { foreach (var item in this.Services) { item.Stop(); } Services.Clear(); timer.Dispose(); } catch (Exception e) { OnAction?.Invoke(MessageLevel.Error, $"中止服务出错 | {e.Message}", e); } } public void AddService(IService service) { Services.Add(service); } public List<IService> Services { get; set; } = new List<IService>(); }
代码比较简单,就不在一一介绍,为了将异常等内部信息传递到外边,方便使用第三方组件进行日志记录等需求,MQServcieManager 还使用了 MessageLevel 这个定义,方便业务根据不一样的消息级别对消息进行处理this
public enum MessageLevel { Trace = 0, Debug = 1, Information = 2, Warning = 3, Error = 4, Critical = 5, None = 6 }
终于来到了这一步,咱们将要开始使用这个基础服务;首先,建立一个 DemoService 继承自 MQServiceBase ;同时,编码
public class DemoService : MQServiceBase { public Action<MessageLevel, string, Exception> OnAction = null; public DemoService(MQConfig config) : base(config) { base.Queues.Add(new QueueInfo() { ExchangeType = ExchangeType.Direct, Queue = "login-message", RouterKey = "pk", OnReceived = this.OnReceived }); } public override string vHost { get { return "gpush"; } } public override string Exchange { get { return "user"; } } /// <summary> /// 接收消息 /// </summary> /// <param name="message"></param> public override void OnReceived(MessageBody message) { try { Console.WriteLine(message.Content); } catch (Exception ex) { OnAction?.Invoke(MessageLevel.Error, ex.Message, ex); } message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag, true); } }
以上的代码很是简单,几乎不须要业务开发者作更多的其它工做,开发者只须要在构造方法内部传入一个 QueueInfo 对象,若是有多个,可一并传入.net
public partial class QueueInfo { /// <summary> /// 队列名称 /// </summary> public string Queue { get; set; } /// <summary> /// 路由名称 /// </summary> public string RouterKey { get; set; } /// <summary> /// 交换机类型 /// </summary> public string ExchangeType { get; set; } /// <summary> /// 接受消息委托 /// </summary> public Action<MessageBody> OnReceived { get; set; } /// <summary> /// 输出信息到客户端 /// </summary> public Action<MQChannel, MessageLevel, string> OnAction { get; set; } }
并设置 vHost 和 Exchange 的值,而后剩下的就是在 OnReceived(MessageBody message) 方法中专心的处理本身的业务了;在这里,咱们仅输出接收到的消息,并设置 ack 为已成功处理。
class Program { static void Main(string[] args) { Test(); } static void Test() { MQConfig config = new MQConfig() { HostName = "127.0.0.1", Password = "123456", Port = 5672, UserName = "dotnet" }; MQServcieManager manager = new MQServcieManager(); manager.AddService(new DemoService(config)); manager.OnAction = OnActionOutput; manager.Start(); Console.WriteLine("服务已启动"); Console.ReadKey(); manager.Stop(); Console.WriteLine("服务已中止,按任意键退出..."); Console.ReadKey(); } static void OnActionOutput(MessageLevel level, string message, Exception ex) { Console.ForegroundColor = ConsoleColor.Yellow; Console.WriteLine("{0} | {1} | {2}", level, message, ex?.StackTrace); Console.ForegroundColor = ConsoleColor.Gray; } }
消息已经接收并处理,为了查看监控效果,我还手动将网络进行中断,而后监控服务检测到没法链接,尝试重建通道,并将消息输出
在文章中,咱们创建了 RabbitMQ 的通道管理、基础服务管理、契约实现等操做,让业务开发人员经过简单的继承实现去快速的处理业务系统的逻辑,后续若是有增长消费者的状况下,只须要经过 MQServcieManager.AddService 进行简单的调用操做便可,无需对底层技术细节进行过多的改动。
源码下载:
https://github.com/lianggx/EasyAspNetCoreDemo/tree/master/Ron.MQTest