【bird-java】分布式服务间的事件总线EventBus

什么是EventBus
EventBus是对发布-订阅模式的一种实现。其以一种很是优雅的方式实现了组件间的解耦与通讯,在Android开发、DDD等领域都有很是普遍的应用。
java

事件流大体以下:git

  • Producer向EventBus发送事件。
  • EventBus向全部监听了该事件的Consumer推送事件。
  • 监听了该事件的Consumer消费事件。


注:一个组件便可以是Producer,也能够是Consumer。github

分布式服务间的EventBus
在分布式系统中,事件在服务之间的传递要比单机EventBus复杂不少。有没有一种适用于分布式服务之间的,而且事件传递就像单机同样简单的EventBus呢?在GitHub上搜索了JAVA实现的EventBus,排名前十的几乎都是用于Android或JAVA的单机事件总线。良久以后...仍是本身动手吧。集群环境下的EventBus比单机版须要多考虑一些问题,好比:spring

  1.  服务集群部署的状况下,如何保证每一个集群都可订阅该事件,且每一个集群只能消费一次该事件。
  2. 如何实现一个服务内部多个`xxxService`订阅同一事件。

解决方案:缓存

  1. 使用`kafka`实现集群间的发布订阅(基于`topic`),同一集群处于同一个kafka的consumer-group来保证每一个集群只会消费一次该事件。
  2. 服务在启动时可反射得到全部实现了`IEventHandler<TEventArg>`的类并缓存,服务消费消息时获取全部注册了该消息的handler并调用其`HandleEvent`方法。

部分关键源码分布式

一、事件消息的定义ide

public abstract class EventArg implements IEventArg{

    private Date eventTime;

    public EventArg(){
        eventTime = new Date();
    }

    public Date getEventTime() {
        return eventTime;
    }

    public void setEventTime(Date eventTime) {
        this.eventTime = eventTime;
    }
}

事件消息默认记录建立时间,可扩展其余字段,好比发送时间、标识等。ui

二、使用spring-kafka发送消息this

/**
 * kafka事件注册器,向kafka队列中push消息
 */
@Component
public class KafkaRegister implements IEventRegister {

    @Autowired(required = false)
    private KafkaTemplate<String,IEventArg> kafkaTemplate;

    /**
     * 事件注册
     *
     * @param eventArg 事件参数
     */
    @Override
    public void regist(IEventArg eventArg) {
        kafkaTemplate.send(getTopic(eventArg),eventArg);
    }

    /**
     * 获取kafka的topic
     *
     *
     * @param eventArg
     * @return topic
     */
    private String getTopic(IEventArg eventArg){
        return eventArg.getClass().getName();
    }
}

三、消费kafka消息并执行当前服务中全部订阅了该消息的事件spa

/**
 * kafka事件监听器
 */
public class KafkaEventArgListener implements MessageListener<String,EventArg> {

    @Autowired
    private IEventHandlerFactory eventHandlerFactory;

    @Override
    public void onMessage(ConsumerRecord<String, EventArg> consumerRecord) {
        if (consumerRecord == null) return;
        EventArg value = consumerRecord.value();

        Set<IEventHandler> handlers = eventHandlerFactory.getHandlers(value);
        if (handlers == null) return;
        for (IEventHandler handler : handlers) {
            handler.HandleEvent(value);
        }
    }
}

 

EventBus的使用

一、事件的定义。全部事件均继承于上文EventArg抽象类,示例以下:

public class TestEventArg extends EventArg{
    private String value;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

 

二、事件发布。示例代码:

eventBus.push(new TestEventArg());

 

三、事件订阅。一个服务发布事件以后,任何服务中的任何`xxxServiceImpl`类都可订阅该事件,实现`IEventHandler<TEventArg>`接口便可完成事件的订阅,示例以下:

public class FormServiceImpl extends AbstractServiceImpl<Form> implements FormService,IEventHandler<TestEventArg> {

    @Override
    public void HandleEvent(TestEventArg eventArg) {
        System.out.println("notify zero======");
    }
}

 总体来讲,使用仍是很简单的,EventBus实现与使用示例收录于bird-java项目中,项目地址:https://github.com/liuxx001/bird-java

相关文章
相关标签/搜索