微服务的出现和原生的云架构触发了DDD,CQRS和Event Sourcing的复苏。全部这些方式实现的核心是Domain Event(领域事件), 它是分布式系统中实现一整套DDD架构及实现最终一致性的主要机制。java
Domain Events的通信改变了domain领域对象的状态,好比Customer Created、Account Credited等。它们老是以过去时来表达,一个domain event呈现了一个特定的domain已经发生的行为。git
DDD推荐把一个总体系统拆分红多个微服务。如何拆分咱们改日再谈,可是假设一个遵循DDD来设计的金融系统来定义Customer和Account做为一个Aggregates聚合。宏观来看一个Aggregate聚合就是领域对象,由实体类和是对象组成,它定义了一个事务边界。一个聚合封装了业务逻辑来验证业务规则和保持聚合内的一致性。聚合间经过发布领域事件来触发状态的改变。CustomerCreated一个Customer聚合发布的领域事件,AccountCreated是一个Account发布的领域事件。Customer保存了它拥有的account,Account也许保存了归属的customers的引用。不管如何,每一个聚合均可能须要处理其余聚合发布的事件。在CQRS中,command service以聚合为单位来更新状态和发布domain events,这些domain events被query service接收和处理来保持物化视图的最终一致性。github
domain event是一个表达发生了什么并携带了须要修改的数据、时间戳、聚合ID,及其余附加的元数据。在一个分布式系统中,domain events以发布订阅的形式被发布到一个MQ系统中。这容许任意多个远程进程(微服务)来异步地订阅接收和处理这些领域事件。一个CQRS+ES系统中监听domain events的微服务可能须要订阅并处理多种类型的domain event 事件,因此须要一些机制来分发每一类事件到指定的事件处理方法中。spring
在java的世界中,咱们老是能够在监听消息中间件的事件时用switch case分别处理每一类事件。这只是开个玩笑,咱们固然能够作的更好。事实上,现有的开源CQRS+ES框架好比Axon提供了一个@EventHandler注解来启用事件总线进行基于入参类型的事件分发。使用Axon,CustomerCreatedEvent和AccountCreatedEvent是明确的java类型(Class),因此你能够这么写:express
public class MyEventHandler { @EventHandler public void handle(CustomerCreatedEvent event) { ... } @EventHandler public void handle(AccountCreatedEvent event) { ... } }
Eventuate是另外一个CQRS+ES框架提供了类似的实现机制,示例。编程
以上的方式听起来都遵循了面向对象编程。然而,这却使分布式系统的Java类互相依赖。最终domain events变成了分布式系统中的共享类库。在众多微服务中依赖共享类库产生了高耦合的缺点。分布式系统中的这种类型的耦合必须被消除,这种高耦合是微服务架构中的一个反模式。举个例子,若是你把domain events打包到一个公共的jar中被各个微服务共享依赖,当一个领域事件被新增或修改后,每个有依赖的微服务都必须从新部署,不论这个微服务依不依赖它订阅的domain event类型。固然,每一个微服务能够解码每一个消息的有效负载到不一样的本地定义的事件类型,但这须要大量重复性的工做。json
最近发布的Spring Cloud Stream Chelsea release 介绍了一种原生的事件分发特性,支持事件驱动架构同时避免依赖共享的domain类型。Spring Cloud Stream提供了一个@StreamListener注解用来控制 序列化的载荷 做为 方法的入参 并 执行方法,例如:架构
@StreamListener(Sink.INPUT) public void handle(Foo foo){ ... }
将会自动的转换一个经过kafka或Rabbit MQ(或其余任何支持的消息中间件)传输的序列化的json载荷到一个‘Foo’对象中而后执行‘handle’方法。一般Spring Cloud Stream应用会为每个channel声明一个stream监听器,监听的channel绑定到其余应用发布数据的topic上。框架
新的事件分发特性在@StreamListener上增长了condition属性来使路由消息到多个监听器成为可能,condition的值是用SPEL表达式运算出来的一个boolean值。condition应用到传入的消息上,可以计算任何消息载荷或特定的消息头、或其组合。这提供了一种极其灵活的路由机制并不须要不通的事件类型定义类。例如,咱们定义一个带String eventType属性的Event类型,Spring Cloud Stream将提供开盒即用的功能:dom
@EnableBinding class MyEventHandler{ @StreamListener(target=Sink.INPUT, condition="payload.eventType=='CustomerCreatedEvent'") public void handleCustomerEvent(@Payload Event event) { // handle the message</span> } @StreamListener(target=Sink.INPUT, condition="payload.eventType=='AccountCreatedEvent'") public void handleAccountEvent(@Payload Event event) { // handle the message</span> } }
这是一个提高,可是仍没有到达咱们想要的可用程序。预想中的完美的效果,咱们想看到跟其余的CQRS+ES框架类似效果的实现,咱们的目标效果若是:
@EnableEventHandling class MyEventHandler{ @EventHandler(eventType = "CustomerCreatedEvent") public void handleCustomerEvent(@Payload Event event) { // handle the message } @EventHandler(eventType = "CustomerCreatedEvent") public void handleAccountEvent(@Payload Event event) { // handle the message } }
幸运的是,Sring容许咱们来小小的自定义它。Core Spring Framework已经对自定义注解有优秀的支持,因此你能够很容易的自定义一个@EventHandler注解来替代@StreamListener。咱们能够定义默认的目标channel到 ' Sink.INPUT ':
@StreamListener @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface EventHandler { @AliasFor(annotation=StreamListener.class, attribute="target") String value() default ""; @AliasFor(annotation=StreamListener.class, attribute="target") String target() default Sink.INPUT; @AliasFor(annotation=StreamListener.class, attribute="condition") String condition() default ""; }
如今咱们已经接近咱们要的目标效果了,可是仍然没有彻底到达那个效果:
@EnableBinding class MyEventHandler{ @EventHandler(condition="payload.eventType=='CustomerCreatedEvent'") public void handleCustomerEvent(@Payload Event event) { // handle the message } @EventHandler(condition="payload.eventType=='AccountCreatedEvent'") public void handleAccountEvent(@Payload Event event) { // handle the message } }
最后一步若是须要采用eventType的值来识别事件类型,须要一点附加的魔法。咱们规定,在每一个消息中提供一个eventType头。一旦咱们采用了这个规约,condition表达式能够转化成模板来实现,只须要重载Spring Cloud Stream处理StreamListener注解的Bean Post Processor。重载的函数以下:
import static org.springframework.cloud.stream.config.BindingServiceConfiguration.STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME; @Configuration public class EventHandlerConfiguration { /* * The SpEL expression used to allow the Spring Cloud Stream Binder to dispatch to methods * Annotated with @EventHandler */ private static String eventHandlerSpelPattern = "payload.eventType=='%s'"; /** * Override the default {@link StreamListenerAnnotationBeanPostProcessor} to inject value of * 'eventType' attribute into 'condition' expression. * * @return */ @Bean(name = STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME) public static BeanPostProcessor streamListenerAnnotationBeanPostProcessor() { return new StreamListenerAnnotationBeanPostProcessor() { @Override protected StreamListener postProcessAnnotation(StreamListener originalAnnotation, Method annotatedMethod) { Map<String, Object> attributes = new HashMap<>( AnnotationUtils.getAnnotationAttributes(originalAnnotation)); if (StringUtils.hasText(originalAnnotation.condition())) { String spelExpression = String.format(eventHandlerSpelPattern, originalAnnotation.condition()); attributes.put("condition", spelExpression); } return AnnotationUtils.synthesizeAnnotation(attributes, StreamListener.class, annotatedMethod); } }; } }
接下来,咱们能够用自定义的EnableEventHandling注解来引入这个configuration:
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import({EventHandlerConfig.class}) public @interface EnableEventHandling { }
最终,咱们修改EventHandler注解,定义一个eventType属性来作condition的别名:
@StreamListener @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface EventHandler { /** * The name of the binding target (e.g. channel) that the method subscribes to. * @return the name of the binding target. */ @AliasFor(annotation=StreamListener.class, attribute="condition") String value() default ""; /** * The name of the binding target (e.g. channel) that the method subscribes to. * @return the name of the binding target. */ @AliasFor(annotation=StreamListener.class, attribute="target") String target() default Sink.INPUT; /** * A condition that must be met by all items that are dispatched to this method. * @return a SpEL expression that must evaluate to a {@code boolean} value. */ @AliasFor(annotation=StreamListener.class, attribute="condition") String eventType() default ""; }
用Spring Cloud Stream和一些小小的Spring魔法,咱们已经实现了在相似CQRS+ES的EDA架构中处理domain events的注解驱动框架,咱们实现了面向事件的注解,对比已你看到过的CQRS+ES框架的:
@EnableEventHandling class MyEventHandler{ @EventHandler(eventType = "CustomerCreatedEvent") public void handleCustomerEvent(@Payload Event event) { // handle the message } @EventHandler(eventType = "CustomerCreatedEvent") public void handleAccountEvent(@Payload Event event) { // handle the message } }
不像已有的CQRS+ES框架的是,咱们不依赖载荷的类型来路由事件。这意味着咱们避免了微服务架构中须要共享common数据类型的缺点。固然,若是你真正想要根据载荷类型来路由消息,咱们也能够很容易的修改来实现。
原文连接:https://dturanski.wordpress.com/2017/03/26/spring-cloud-stream-for-event-driven-architectures/
水平有限,翻译的可能不是很好,将就的看吧,但愿有用。
项目源码:https://github.com/dturanski/event-handler-annotation-demo