什么是EventBus
EventBus是对发布-订阅模式的一种实现。其以一种很是优雅的方式实现了组件间的解耦与通讯,在Android开发、DDD等领域都有很是普遍的应用。java
事件流大体以下:git
注:一个组件便可以是Producer,也能够是Consumer。github
分布式服务间的EventBus
在分布式系统中,事件在服务之间的传递要比单机EventBus复杂不少。有没有一种适用于分布式服务之间的,而且事件传递就像单机同样简单的EventBus呢?在GitHub上搜索了JAVA实现的EventBus,排名前十的几乎都是用于Android或JAVA的单机事件总线。良久以后...仍是本身动手吧。集群环境下的EventBus比单机版须要多考虑一些问题,好比:spring
解决方案:缓存
部分关键源码分布式
一、事件消息的定义ide
public abstract class EventArg implements IEventArg{ private Date eventTime; public EventArg(){ eventTime = new Date(); } public Date getEventTime() { return eventTime; } public void setEventTime(Date eventTime) { this.eventTime = eventTime; } }
事件消息默认记录建立时间,可扩展其余字段,好比发送时间、标识等。ui
二、使用spring-kafka发送消息this
/** * kafka事件注册器,向kafka队列中push消息 */ @Component public class KafkaRegister implements IEventRegister { @Autowired(required = false) private KafkaTemplate<String,IEventArg> kafkaTemplate; /** * 事件注册 * * @param eventArg 事件参数 */ @Override public void regist(IEventArg eventArg) { kafkaTemplate.send(getTopic(eventArg),eventArg); } /** * 获取kafka的topic * * * @param eventArg * @return topic */ private String getTopic(IEventArg eventArg){ return eventArg.getClass().getName(); } }
三、消费kafka消息并执行当前服务中全部订阅了该消息的事件spa
/** * kafka事件监听器 */ public class KafkaEventArgListener implements MessageListener<String,EventArg> { @Autowired private IEventHandlerFactory eventHandlerFactory; @Override public void onMessage(ConsumerRecord<String, EventArg> consumerRecord) { if (consumerRecord == null) return; EventArg value = consumerRecord.value(); Set<IEventHandler> handlers = eventHandlerFactory.getHandlers(value); if (handlers == null) return; for (IEventHandler handler : handlers) { handler.HandleEvent(value); } } }
EventBus的使用
一、事件的定义。全部事件均继承于上文EventArg抽象类,示例以下:
public class TestEventArg extends EventArg{ private String value; public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
二、事件发布。示例代码:
eventBus.push(new TestEventArg());
三、事件订阅。一个服务发布事件以后,任何服务中的任何`xxxServiceImpl`类都可订阅该事件,实现`IEventHandler<TEventArg>`接口便可完成事件的订阅,示例以下:
public class FormServiceImpl extends AbstractServiceImpl<Form> implements FormService,IEventHandler<TestEventArg> { @Override public void HandleEvent(TestEventArg eventArg) { System.out.println("notify zero======"); } }
总体来讲,使用仍是很简单的,EventBus实现与使用示例收录于bird-java项目中,项目地址:https://github.com/liuxx001/bird-java。