RabbitMQ的事件总线
在上文中,咱们讨论了事件处理器中对象生命周期的问题,在进入新的讨论以前,首先让咱们总结一下,咱们已经实现了哪些内容。下面的类图描述了咱们已经实现的组件及其之间的关系,貌似系统已经变得愈来愈复杂了。html
其中绿色的部分就是上文中新实现的部分,包括一个简单的Event Store,一个事件处理器执行上下文的接口,以及一个基于ASP.NET Core依赖注入框架的执行上下文的实现。接下来,咱们打算淘汰PassThroughEventBus,而后基于RabbitMQ实现一套新的事件总线。git
事件总线的重构
根据前面的结论,事件总线的执行须要依赖于事件处理器执行上下文,也就是上面类图中PassThroughEventBus对于IEventHandlerExecutionContext的引用。更具体些,是在事件总线订阅某种类型的事件时,须要将事件处理器注册到IEventHandlerExecutionContext中。那么在实现RabbitMQ时,也会有着相似的设计需求,即RabbitMQEventBus也须要依赖IEventHandlerExecutionContext接口,以保证事件处理器生命周期的合理性。github
为此,咱们新建一个基类:BaseEventBus,并将这部分公共的代码提取出来,须要注意如下几点:sql
- 经过BaseEventBus的构造函数传入IEventHandlerExecutionContext实例,也就限定了全部子类的实现中,必须在构造函数中传入IEventHandlerExecutionContext实例,这对于框架的设计很是有利:在实现新的事件总线时,框架的使用者无需查看API文档,便可知道事件总线与IEventHandlerExecutionContext之间的关系,这符合SOLID原则中的Open/Closed Principle
- BaseEventBus的实现应该放在EdaSample.Common程序集中,更确切地说,它应该放在EdaSample.Common.Events命名空间下,由于它是属于框架级别的组件,而且不会依赖任何基础结构层的组件
BaseEventBus的代码以下:shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public
abstract
class
BaseEventBus : IEventBus
{
protected
readonly
IEventHandlerExecutionContext eventHandlerExecutionContext;
protected
BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)
{
this
.eventHandlerExecutionContext = eventHandlerExecutionContext;
}
public
abstract
Task PublishAsync<TEvent>(TEvent @
event
, CancellationToken cancellationToken =
default
)
where
TEvent : IEvent;
public
abstract
void
Subscribe<TEvent, TEventHandler>()
where
TEvent : IEvent
where
TEventHandler : IEventHandler<TEvent>;
// Disposable接口实现代码省略
}
|
在上面的代码中,PublishAsync和Subscribe方法是抽象方法,以便子类根据不一样的须要来实现。数据库
接下来就是调整PassThroughEventBus,使其继承于BaseEventBus:json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public
sealed
class
PassThroughEventBus : BaseEventBus
{
private
readonly
EventQueue eventQueue =
new
EventQueue();
private
readonly
ILogger logger;
public
PassThroughEventBus(IEventHandlerExecutionContext context,
ILogger<PassThroughEventBus> logger)
:
base
(context)
{
this
.logger = logger;
logger.LogInformation($
"PassThroughEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}."
);
eventQueue.EventPushed += EventQueue_EventPushed;
}
private
async
void
EventQueue_EventPushed(
object
sender, EventProcessedEventArgs e)
=> await
this
.eventHandlerExecutionContext.HandleEventAsync(e.Event);
public
override
Task PublishAsync<TEvent>(TEvent @
event
, CancellationToken cancellationToken =
default
)
{
return
Task.Factory.StartNew(() => eventQueue.Push(@
event
));
}
public
override
void
Subscribe<TEvent, TEventHandler>()
{
if
(!
this
.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())
{
this
.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();
}
}
// Disposable接口实现代码省略
}
|
代码都很简单,也就很少作说明了,接下来,咱们开始实现RabbitMQEventBus。架构
RabbitMQEventBus的实现
首先须要新建一个.NET Standard 2.0的项目,使用.NET Standard 2.0的项目模板所建立的项目,能够同时被.NET Framework 4.6.1或者.NET Core 2.0的应用程序所引用。建立新的类库项目的目的,是由于RabbitMQEventBus的实现须要依赖RabbitMQ C#开发库这个外部引用。所以,为了保证框架核心的纯净和稳定,须要在新的类库项目中实现RabbitMQEventBus。app
Note:对于RabbitMQ及其C#库的介绍,本文就再也不涉及了,网上有不少资料和文档,博客园有不少朋友在这方面都有使用经验分享,RabbitMQ官方文档也写得很是详细,固然是英文版的,若是英语比较好的话,建议参考官方文档。框架
如下就是在EdaSample案例中,RabbitMQEventBus的实现,咱们先读一读代码,再对这部分代码作些分析。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
public
class
RabbitMQEventBus : BaseEventBus
{
private
readonly
IConnectionFactory connectionFactory;
private
readonly
IConnection connection;
private
readonly
IModel channel;
private
readonly
string
exchangeName;
private
readonly
string
exchangeType;
private
readonly
string
queueName;
private
readonly
bool
autoAck;
private
readonly
ILogger logger;
private
bool
disposed;
public
RabbitMQEventBus(IConnectionFactory connectionFactory,
ILogger<RabbitMQEventBus> logger,
IEventHandlerExecutionContext context,
string
exchangeName,
string
exchangeType = ExchangeType.Fanout,
string
queueName =
null
,
bool
autoAck =
false
)
:
base
(context)
{
this
.connectionFactory = connectionFactory;
this
.logger = logger;
this
.connection =
this
.connectionFactory.CreateConnection();
this
.channel =
this
.connection.CreateModel();
this
.exchangeType = exchangeType;
this
.exchangeName = exchangeName;
this
.autoAck = autoAck;
this
.channel.ExchangeDeclare(
this
.exchangeName,
this
.exchangeType);
this
.queueName =
this
.InitializeEventConsumer(queueName);
logger.LogInformation($
"RabbitMQEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}."
);
}
public
override
Task PublishAsync<TEvent>(TEvent @
event
, CancellationToken cancellationToken =
default
(CancellationToken))
{
var
json = JsonConvert.SerializeObject(@
event
,
new
JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
var
eventBody = Encoding.UTF8.GetBytes(json);
channel.BasicPublish(
this
.exchangeName,
@
event
.GetType().FullName,
null
,
eventBody);
return
Task.CompletedTask;
}
public
override
void
Subscribe<TEvent, TEventHandler>()
{
if
(!
this
.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())
{
this
.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();
this
.channel.QueueBind(
this
.queueName,
this
.exchangeName,
typeof
(TEvent).FullName);
}
}
protected
override
void
Dispose(
bool
disposing)
{
if
(!disposed)
{
if
(disposing)
{
this
.channel.Dispose();
this
.connection.Dispose();
logger.LogInformation($
"RabbitMQEventBus已经被Dispose。Hash Code:{this.GetHashCode()}."
);
}
disposed =
true
;
base
.Dispose(disposing);
}
}
private
string
InitializeEventConsumer(
string
queue)
{
var
localQueueName = queue;
if
(
string
.IsNullOrEmpty(localQueueName))
{
localQueueName =
this
.channel.QueueDeclare().QueueName;
}
else
{
this
.channel.QueueDeclare(localQueueName,
true
,
false
,
false
,
null
);
}
var
consumer =
new
EventingBasicConsumer(
this
.channel);
consumer.Received += async (model, eventArgument) =>
{
var
eventBody = eventArgument.Body;
var
json = Encoding.UTF8.GetString(eventBody);
var
@
event
= (IEvent)JsonConvert.DeserializeObject(json,
new
JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
await
this
.eventHandlerExecutionContext.HandleEventAsync(@
event
);
if
(!autoAck)
{
channel.BasicAck(eventArgument.DeliveryTag,
false
);
}
};
this
.channel.BasicConsume(localQueueName, autoAck:
this
.autoAck, consumer: consumer);
return
localQueueName;
}
}
|
阅读上面的代码,须要注意如下几点:
- 正如上面所述,构造函数须要接受IEventHandlerExecutionContext对象,并经过构造函数的base调用,将该对象传递给基类
- 构造函数中,queueName参数是可选参数,也就是说:
- 若是经过RabbitMQEventBus发送事件消息,则无需指定queueName参数,仅需指定exchangeName便可,由于在RabbitMQ中,消息的发布方无需知道消息是发送到哪一个队列中
- 若是经过RabbitMQEventBus接收事件消息,那么也分两种状况:
- 若是两个进程在使用RabbitMQEventBus时,同时指定了queueName参数,而且queueName的值相同,那么这两个进程将会轮流处理路由至queueName队列的消息
- 若是两个进程在使用RabbitMQEventBus时,同时指定了queueName参数,但queueName的值不相同,或者都没有指定queueName参数,那么这两个进程将会同时处理路由至queueName队列的消息
- 有关Exchange和Queue的概念,请参考RabbitMQ的官方文档
- 在Subscribe方法中,除了将事件处理器注册到事件处理器执行上下文以外,还经过QueueBind方法,将指定的队列绑定到Exchange上
- 事件数据都经过Newtonsoft.Json进行序列化和反序列化,使用TypeNameHandling.All这一设定,使得序列化的JSON字符串中带有类型名称信息。在此处这样作既是合理的,又是必须的,由于若是没有带上类型名称的信息,JsonConvert.DeserializeObject反序列化时,将没法断定获得的对象是否能够转换为IEvent对象,这样就会出现异常。但若是是实现一个更为通用的消息系统,应用程序派发出去的事件消息可能还会被由Python或者Java所实现的应用程序所使用,那么对于这些应用,它们并不知道Newtonsoft.Json是什么,也没法经过Newtonsoft.Json加入的类型名称来获知事件消息的初衷(Intent),Newtonsoft.Json所带的类型信息又会显得冗余。所以,简单地使用Newtonsoft.Json做为事件消息的序列化、反序列化工具,实际上是欠妥的。更好的作法是,实现自定义的消息序列化、反序列化器,在进行序列化的时候,将.NET相关的诸如类型信息等,做为Metadata(元数据)附着在序列化的内容上。理论上说,在序列化的数据中加上一些元数据信息是合理的,只不过咱们对这些元数据作一些标注,代表它是由.NET框架产生的,第三方系统若是不关心这些信息,能够对元数据不作任何处理
- 在Dispose方法中,注意将RabbitMQ所使用的资源dispose掉
使用RabbitMQEventBus
在Customer服务中,使用RabbitMQEventBus就很是简单了,只须要引用RabbitMQEventBus的程序集,而后在Startup.cs文件的ConfigureServices方法中,替换PassThroughEventBus的使用便可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public
void
ConfigureServices(IServiceCollection services)
{
this
.logger.LogInformation(
"正在对服务进行配置..."
);
services.AddMvc();
services.AddTransient<IEventStore>(serviceProvider =>
new
DapperEventStore(Configuration[
"mssql:connectionString"
],
serviceProvider.GetRequiredService<ILogger<DapperEventStore>>()));
var
eventHandlerExecutionContext =
new
EventHandlerExecutionContext(services,
sc => sc.BuildServiceProvider());
services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext);
// services.AddSingleton<IEventBus, PassThroughEventBus>();
var
connectionFactory =
new
ConnectionFactory { HostName =
"localhost"
};
services.AddSingleton<IEventBus>(sp =>
new
RabbitMQEventBus(connectionFactory,
sp.GetRequiredService<ILogger<RabbitMQEventBus>>(),
sp.GetRequiredService<IEventHandlerExecutionContext>(),
RMQ_EXCHANGE,
queueName: RMQ_QUEUE));
this
.logger.LogInformation(
"服务配置完成,已注册到IoC容器!"
);
}
|
Note:一种更好的作法是经过配置文件来配置IoC容器,在曾经的Microsoft Patterns and Practices Enterprise Library Unity Container中,使用配置文件是很方便的。这样只须要Customer服务可以经过配置文件来配置IoC容器,同时只须要让Customer服务依赖(注意,不是程序集引用)于不一样的事件总线的实现便可,无需对Customer服务从新编译。
下面来验证一下效果。首先确保RabbitMQ已经配置并启动稳当,我是安装在本地机器上,使用默认安装。首先启动ASP.NET Core Web API,而后经过Powershell发起两次建立Customer的请求:
查看一下数据库是否更新正常:
并检查一下日志信息:
RabbitMQ中Exchange的信息:
总结
本文提供了一种RabbitMQEventBus的实现,目前来讲是够用的,并且这种实现是可使用在实际项目当中的。在实际使用中,或许也会碰到一些与RabbitMQ自己有关的问题,这就须要具体问题具体分析了。此外,本文没有涉及事件消息丢失、重发而后保证最终一致性的问题,这些内容会在后面讨论。从下文开始,咱们着手逐步实现CQRS架构的领域事件和事件存储部分。
源代码的使用
本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,经过不一样的release tag来区分针对不一样章节的源代码。本文的源代码请参考chapter_3这个tag,以下:
欢迎访问个人博客新站:http://sunnycoding.net。