在阅读本文章以前,你能够先阅读:json
首先,事件源与事件处理的映射字典。服务器
private static Dictionary<string, List<object>> eventHandlers = new Dictionary<string, List<object>>();
而后,初始化RabbitMQ,建立到服务器的链接,建立一个通道等this
public RabbitMQEventBus(IConnectionFactory connectionFactory, string exchangeName, string exchangeType = ExchangeType.Fanout, string queueName = null, bool autoAck = false) { this.connectionFactory = connectionFactory; this.connection = this.connectionFactory.CreateConnection(); this.channel = this.connection.CreateModel(); this.exchangeType = exchangeType; this.exchangeName = exchangeName; this.autoAck = autoAck; this.channel.ExchangeDeclare(this.exchangeName, this.exchangeType); this.queueName = this.InitializeEventConsumer(queueName); }
接着,实现订阅,往字典表中添加事件处理实例,并绑定队列spa
public void Subscribe<TEvent>(IEventHandler<TEvent> eventHandler) where TEvent : IEvent { var eventTypeName = typeof(TEvent).FullName; if (eventHandlers.ContainsKey(eventTypeName)) { var handlers = eventHandlers[eventTypeName]; handlers.Add(eventHandler); } else { eventHandlers.Add(eventTypeName, new List<object> { eventHandler }); } this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName); }
接着,实现取消订阅,从字典表中删除事件处理实例,并取消绑定队列code
public void Unsubscribe<TEvent>(IEventHandler<TEvent> eventHandler) where TEvent : IEvent { var eventType = typeof(TEvent).FullName; if (eventHandlers.ContainsKey(eventType)) { var handlers = eventHandlers[eventType]; if (handlers != null && handlers.Exists(s => s.GetType() == eventHandler.GetType())) { var handlerToRemove = handlers.First(s => s.GetType() == eventHandler.GetType()); handlers.Remove(handlerToRemove); this.channel.QueueUnbind(this.queueName, this.exchangeName, typeof(TEvent).FullName); } } }
接着,实现发布,往队列发布事件队列
public void Publish<TEvent>(TEvent @event) where TEvent : IEvent { var json = JsonConvert.SerializeObject(@event, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }); var eventBody = Encoding.UTF8.GetBytes(json); channel.BasicPublish(this.exchangeName, @event.GetType().FullName, null, eventBody); }
接着,在EventingBasicConsumer.Received事件处理中,经过事件源找到对应的事件处理类,并执行它事件
private string InitializeEventConsumer(string queue) { var localQueueName = queue; if (string.IsNullOrEmpty(localQueueName)) { localQueueName = this.channel.QueueDeclare().QueueName; } else { this.channel.QueueDeclare(localQueueName, true, false, false, null); } var consumer = new EventingBasicConsumer(this.channel); consumer.Received += (model, eventArgument) => { var eventBody = eventArgument.Body.ToArray(); var json = Encoding.UTF8.GetString(eventBody); var @event = (IEvent)JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }); var eventTypeName = eventArgument.RoutingKey; if (eventHandlers.ContainsKey(eventTypeName)) { var handlers = eventHandlers[eventTypeName]; try { foreach (var handler in handlers) { MethodInfo meth = handler.GetType().GetMethod("Handle"); meth.Invoke(handler, new Object[] { @event }); } } catch (Exception ex) { throw ex; } } if (!autoAck) { channel.BasicAck(eventArgument.DeliveryTag, false); } }; this.channel.BasicConsume(localQueueName, autoAck: this.autoAck, consumer: consumer); return localQueueName; }
最后,建立客户端类,具体事件源类,具体事件处理类。get
using Example.EventBus; using RabbitMQ.Client; using System; namespace Eaxmple.EventBus.RabbitMQ.ConsoleApp01 { public class SendedEvent : IEvent { public string Name { get; private set; } public SendedEvent(string name) { Name = name; } } public class CustomerASendedEventHandler : IEventHandler<SendedEvent> { public void Handle(SendedEvent @event) { Console.WriteLine($"顾客A收到{@event.Name}通知!"); } } public class CustomerBSendedEventHandler : IEventHandler<SendedEvent> { public void Handle(SendedEvent @event) { Console.WriteLine($"顾客B收到{@event.Name}通知!"); } } class Program { static void Main(string[] args) { var connectionFactory = new ConnectionFactory { HostName = "localhost" }; var eventBus = new RabbitMQEventBus(connectionFactory, "Eaxmple.EventBus.RabbitMQ.ConsoleApp01.Exchange", queueName: "Eaxmple.EventBus.RabbitMQ.ConsoleApp01.Queue"); var sendedEvent = new SendedEvent("优惠"); var customerASendedEventHandler = new CustomerASendedEventHandler(); eventBus.Subscribe<SendedEvent>(customerASendedEventHandler); var customerBSendedEventHandler = new CustomerBSendedEventHandler(); eventBus.Subscribe<SendedEvent>(customerBSendedEventHandler); Console.WriteLine($"商店发了{sendedEvent.Name}通知!"); eventBus.Publish<SendedEvent>(sendedEvent); Console.ReadKey(); } } }
让咱们来看看输出结果:string
商店发布优惠通知! 顾客A收到优惠通知。 顾客B收到优惠通知。