从前面文章能够看出,消息总线是EDA(事件驱动架构)与微服务架构的核心部件,没有消息总线,就没法很好的实现微服务之间的解耦与通信。一般咱们能够利用现有成熟的消息代理产品或云平台提供的消息服务来构建本身的消息总线;也能够本身彻底写一个消息代理产品,而后基于它构建本身的消息总线。一般咱们不用重复造轮子(除非公司有特殊的要求,好比一些大型互联网公司考虑到自主可控的白盒子),能够利用好比像RabbitMq这样成熟的消息代理产品做为消息总线的底层支持。json
RabbitMq核心组件解释:服务器
Connection:消息的发送方或订阅方经过它链接到RabbitMq服务器。微信
Channel:消息的发送方或订阅方经过Connection链接到RabbitMq服务器后,经过Channel创建会话通道。架构
Exchange:消息的发送方向Exchange发送消息,经过RabbitMq服务器中Exchange与Queue的绑定关系,Exchange会将消息路由到匹配的Queue中。async
Queue:消息的承载者,消息的发送者的消息最终经过Exchange路由到匹配的Queue,消息的接收者从Queue接收消息并进行处理。ide
Exchange模式:在消息发送到Exchange时,须要路由到匹配的Queue中,至于如何路由,则是由Exchange模式决定的。函数
1.Direct模式:特定的路由键(消息类型)转发到该Exchange的指定Queue中。微服务
2.Fanout模式:发送到该Exchange的消息,被同时发送到Exchange下绑定的全部Queue中。性能
3.Topic模式:具备某种特征的消息转发到该Exchange的指定Queue中。this
咱们最多见的使用是Direct模式,若是消息要被多个消费者消费,则可使用Fanout模式。
实现基于RabbitMq的消息总线:
咱们首先须要安装Erlang与RabbitMq到服务器上,而后就能够进行基于RabbitMq的消息总线的开发了,开发的整体思路与步骤以下:
1.首先创建一个项目做为消息总线,而后引入Rabbitmq.Client 这个nuget包,这样就有了RabbitMq开发的支持。
2.前面实现了基本的消息总线,全部基于RabbitMq的消息总线是从它继承下来的,并须要传入特定的参数到消息总线的构造函数中:
public RabbitMqEB(IConnectionFactory connectionFactory,IEventHandlerExecutionContext context, string exchangeName,string exchangeType,string queueName,int publisherorconsumer, bool autoAck = true) : base(context) { this.connectionFactory = connectionFactory; this.connection = this.connectionFactory.CreateConnection(); this.exchangeName = exchangeName; this.exchangeType = exchangeType; this.autoAck = autoAck; this.queueName = queueName; if (publisherorconsumer == 2) { this.channel = CreateComsumerChannel(); } }
connectionFactory:RabbitMq.Client中的类型,用于与RabbitMq服务器创建链接时须要使用的对象。
context:消息与消息处理器之间的关联关系的对象。
exchangeName:生产者或消费者须要链接到的Exchange的名字。
exchangeType:前面所描述的Exchange模式。
queueName:生产者或消费者发送或接收消息时的Queue的名字。
publisherorconsumer:指定链接到消息总线的组件是消息总线的生产者仍是消费者,消费者和生产者会有不一样,消费者(publisherorconsumer==2)会构建一个消费通道,用于从Queue接收消息并调用父类的ieventHandlerExecutionContext的HandleAsync方法来处理消息。
3.创建到RabbitMq的链接:
//判断是否已经创建了链接 public bool IsConnected { get { return this.connection != null && this.connection.IsOpen; } }
public bool TryConnect() { //出现链接异常时的重试策略,一般经过第三方nuget包实现重试功能,这里出现链接异常时,每一个1秒重试一次,共重试5次 var policy = RetryPolicy.Handle<SocketException>().Or<BrokerUnreachableException>() .WaitAndRetry(5, p => TimeSpan.FromSeconds(1),(ex,time)=> { //记录错误日志 }); policy.Execute(() => { //创建RabbitMq Server的链接 this.connection = this.connectionFactory.CreateConnection(); }); if (IsConnected) { return true; } return false; }
4.建立消费者通道:
private IModel CreateComsumerChannel() { if (!IsConnected) { TryConnect(); } var channel = this.connection.CreateModel(); channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); //消费者接收到消息的处理 consumer.Received += async (model, ea) => { var eventbody = ea.Body; var json = Encoding.UTF8.GetString(eventbody); var @event = (IEvent)JsonConvert.DeserializeObject(json); //调用关联对象中消息对应的处理器的处理方法 await this.eventHandlerExecutionContext.HandleAsync(@event); //向会话通道确认此消息已被处理 channel.BasicAck(ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: this.queueName, autoAck: false, consumer: consumer); channel.CallbackException += (sender, ea) => { this.channel.Dispose(); this.channel = CreateComsumerChannel(); }; return channel; }
5.对生产者发布消息到交换机队列的支持:
public override void Publish<TEvent>(TEvent @event) { if (!IsConnected) { TryConnect(); } using(var channel = this.connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true); var message = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(message); //发布到交换机,根据交换机与队列的绑定以及交换机模式,最终发布到指定的队列中 channel.BasicPublish(this.exchangeName, @event.GetType().FullName,null, body); } }
6.对订阅者从交换机队列中订阅消息的支持:
public override void Subscribe<TEvent, TEventHandler>() { //注册接收到的消息类型到订阅方的处理器之间的关系 if (!this.eventHandlerExecutionContext.IsRegisterEventHandler < TEvent,TEventHandler>()){ this.eventHandlerExecutionContext.RegisterEventHandler<TEvent, TEventHandler>(); //消费者进行队列绑定 this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName); } }
从上面的6个步骤,咱们基本上就完成了基于RabbitMq消息总线的基本功能,这里须要说明的是,上述代码只是演示,在实际生产环境中,不能直接使用以上代码,还须要当心的重构此代码以保证可靠性与性能。
QQ讨论群:309287205
微服务实战视频请关注微信公众号: