软件事件即系统中其它组件感兴趣的事物。PHP 程序员工做中广泛不使用事件,由于这不是该语言的特性。不过,如今更常见的是,新的框架和库采用它们来提供一种新的解耦,重用和加速代码的途径。php
领域事件是领域发生改变时相关的事件。领域事件即发生在领域内的事件,是领域专家所关心的。程序员
在领域驱动设计中,领域事件是基础构建块,它们能够:web
领域事件的本质是异步通讯。有关此主题的详细信息,咱们推荐 Gregor Hohpe 和 Bobby Woolf 《企业集成模式:设计,构建及部署消息传递解决方案》一书。算法
假设有一个 Javascript 2D 平台游戏,在屏幕上同时有大量不一样组件交互。其中一个组件表示剩余生命值,另外一个显示全部得分,还有一个显示完成当前等级还剩余的时间。每次你的角色跳到敌人身上,分数就会增长。当你的得分高于一个分数值时,就会获取额外一条命。当你的角色捡起一把钥匙,一扇门一般就会打开。可是全部这些组件是如何相互交互的呢?此场景最佳构架又是什么?数据库
这里有两个可选的方案:第一种是每一个组件与它所链接的组件结合起来。不过,在上面的例子中,意味着有大量组件耦合在一块儿,每一个额外的功能增长都须要开发者修改代码。但你还记得开闭原则(OCP)吗?增长一个新的组件不该该使它必须更新第一个组件,这会有太多要维护的工做。json
第二种,更好的方法,就是将全部组件链接到一个单独的对象上,该对象处理游戏中全部重要的事件。它接收来自每一个组件的事件并转发给特定的组件。例如,得分组件可能对一个 EnemyKilled
事件感兴趣,而 LifeCaptered
事件对玩家实体和剩余生命数组件至关有用。一般这种方式,全部组件与一个管理全部通知的单独组件耦合。使用这种方法,增长或者移除组件都不会影响现有的组件。segmentfault
当开发一个单体应用时,事件对于解耦组件很是有用。当用分布式方式来开发整个领域时,事件对于领域中发挥做用的每一个服务或者应用之间的解耦至关有用。关键点是同样的,但规模却不一样。设计模式
领域事件是一种特殊类型的事件,用来通知本地或者远程领域限界上下文的变化。
Vaughn Vernon 定义领域事件为:数组
领域中发生的事情。
Eric Evans 定义领域事件为:服务器
领域事件即领域模型中一个完整的部分,是领域中发生的事件的表现形式。突然不相碰的领域活动,同时明确领域专家但愿追踪的,或者被通知的事件,或与其它领域对象状态改变有关的事件。
Martin Flower 定义领域事件为:
一类捕获影响领域的感兴趣的记录。
在 web 应用里的领域事件例子有用户注册
,定货
,转移用户
以及添加产品
等。
在一家售票代理机构中,运营经理决定提升 U2 秀节目的价格。她进入后台,编辑该节目。一个ShowPriceChanged
领域事件被发布,而且在同一事务里将新的节目价格持久到数据库中。
一个批处理进程获取该领域事件并它投递到 RabbitMQ
队列中。领域事件被成成两个队列:一是同一个本地限界上下文,另外一个远程事件用于商务智能目的。
在第一个队列中,一个工做进程经过事件里的 ID 检索相应的节目,并将其推送到 Elasticsearch 服务器,从而使得用户在搜索时能够看到最新价格。
在第二个队列中,另外一个进程将信息插入到一个日志服务器或者数据池,在这能够运行报表或者数据挖掘进程。
一个不能使用领域事件集成到系统的外部应用能够经过本地限界上下文提供的 REST API,访问全部的 ShowPriceChanged 事件。
如你所见,领域事件在处理最终一致性和整合不一样限界上下文时很是有用。聚合建立并发布事件。订阅者能够存储事件以及以后转发它们给其它远程订阅者。
星期二咱们去巴布饭店吃饭,用信用卡支付。这可能被建模为一个事件,事件类型为 "下单",主题是 "个人信用卡", 发生时间为 "星期二"。若是巴布饭使用旧的手动系统,直到周五才传输交易,那么交易将在周五生效。
事情就这样发生了。并非全部事情都有意义,一些值得记录但并不会发生反应。然而,通常是最感兴趣的事情才发生反应。许多须要对感兴趣的事件作出反应。多数状况下你须要知道为何一个系统会作出这样的反应。
经过将系统的输入传输到领域事件流中,你能够记录全部的系统输入。这有助于你组织你的处理逻辑,还容许你保留系统输入的审核日志。
练习
尝试在你当前的领域中定位潜在的领域事件
在进入了解领域事件的细节以前,让咱们来看一个真实的领域事件实例,以及它们是怎样对咱们的应用和整个领域起到帮助的。
让咱们考虑一个简单的 Application Service,新用户注册,例如一个电子商务上下文。Application Service 会在其它章节阐述,因此没必要在表面上操心太多。相反的,仅须要关注执行方法:
class SignUpUserService implements ApplicationService { private $userRepository; private $userFactory; private $userTransformer; public function __construct( UserRepository $userRepository, UserFactory $userFactory, UserTransformer $userTransformer ) { $this->userRepository = $userRepository; $this->userFactory = $userFactory; $this->userTransformer = $userTransformer; } /** * @param SignUpUserRequest $request * @return User * @throws UserAlreadyExistsException */ public function execute(SignUpUserRequest $request) { $email = $request->email(); $password = $request->password(); $user = $this->userRepository->userOfEmail($email); if ($user) { throw new UserAlreadyExistsException(); } $user = $this->userFactory->build( $this->userRepository->nextIdentity(), $email, $password ); $this->userRepository->add($user); $this->userTransformer->write($user); } }
如上所示,Application Service 部分会检查用户是否存在。若是不存在,则会建立一个新用户并添加到 UserRepository
中。
如今考虑一个附加需求:一个新用户在注册时须要用邮件提醒。不须要想太多,首先咱们想到的方法就是更新 Application Service,加入一段能够完成这项工做的代码,多是 EmailSender
这种在添加方法以后运行的代码。不过,如今让咱们考虑另外一种方法。
触发一个 UserRegistered
事件,另外一个组件监听到后发送邮件怎么样?这种新方法有一些很是酷的好处。首先,在新用户注册时,咱们不须要每次再去更新 Application Service 的代码。其次,它更易于测试。Application Service 也变得更简单。每次有新的动做开发时,咱们仅须要为此动做写测试用例。
后来在同一个电子商务项目中,咱们被告知集成一个非 PHP 编写的开源游戏化平台。每次用户在咱们的电子商务上下文下单或者浏览产品时,他们能够在他们的电子商务用户主页上看到所获取的徽章或者被邮件通知到。咱们该如何为此问题建模呢?
按照第一种方法,咱们将用以前确认电子邮件的方法来更新应用服务,来整合到新的平台中。使用领域事件的方法,咱们能够为 UserRegistered
建立另外一个 listener
事件,该事件能够用 REST 或者 SOA 的方式链接到游戏平台。更妙的是,它能够将事件放到 RabbitMQ 这样的消息队列,以便游戏限界上下文能够订阅并自动收到通知。咱们电子商务限界上下文根本不须要了解游戏上下文。
领域事件一般是不可变的,由于他们是过去某些内容的记录。除了事件的描述外,一个领域事件一般包含一个事件发生时刻的时间戳以及事件中涉及的实体标识。此外,一个领域事件一般具备单独的时间戳,来指示事件什么时候进入系统,以及输入事件的人员身份。领域事件自己的标识能够基于这些属性集。例如,若是同一个事件的两个实例到达一个节点,它们能够被视为相同。
领域事件的本质就是,你可使用它来捕获应用中那么能够触发改变的事物,或者领域中其它应用中你感兴趣的改变。这些随后被处理的事件对象会致使系统的改变,并被存储在审记系统中。
全部事件都必须用过去时动词表示,由于它们都在过去发生的。例如,CustomerRelocated
, CargoShipped
,或者 InventoryLossageRecorded
。在英语中有一些有趣的例子,人们可能会倾向于使用名词,而不是过去时动词。例如一个对天然灾害感兴趣的国会议来讲,"地震"或者"倾塌"就是相关事件。咱们建议尽可能避免在领域事件中使用相似名词的诱惑,而是坚持用动词的过去时态。
当咱们讨论"重定位用户"的反作用时,请思考通用语言的不一样。这个事件使概念变得明确,而之前,聚合或者多个聚合之间发生的改变会留下隐式的概念,这些都须要探索和定义。例如,在大多数系统中,当Hibernate
或者实体框架这样的库上发生反作用时,它不会影响到领域。从客户端的角度来看,这些事件是隐式和透明的。事件的引入使概念变得明确,并使之成为通用语言的一部分。"重定位用户"不只仅是改变某些内容,还会在语言中显式的产生CustomerRelocatedEvent
事件。
正如咱们说起过的,领域事件关注的是领域内过去发生的改变。根据定义,你不可能改变过去,除非你是Marty McFly
而且有一个DeLorean
(译者注:这里是《回到将来》电影里的角色)。所以,请记住领域事件是不可变的。
Symfony
事件分派器一些 PHP 框架支持事件。不过,不要混淆这些事件与领域事件。它们在特征和目的上是不一样的。例如,
Symfony
有Event Dispatcher
组件,若是你须要为一个状态机实现一个事件系统,则能够依赖它。在Symfony
中,在请求与响应的转换过程也是由事件处理。可是,Symfony Events
是可变的,而且每一个listeners
侦听器都可以修改,添加或者更新事件中的信息。
为了准确地描述你的领域业务,你须要与领域专家紧密合做,来定义通用语言。这须要使用领域事件,实体,值对象等等来完成领域概念。在对事件建模时,依据通用语言,在它们的限界上下文内去命名事件及它们的属性。若是一个事件是一个聚合上的命令执行操做的结果,则名称一般派生自执行的命令。事件名称必须反映事件过去的性质,这一点很是和重要。
让咱们考虑用户注册功能。领域事件须要表示它。下面的代码显示了基本领域事件的最小接口:
interface DomainEvent { /** * @return DateTimeImmutable */ public function occurredOn(); }
正如你所见,最小的必要信息就是DateTimeImmutable
,这是为了知道事件是什么时候发生的。
如今让咱们用下面的代码来建模用户注册事件。正如咱们在上面提到的,事件名称必须是动词过去式,那么UserRegistered
是个不错的选择:
class UserRegistered implements DomainEvent { private $userId; public function __construct(UserId $userId) { $this->userId = $userId; $this->occurredOn = new \DateTimeImmutable(); } public function userId() { return $this->userId; } public function occurredOn() { return $this->occurredOn; } }
通知订阅者新用户建立所必需的最少许信息就是 UserId
。有了这个信息,任何过程,命令,或者应用服务 - 无论是是否来自同一限界上下文 - 均可能都此事件作出反应。
通常来讲
getters
访问器来访问它们的属性可是,若是相同或者不一样的限界上下文须要更多信息的话会发生什么?下面让咱们看看用更多信息来建模领域事件 - 例如,邮箱地址:
class UserRegistered implements DomainEvent { private $userId; private $userEmail; public function __construct(UserId $userId, $userEmail) { $this->userId = $userId; $this->userEmail = $userEmail; $this->occurredOn = new DateTimeImmutable(); } public function userId() { return $this->userId; } public function userEmail() { return $this->userEmail; } public function occurredOn() { return $this->occurredOn; } }
上面,咱们添加了邮箱地址,添加更多信息到一个领域事件能够帮助提升性能或者使不一样限界上下文整合理简单化。从另外一个限界上下文的视角来考虑,也有助于建模事件。当在上游的限界上下文建立一个新用户时,下游的上下文则建立它本身的用户。添加用户邮箱能够保存一个同步请求到上游上下文,以防万一下游的上下文须要它。
你是否还记得游戏化机制的例子?为了建立游平台用户,也就是所说的玩家,那么一个来自电子商务限界上下文的 UserId
可能就够了。但若是游戏平台要用邮件通知用户中奖消息怎么样?在这种状况下,邮箱地址则是必要的。因此,若是邮箱地址包含在源领域事件中,咱们就能够作到。若是不在,游戏限界上下文就须要用 REST 或者 SOA 从电子商务上下文中获取这些信息。
为什么不用整个用户实体想知道你是否应该在限界上下文的领域事件中包含整个用户实体?咱们的建议是不须要。领域事件通常用于内部的给定上下文或者外部其它上下文的消息通讯。换句话说,在 C2C 电子商务产品目录限界上下文中的卖方是谁,产品反馈中的产品评论做者是谁。二者能够共享相同的ID或者电子邮件,可是卖方和做者是不一样的概念,表明来自不一样的限界上下文。所以,来自一个限界上下文的实体在另外一个上下文没有任何意义或彻底不一样。
领域事件不只仅是作批量做业,例如发送邮件或者与其它上下文通讯。它们也对性能和可扩展提高感兴趣。让咱们看一个例子。
考虑如下场景:你有一个电子商务应用,你的主要持久化机制工具是 MySQL,可是对于浏览或者过滤你的产品目录,你用了一个更好的方法,例如 Elasticsearch 或者 Solr。在 Elasticsearch 里,你最获取到存储在完整数据库中的一部分信息。如何保持数据同步?内容团队经过后台工具更新目录时会发生什么?
有人不时重建整个目录的索引。这很是昂贵且缓慢。一种更明智的方法是更新与已更新的产品的一个或一些文档。咱们该怎么作呢?答案是使用领域事件。
不过,假如你已经在用 Doctrine
了,这些对你来讲就不怎么新鲜了。根据 Doctrine 2 ORM 2 Documentation:
Doctrine 2 具备轻量级事件系统,该系统是 Common 包的一部分。Doctrine 使用它来高度系统事件,主要是生命周期事件。你也能够将其用于你的自定义事件。
此外,它声明了:
生命周期回调定义在一个实体类上。它们使你能够在该实体的实例遇到相关生命周期事件时触发回调。每一个生命周期事件能够定义多个回调。生命周期回调最好用于特定实体类生命周期的简单操做上。
让咱们看一个来自 Doctrine Events Documentation 中的例子:
/** @Entity @HasLifecycleCallbacks */ class User { // ... /** * @Column(type="string", length=255) */ public $value; /** @Column(name="created_at", type="string", length=255) */ private $createdAt; /** @PrePersist */ public function doStuffOnPrePersist() { $this->createdAt = date('Y-m-d H:i:s'); } /** @PrePersist */ public function doOtherStuffOnPrePersist() { $this->value = 'changed from prePersist callback!'; } /** @PostPersist */ public function doStuffOnPostPersist() { $this->value = 'changed from postPersist callback!'; } /** @PostLoad */ public function doStuffOnPostLoad() { $this->value = 'changed from postLoad callback!'; } /** @PreUpdate */ public function doStuffOnPreUpdate() { $this->value = 'changed from preUpdate callback!'; } }
你能够将特定任务挂载到 Doctrine 实体生命周期的每一个不一样的重要时刻。例如,在 PostPersist
上,你能够生成实体的 JSON 文档并将其放到 Elasticsearch 中。这样,就很容易使不一样持久化机制间的数据保持一致。
Doctrine 事件是一个很好的例子来讲明用事件围绕你的实体的好处。可是你能够想知道使用它们的问题是什么。这是由于它们耦合到框架,它们是同步的,而且它们在你的应用程序级别上起做用,却不是出于通讯的目的。因此这就是为何尽管难以实施和处理,领域事件仍然很是有趣的缘由。
持久化事件老是一个好的想法。大家中的一些人能够想知道为何不能直接发布领域事件到一个消息或者日志系统。这是由于持久化它们有一些有趣的好处:
咱们在哪持久化领域事件?在一个事件存储器(Event Store)。事件存储器是一个领域事件仓储,它做为一个抽象(接口或抽象类)存在于咱们的领域空间中。它的职责是附带领域事件并对进行查询。一种可能的基本接口以下:
interface EventStore { public function append(DomainEvent $aDomainEvent); public function allStoredEventsSince($anEventId); }
然而,根据你领域事件的用途,上一个接口能够有更多的方法来查询事件。
在实现方面,你能够决定使用 Doctrine Respository, DBAL,或者普通的 PDO。由于领域事件是不可变的,因此使用 Doctrine Repository 会加大没必要要的性能损失,尽管对于中小型程序而言,Doctrine 可能还够用。让咱们看下 Doctrine 的可能实现:
class DoctrineEventStore extends EntityRepository implements EventStore { private $serializer; public function append(DomainEvent $aDomainEvent) { $storedEvent = new StoredEvent( get_class($aDomainEvent), $aDomainEvent->occurredOn(), $this->serializer()->serialize($aDomainEvent, 'json') ); $this->getEntityManager()->persist($storedEvent); } public function allStoredEventsSince($anEventId) { $query = $this->createQueryBuilder('e'); if ($anEventId) { $query->where('e.eventId > :eventId'); $query->setParameters(['eventId' => $anEventId]); } $query->orderBy('e.eventId'); return $query->getQuery()->getResult(); } private function serializer() { if (null === $this->serializer) { /** \JMS\Serializer\Serializer\SerializerBuilder */ $this->serializer = SerializerBuilder::create()->build(); } return $this->serializer; } }
StoreEvent
须要 Doctrine 实体映射到数据库。正如你所见,在附带和持久化 Store
以后,是没有 flush
方法调用的,若是这个操做在 Doctrine 事务内,那么是没必要要的。所以,咱们暂时搁置这里,咱们会在应用服务一章中再深刻探讨。
如今咱们来看 StoreEvent
的实现:
class StoredEvent implements DomainEvent { private $eventId; private $eventBody; private $occurredOn; private $typeName; /** * @param string $aTypeName * @param \DateTimeImmutable $anOccurredOn * @param string $anEventBody */ public function __construct( $aTypeName, \DateTimeImmutable $anOccurredOn, $anEventBody ) { $this->eventBody = $anEventBody; $this->typeName = $aTypeName; $this->occurredOn = $anOccurredOn; } public function eventBody() { return $this->eventBody; } public function eventId() { return $this->eventId; } public function typeName() { return $this->typeName; } public function occurredOn() { return $this->occurredOn; } }
下面是它的映射:
Ddd\Domain\Event\StoredEvent: type: entity table: event repositoryClass: Ddd\Infrastructure\Application\Notification\DoctrineEventStore id: eventId: type: integer column: event_id generator: strategy: AUTO fields: eventBody: column: event_body type: text typeName: column: type_name type: string length: 255 occurredOn: column: occurred_on type: datetime
为了用不一样字段来持久化领域事件,咱们将不得不将这些字段链接成一个序列化的字符串。 typeName
字段说明领域事件的领域广度。一个实体或者值对象在限界上下文里才有意义,但领域事件在限界上下文间定义了通信协议。
在分布式系统中,会发生垃圾。你将不得不处理未发布,在事务链中某个地方丢失或已屡次发布的领域事务。这就是为何必须使用 ID 持久化领域事件很重要,这能够轻松跟踪哪一个领域事件已经发布,哪一个已经丢失。
领域事件应该在它们表明的事实发生时发布。例如,当一个新用户已经注册时,一个新的 UserRegistered
事件应该被发布。
参考下面的报纸比喻:
发布领域事件推荐的方法就是使用一个简单的监听者 - 观察者模式来实现 DomainEventPublisher
。
继续用咱们应用中新用户注册的例子,咱们看看相应的领域事件是怎样发布的:
class User { protected $userId; protected $email; protected $password; public function __construct(UserId $userId, $email, $password) { $this->setUserId($userId); $this->setEmail($email); $this->setPassword($password); DomainEventPublisher::instance()->publish( new UserRegistered($this->userId) ); } // ... }
如示例所示,用户建立时,一个新的 UserRegistered
事件将发布。这在实体的构造函数内完成,而不是外面。由于用这个方法,能够轻松保持咱们领域的一致性;任何建立新用户的客户端都会发布相应的事件。另外一方面,这使得须要建立用户实体而不使用其构造函数的基础结构变得更加复杂。例如,Doctrine 使用序列化和反序列化技术来从新建立对象而不调用构造函数。然而,若是你必须建立本身的应用程序,这将不会像 Doctrine 那样容易。
通常来说,从简单数据(例如数组)构造对象称为水合(水化反应)。让咱们看看一种简单的方法来构建从数据库中获取的新用户。首先,让咱们经过应用工厂方法(Factory Method)模式将领域事件的发布提取为本身的方法。
根据 Wikipedia模板方法模式是一种行为设计模式,它在一个操做里定义了一个算法的程序骨架,而将实现延迟到子步骤中
class User { protected $userId; protected $email; protected $password; public function __construct(UserId $userId, $email, $password) { $this->setUserId($userId); $this->setEmail($email); $this->setPassword($password); $this->publishEvent(); } protected function publishEvent() { DomainEventPublisher::instance()->publish( new UserRegistered($this->userId) ); } // ... }
如今,让咱们用一个新的基础架构实体来扩展当前的 User
类,该实体将为咱们完成这项工做。这里的小技巧是使 publishEvent
方法不执行任何操做,以便领域事件不会被发布:
class CustomOrmUser extends User { protected function publishEvent() { } public static function fromRawData($data) { return new self( new UserId($data['user_id']), $data['email'], $data['password'] ); } }
记住要谨使用此方法。你可能会从持久化机制中得到无效的对象。由于领域规则老是在变化。另外一种不使用父构造函数的方法可能以下:
class CustomOrmUser extends User { public function __construct() { } public static function fromRawData($data) { $user = new self(); $user->userId = new UserId($data['user_id']); $user->email = $data['email']; $user->password = $data['password']; return $user; } }
用这种方法,父构造函数不能被调用而且 User
的属性必须被保护。其它的方法还有反射,在本色构造函数里传标识,使用诸如 Proxy-Manager
的代理库,或者使用像 Doctrine 这样的 ORM。
其它发布领域事件的方法正如你在以前的例子中所见,咱们使用了静态类来发布领域事件。做为替代方案,其余人,尤为是在使用事件源时,会建议在实体内用一个字段保存全部触发的事件。为了访问全部事件,在聚合里使用
getter
方法器。这也是一种有效的方法。可是,有时很难跟踪哪些实体已触发事件。在非实体的地方触发事件也可能很困难,例如:领域服务。从好的方面来讲,测试一个实体是否触发了事件将容易得多。
你应该努力从更深层的事务链发布领域事件。实体或值对象的内部越近越好。正如咱们在上一节中看到的,有时候这并不容易,但最终对于客户端来讲却更简单。咱们看到开发者从应用服务或者领域服务中发布领域事件。这看起来更容易实现,但最终将致使贫血领域模型。这与在领域服务中推送业务逻辑而不是放到你的实体中没有什么不一样。
领域发布发布者(Domain Event Publisher)是一个单例类,来自于咱们须要发布领域事件的限界上下文。它同时支付附加监听器,Domain Event Subscriber 会监听他们感兴趣的任何领域事件。这与使用 on
方法的 jQuery 订阅事件没有太大差异:
class DomainEventPublisher { private $subscribers; private static $instance = null; public static function instance() { if (null === static::$instance) { static::$instance = new static(); } return static::$instance; } private function __construct() { $this->subscribers = []; } public function __clone() { throw new BadMethodCallException('Clone is not supported'); } public function subscribe( DomainEventSubscriber $aDomainEventSubscriber ) { $this->subscribers[] = $aDomainEventSubscriber; } public function publish(DomainEvent $anEvent) { foreach ($this->subscribers as $aSubscriber) { if ($aSubscriber->isSubscribedTo($anEvent)) { $aSubscriber->handle($anEvent); } } } }
publish
方法经过全部可能的订阅者,来检查它们是否对发布的领域事件感兴趣。若是是,订阅者的 handle
方法将被调用。
subscribe
方法添加一个新的 DomainEventSubscriber
,它将监听指定的领域事件类型:
interface DomainEventSubscriber { /** * @param DomainEvent $aDomainEvent */ public function handle($aDomainEvent); /** * * @param DomainEvent $aDomainEvent * @return bool */ public function isSubscribedTo($aDomainEvent); }
正如咱们已经讨论过的,持久化全部领域事件是个好主意。咱们能够在咱们的应用程序中经过使用指定的订阅者来轻松地持久化全部已发布的领域事件。咱们如今建立一个 DomainEventSubscriber
,它会监听全部领域事件,不管什么类型,都会持久化到咱们的事件存储器 (EventStore) 中。
class PersistDomainEventSubscriber implements DomainEventSubscriber { private $eventStore; public function __construct(EventStore $anEventStore) { $this->eventStore = $anEventStore; } public function handle($aDomainEvent) { $this->eventStore->append($aDomainEvent); } public function isSubscribedTo($aDomainEvent) { return true; } }
$eventStore
能够是自定义的 Doctrine Repository, 或者正如所看到的其它有能力持久化DomainEvents
到数据库的对象。
设置 DomainEventPublisher
订阅者最好的地方是哪里?这看须要。对于可能影响整个请求周期的全局订阅者,最好的位置多是 DomainEventPublisher
自身初始化的地方。对于受特殊应用服务影响的订阅者,服务实例化的地方多是个更好的选择。让咱们来看一个使用 Silex
的例子。
在 Silex
里,注册 DomainEventPublisher
最好的方法就是经过使用一个应用中间件持久化全部领域事件。根据 Silex 2.0 Documentation:
一个before
应用中间件容许你在controller
执行前调整请求。
这是订阅负责将这些事件持久化到数据库的监听器的正确位置,这些事件将在之后发送到 RabbitMQ:
// ... $app['em'] = $app->share(function () { return (new EntityManagerFactory())->build(); }); $app['event_repository'] = $app->share(function ($app) { return $app['em']->getRepository( 'Ddd\Domain\Model\Event\StoredEvent' ); }); $app['event_publisher'] = $app->share(function ($app) { return DomainEventPublisher::instance(); }); $app->before( function (Symfony\Component\HttpFoundation\Request $request) use ($app) { $app['event_publisher']->subscribe( new PersistDomainEventSubscriber( $app['event_repository'] ) ); } );
使用此设置,每次聚合发布领域事件时,它将被持久化到数据库中。任务完成。
练习若是你使用 Symfony, Laravel, 或者其它 PHP 框架,找到一种方法,来订阅全局指定订阅者,围绕你的领域事件执行任务。
若是你要在请求即将完成时对全部领域事件执行任何操做,则能够建立一个监听器,该监听器将全部已发布的的领域事件存储在内存中。若是你添加一个 getter
访问器到这个监听器,来返回全部领域事件,则能够决定要作什么。如前文所建议,若是你不想或没法持久化事件到同一事务,这将很是有用。
你已经知道了如何发布领域事件,但你怎样对此作单元测试并确保 UserRegistered
真的被触发?最简单的方法就是,咱们建议用一个指定的 EventListener
,它被看成一个 Spy
来记录领域事件是否发布。让咱们看看 User
实体的单元测试例子:
use Ddd\Domain\DomainEventPublisher; use Ddd\Domain\DomainEventSubscriber; class UserTest extends \PHPUnit_Framework_TestCase { // ... /** * @test */ public function itShouldPublishUserRegisteredEvent() { $subscriber = new SpySubscriber(); $id = DomainEventPublisher::instance()->subscribe($subscriber); $userId = new UserId(); new User($userId, 'valid@email.com', 'password'); DomainEventPublisher::instance()->unsubscribe($id); $this->assertUserRegisteredEventPublished($subscriber, $userId); } private function assertUserRegisteredEventPublished( $subscriber, $userId ) { $this->assertInstanceOf( 'UserRegistered', $subscriber->domainEvent ); $this->assertTrue( $subscriber->domainEvent->serId()->equals($userId) ); } } class SpySubscriber implements DomainEventSubscriber { public $domainEvent; public function handle($aDomainEvent) { $this->domainEvent = $aDomainEvent; } public function isSubscribedTo($aDomainEvent) { return true; } }
对于上面有一些替代方案。你能够为 DomainEventPublisher
或者某些反射框架使用静态 setter
来检测调用。不过,咱们认为咱们分享的方法更为天然。最后但并不是最不重要的一点就是,请记住清理 Spy
订阅。以避免影响其余单元测试的执行。
为了将一组领域事件传达给本地或者远程限界上下文,主要有两种策略:消息或者 REST API。第一个方法是使用诸如 RabbitMQ 之类的消息系统来传输领域事件。第二个应时建立一个 REST API,来访问特定上下文的领域事件。
随着全部领域事件持久化到数据库中,惟一剩下的事情就是将它们推送到咱们最喜欢的消息系统中。咱们更喜欢 RabbitMQ,不过其余任何系统(例如 ActiveMQ 或者 ZeroMQ)都能任务。要使用 PHP 整合 RabbitMQ,没有不少选择,但 php-amqplib
能够完成这项工做。
首先, 咱们须要一种可以将持久化的领域事件发送 RabbitMQ 的服务。你能够想要为全部事件而查询 EventStore,并发送每一个事件,这不是坏事。然而,咱们能够屡次推送同一领域事件,一般来讲,咱们须要将从新发布的领域事件减小到最少。若是重发的领域事件为0,那就更好了。为了避免重发领域事件,咱们须要某种组件来跟踪哪些领域事件已经被推送,哪些仍然残余。最后但并不是最不重要的一点就是,一旦咱们知道必须推送哪些领域事件,就将它们发送,并追踪发布到消息系统中的最后一个事件。让咱们看一下该服务的可能实现:
class NotificationService { private $serializer; private $eventStore; private $publishedMessageTracker; private $messageProducer; public function __construct( EventStore $anEventStore, PublishedMessageTracker $aPublishedMessageTracker, MessageProducer $aMessageProducer, Serializer $aSerializer ) { $this->eventStore = $anEventStore; $this->publishedMessageTracker = $aPublishedMessageTracker; $this->messageProducer = $aMessageProducer; $this->serializer = $aSerializer; } /** * @return int */ public function publishNotifications($exchangeName) { $publishedMessageTracker = $this->publishedMessageTracker(); $notifications = $this->listUnpublishedNotifications( $publishedMessageTracker ->mostRecentPublishedMessageId($exchangeName) ); if (!$notifications) { return 0; } $messageProducer = $this->messageProducer(); $messageProducer->open($exchangeName); try { $publishedMessages = 0; $lastPublishedNotification = null; foreach ($notifications as $notification) { $lastPublishedNotification = $this->publish( $exchangeName, $notification, $messageProducer ); $publishedMessages++; } } catch (\Exception $e) { // Log your error (trigger_error, Monolog, etc.) } $this->trackMostRecentPublishedMessage( $publishedMessageTracker, $exchangeName, $lastPublishedNotification ); $messageProducer->close($exchangeName); return $publishedMessages; } protected function publishedMessageTracker() { return $this->publishedMessageTracker; } /** * @return StoredEvent[] */ private function listUnpublishedNotifications( $mostRecentPublishedMessageId ) { return $this ->eventStore() ->allStoredEventsSince($mostRecentPublishedMessageId); } protected function eventStore() { return $this->eventStore; } private function messageProducer() { return $this->messageProducer; } private function publish( $exchangeName, StoredEvent $notification, MessageProducer $messageProducer ) { $messageProducer->send( $exchangeName, $this->serializer()->serialize($notification, 'json'), $notification->typeName(), $notification->eventId(), $notification->occurredOn() ); return $notification; } private function serializer() { return $this->serializer; } private function trackMostRecentPublishedMessage( PublishedMessageTracker $publishedMessageTracker, $exchangeName, $notification ) { $publishedMessageTracker->trackMostRecentPublishedMessage( $exchangeName, $notification ); } }
NotificationService
依赖三个接口。咱们已经看到 EventStore
,它主要负责增长和查询领域事件。第二个是 PublishedMessageTracker
,主要用来追踪已推送的消息。第三个就是 MessageProducer
,一个表示咱们消息系统的接口:
interface PublishedMessageTracker { /** * @param string $exchangeName * @return int */ public function mostRecentPublishedMessageId($exchangeName); /** * @param string $exchangeName * @param StoredEvent $notification */ public function trackMostRecentPublishedMessage( $exchangeName, $notification ); }
mostRecentPublishedMessageId
方法返回 最后发布消息的 ID,所以这个过程能够从下一次开始。trackMostRecentPublishedMessage
负责追踪哪一个消息是最后发送的,目的是在你可能须要时重发消息。exchangeName
表明咱们将要把领域事件发往的通讯频道。让咱们看看一个 Doctrine 实现的 PublishedMessageTracker
:
class DoctrinePublishedMessageTracker extends EntityRepository\ implements PublishedMessageTracker { /** * @param $exchangeName * @return int */ public function mostRecentPublishedMessageId($exchangeName) { $messageTracked = $this->findOneByExchangeName($exchangeName); if (!$messageTracked) { return null; } return $messageTracked->mostRecentPublishedMessageId(); } /** * @param $exchangeName * @param StoredEvent $notification */ public function trackMostRecentPublishedMessage( $exchangeName, $notification ) { if (!$notification) { return; } $maxId = $notification->eventId(); $publishedMessage = $this->findOneByExchangeName($exchangeName); if (null === $publishedMessage) { $publishedMessage = new PublishedMessage( $exchangeName, $maxId ); } $publishedMessage->updateMostRecentPublishedMessageId($maxId); $this->getEntityManager()->persist($publishedMessage); $this->getEntityManager()->flush($publishedMessage); } }
这里的代码很是简单明了。咱们惟一须要的极端状况就是,系统还没有发布任何领域事件。
为何是交换机名称?咱们将在第 12 章集成限界上下文一章中更为详细地介绍这一点。可是,当系统正在运行而且新的限界上下文开始起做用时,你可能会对全部领域事件重发到新的限界上下文感兴趣。所以,跟踪上一次发布的领域事件及其改善的频道可能会在之后派上用场。
为了跟踪已发布的领域事件,咱们须要一个交换机名称一个通知 ID。下面是一种可能的实现:
class PublishedMessage { private $mostRecentPublishedMessageId; private $trackerId; private $exchangeName; /** * @param string $exchangeName * @param int $aMostRecentPublishedMessageId */ public function __construct( $exchangeName, $aMostRecentPublishedMessageId ) { $this->mostRecentPublishedMessageId = $aMostRecentPublishedMessageId; $this->exchangeName = $exchangeName; } public function mostRecentPublishedMessageId() { return $this->mostRecentPublishedMessageId; } public function updateMostRecentPublishedMessageId($maxId) { $this->mostRecentPublishedMessageId = $maxId; } public function trackerId() { return $this->trackerId; } }
这是其对应的映射关系:
Ddd\Domain\Event\PublishedMessage: type: entity table: event_published_message_tracker repositoryClass: Ddd\Infrastructure\Application\Notification\ DoctrinePublished\MessageTracker id: trackerId: column: tracker_id type: integer generator: strategy: AUTO fields: mostRecentPublishedMessageId: column: most_recent_published_message_id type: bigint exchangeName: type: string column: exchange_name
如今,让咱们看看 MessageProducer
接口用来作什么的,以及它的实现细节:
interface MessageProducer { public function open($exchangeName); /** * @param $exchangeName * @param string $notificationMessage * @param string $notificationType * * @param int $notificationId * @param \DateTimeImmutable $notificationOccurredOn * @return */ public function send( $exchangeName, $notificationMessage, $notificationType, $notificationId, \DateTimeImmutable $notificationOccurredOn ); public function close($exchangeName); }
简单!open
和 close
方法打开和关闭一个消息系统链接。send
方法携带一个消息体(消息名称及消息 ID),并发送到咱们的消息引擎,而不用关心它是什么。由于咱们选择的是 RabbitMQ,咱们须要实现链接及发送过程:
abstract class RabbitMqMessaging { protected $connection; protected $channel; public function __construct(AMQPConnection $aConnection) { $this->connection = $aConnection; $this->channel = null; } private function connect($exchangeName) { if (null !== $this->channel) { return; } $channel = $this->connection->channel(); $channel->exchange_declare( $exchangeName, 'fanout', false, true, false ); $channel->queue_declare( $exchangeName, false, true, false, false ); $channel->queue_bind($exchangeName, $exchangeName); $this->channel = $channel; } public function open($exchangeName) { } protected function channel($exchangeName) { $this->connect($exchangeName); return $this->channel; } public function close($exchangeName) { $this->channel->close(); $this->connection->close(); } } class RabbitMqMessageProducer extends RabbitMqMessaging implements MessageProducer { /** * @param $exchangeName * @param string $notificationMessage * @param string $notificationType * @param int $notificationId * @param \DateTimeImmutable $notificationOccurredOn */ public function send( $exchangeName, $notificationMessage, $notificationType, $notificationId, \DateTimeImmutable $notificationOccurredOn ) { $this->channel($exchangeName)->basic_publish( new AMQPMessage( $notificationMessage, [ 'type' => $notificationType, 'timestamp' => $notificationOccurredOn->getTimestamp(), 'message_id' => $notificationId ] ), $exchangeName ); } }
如今咱们有了一个 DomainService
,能够将领域事件推送到 RabbitMQ 这样的消息系统中,是时候执行它们了。咱们须要选择一种交付机制来运行服务。咱们我的建议是建立一个 Symfony Console
命令:
class PushNotificationsCommand extends Command { protected function configure() { $this ->setName('domain:events:spread') ->setDescription('Notify all domain events via messaging') ->addArgument( 'exchange-name', InputArgument::OPTIONAL, 'Exchange name to publish events to', 'my-bc-app' ); } protected function execute( InputInterface $input, OutputInterface $output ) { $app = $this->getApplication()->getContainer(); $numberOfNotifications = $app['notification_service'] ->publishNotifications( $input->getArgument('exchange-name') ); $output->writeln( sprintf( '<comment>%d</comment>' . '<info>notification(s) sent!</info>', $numberOfNotifications ) ); } }
按照这个 Silex
例子,让咱们看看定义在 Silex Pimple Service Container
中的 $app['notification_service']
的定义:
// ... $app['event_store'] = $app->share(function ($app) { return $app['em']->getRepository('Ddd\Domain\Event\StoredEvent'); }); $app['message_tracker'] = $app->share(function ($app) { return $app['em'] ->getRepository('Ddd\Domain\Event\Published\Message'); }); $app['message_producer'] = $app->share(function () { return new RabbitMqMessageProducer( new AMQPStreamConnection('localhost', 5672, 'guest', 'guest') ); }); $app['message_serializer'] = $app->share(function () { return SerializerBuilder::create()->build(); }); $app['notification_service'] = $app->share(function ($app) { return new NotificationService( $app['event_store'], $app['message_tracker'], $app['message_producer'], $app['message_serializer'] ); }); //...
有了消息传统中已经实现的 EventStore
,应该很容易添加一些分布功能,领域查询事件以及渲染 JSON 或者 XML 表述,以发布 REST API。为何这么有趣?嗯,分布式系统使用消息中间件必须面对许多不一样的问题,例如消息未到达,消息重复到达,或消息到达失序。这就是为何须要一个 API 来发布你的领域事件,以便其它限界上下文能够要求一些缺失信息的缘由。仅做为示例,考虑一个 /event
端点的 HTTP 请求。一个可能的实现以下:
[ { "id": 1, "version": 1, "typeName": "Lw\\Domain\\Model\\User\\UserRegistered", "eventBody": { "user_id": { "id": "459a4ffc-cd57-4cf0-b3a2-0f2ccbc48234" } }, "occurredOn": { "date": "2016-05-26 06:06:07.000000", "timezone_type": 3, "timezone": "UTC" } }, { "id": 2, "version": 2, "typeName": "Lw\\Domain\\Model\\Wish\\WishWasMade", "eventBody": { "wish_id": { "id": "9e90435a-395c-46b0-b4c4-d4b769cbf201" }, "user_id": { "id": "459a4ffc-cd57-4cf0-b3a2-0f2ccbc48234" }, "address": "john@example.com", "content": "This is my new wish!" }, "occurredOn": { "date": "2016-05-26 06:06:27.000000", "timezone_type": 3, "timezone": "UTC" }, "timeTaken": "650" } //... ]
如你在前面的示例中所见,咱们在一个 JSON REST API 中暴露一组领域事件。在输出示例中,你能够看到一个关于每一个领域事件的 JSON 表述。这有一些有趣的要点。首先是 version
字段。有时你的领域事件会发展:它们会包含更多字段,它们会改变某些现有字段的行为,或者会删除某些现有字段。这就是在领域事件中添加 version
字段很重要的缘由。若是其余限界上下文正在监听此类事件,则它们可使用 version
字段以不一样方式解析领域事件。在对 REST API 进行版本控制时,你也可能会遇到相同的问题。
另一个就是名称。若是你想使用领域事件的类名,那么大多数状况下均可以。问题是当团队因为重构而决定更改类名时,在这种状况下,全部监听该名称的限界上下文都将中止工做。仅当你在同一队列中发布不一样领域事件时,才会出现此问题。若是你将每一个领域事件类型发布到不一样的队列中,则不是真正的问题,但若是你选择这种方法,那么将面临一系列不一样的问题,例如接收无序事件。像许多其余状况下同样,这须要权衡。咱们强烈建议你阅读 **《企业集成模式:设计,构建和部署消息系统解决方案》。在这本书里,你将学习使用异步方法集成多个应用程序的不一样模式。因为领域事件在集成频道发送消息,所以全部消息模式都适用于它们。
练习考虑为领域事件使用 REST API 的利弊。考虑限界上下文耦合。你也能够为你当前的应用实现 REST API。
咱们看到了使用基本接口建模一个合适的领域事件的技巧,也了解到在何处发布领域事件(越接近实体越好),而且了解到将这些领域事件传播到本地和远程限界上下文的策略。如今,剩下的惟一事情就是在消息系统中监听通知,读取通知,并执行相应的应用服务或命令。咱们将在第 12 章,集成有限上下文 和第 5 章,服务 中看到如何执行此操做。