Vert.x系列(二)--EventBusImpl源码分析

前言:Vert.x 实现了2种完成不一样的eventBus:node

EventBusImpl(A local event bus implementation)和 它的子类 ClusteredEventBus(An event bus implementation that clusters with other Vert.x nodes)。这里介绍下EventBusImpl设计模式

 

EventBusImpl 原理:调用consumer方法时,以address-handler做为k-v存在一个map的容器中。接着调用send方法时,把message,DeploymentOptions等内容封装成对象(MessageIml,命令模式),从以address为k从map里取出handler.把MessageIml做为参数传递给handler运行。安全

 

一.初始化: 数据结构

初始化过程就是new  EventBusImpl,并修改状态变量started。app

首先,在VertxImpl的构造方法框架

VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler)

中进行初始化。以  options.isClustered()为判断条件,调用createAndStartEventBus(options, resultHandler);oop

其次createAndStartEventBus中作了2件事测试

1.以options.isClustered()判断条件,new出了ClusteredEventBus/ EventBusImpl. new时并无业务逻辑。(额外提一句eventBus = new EventBusImpl(this);使eventBus和VertImpl相互拥有对方的引用,是很常见的写法。)ui

2.调用EventBusImpl的初始化方法start(),并返回结果给最外层resultHandler的。start()更没作什么事,只是EventBusImpl里面有个状态变量started。把它置为true.this

 

二. consumer订阅

EventBusImpl维护了

protected final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<>()

成员变量。

Handlers 是一个handler的List的封装类,上面能够理解为 ConcurrentMap<String, List<Handler>>这种数据结构。consumer方法以address为k,以handler做v的list的一员,存放在handlerMap中。

因此重点关注对handlerMap的操做。

 

调用vertx.eventBus().consumer("Address1", ar -> {});发生了什么?

查看代码发现,先new HandlerRegistration这里也有相互引用。再调用HandlerRegistration .handler,那里面又会调用eventBusImpl.addRegistration()。在HandlerRegistration这个类兜了一圈,又回到eventBusImpl里。

(相关代码截断以下:  EventBusImpl.consumer(address);--> new HandlerRegistration --> consumer.handler-->eventBus.addRegistration(address, this, repliedAddress != null, localOnly);

核心逻辑在addRegistration() 和 addLocalRegistration()中。个人理解是,前个方法明显有问题。最后一句addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);前面的参数都没有使用,应该能够省略,修改成addRegistration(registration::setResult);就能够。不多在Vert.x框架中看到这样不合规范的代码。若是读者有好的看法,欢迎留言。

 

// 调用 addLocalRegistration

// 注册完成

protected <T> void addRegistration(String address, HandlerRegistration<T> registration,
                                   boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(registration.getHandler(), "handler");
  boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly);
  addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);
}

 

/** *

* 初始化 或 获取原 Contex

初始化 或 获取原 Handlers

* 新建  HandlerHolder

* Handlers 里添加  HandlerHolder

**/

protected <T> boolean addLocalRegistration(String address, HandlerRegistration<T> registration,
                                           boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(address, "address");

  Context context = Vertx.currentContext();
  boolean hasContext = context != null;
  if (!hasContext) {
    // Embedded
    context = vertx.getOrCreateContext();
  }
  registration.setHandlerContext(context);

  boolean newAddress = false;

  HandlerHolder holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context);

  Handlers handlers = handlerMap.get(address);
  if (handlers == null) {
    handlers = new Handlers();
    Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers);
    if (prevHandlers != null) {
      handlers = prevHandlers;
    }
    newAddress = true;
  }
  handlers.list.add(holder);

  if (hasContext) {
    HandlerEntry entry = new HandlerEntry<>(address, registration);
    context.addCloseHook(entry);
  }

  return newAddress;
}

新出现的几个类的做用:

Context 线程调度--Vert.x框架的优势是线程安全,就是经过Context实现。

HandlerHolder--对HandlerRegistration的封装,外加Context。

Handlers--上面HandlerHolder 的集合封装,外加平衡轮询逻辑。

 

handlers.list.add(holder);这句做为压轴(戏曲名词,指一场折子戏演出的倒数第二个剧目)出场完成整个功能的核心注册操做。

至于后面的那段代码,我以为有点问题。

if (hasContext) {
 HandlerEntry entry = new HandlerEntry<>(address, registration);
 context.addCloseHook(entry);
 }

做用是在context上注册关闭事件,由DeploymentManager在unploy的时候调用,对应的核心逻辑在 CloseHooks.run()方法中。但这个这个判断条件案例只有第2次添加consumer的时候才有效果。或者是上面的代码boolean hasContext = context != null;给人的误导? 以上consumer的流程还被reply方法使用。

 

三. Send/Publish发送

多个send重载方法最后定位到EventBusImpl.send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler)。但这个核心方法的最终却调用了一个

名为sendOrPubInternal的方法,不禁得在让人想起写程序最难的事之一是命名。正如开头说的这个使用了设计模式中的命令模式,把参数封装成MessageImpl对象发送到后面的方法。

sendOrPubInternal作了3个事情,

1.createReplyHandlerRegistration -- 有replyHandler.reply()这步才有意义

2.new SendContextImpl -- 从Context类判断,SendContextImpl能够绑定线程

3.sendContext.next(); -- 在执行方法前,执行拦截器。拦截器极大地丰富开发人员的自定义使用。

原本应该1,2,3顺序介绍代码,可是消息流程通常是:

Sender----(    message  )--->customer;

Sender<---(reply message)---customer;

根据这个流程,得先介绍2.new SendContextImpl 和3.sendContext.next();

再回头介绍 1.createReplyHandlerRegistration

 

 

先说 2.new SendContextImpl

这个类是整个Send相关类的大封装。

3.sendContext.next();

根据代码流程

sendOrPub--》deliverMessageLocally--》deliverMessageLocally

进入到deliverMessageLocally(),这个方法作了2个大事情。

  1. 获取address所对应的全部handlers
  2. 根据isSend()区分 send (平衡轮询发一个handler)/publish(遍历handlers发给全部)

方法的第一句话msg.setBus(this);和reply逻辑有关系。在这个local eventbus下,是重复赋值,没有做用的。

而后Handlers handlers = handlerMap.get(msg.address());

这句根据以address为k,取出Handlers。sender的messageImpl 终于和consumer的HandlerHold见面

 

Handler.choose()方法实现了轮询发送message, 我的认为这个方法叫作 balanceChoose()更好。

代码以下:

public HandlerHolder choose() {
  while (true) {
    int size = list.size();
    if (size == 0) {
      return null;
    }
    int p = pos.getAndIncrement();
    if (p >= size - 1) {
      pos.set(0);
    }
    try {
      return list.get(p);
    } catch (IndexOutOfBoundsException e) {
      // Can happen
      pos.set(0);
    }
  }
}

当时我使用Vert.x的时候,就很好奇eventBus的轮询功能怎么实现。如今看到其实很是简单。维护一个 AtomicInteger 的变量,每次调用累加一次。若是超过List的长度,则重置为0,方法永远返回 list.get(p)。巧妙!

最后在deliverToHandler()方法里,在Context的线程控制下,完成message和handler的最终交互。

 

那么,回到最开始的问题,

Sender----(    message  )--->Customer;

Sender<---(reply message)---Customer;

在上面的流程中,Sender根据address找到Customer从而发送message,那么Customer的reply是怎么找到Sender的呢?

答案是一个临时的replyAddress。经过以 replyAddress为key,把Sender做为handler注册到eventBusImpl上,处理后直接注销。replyAddress的规律是从1开始的步长为1的自增数列,因此开发者不该该使用纯数字做为自身业务的Address,避免冲突。

 

最后说说1.createReplyHandlerRegistration

若是sender在发送消息时使用了

send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);方法。

vertx.eventBus().send("address1", "测试消息", ar -> { if (ar.succeeded()) { System.out.println("我是producer1:" + ar.result().body()); } });

而且consumer在接受消息到后,调用了 reply();

vertx.eventBus().consumer("address1", ar -> {
    System.out.println("consumer:" + ar.body());
    ar.reply("consumer reply message ");
});

则会进入createReplyHandlerRegistration的处理逻辑。

使用

protected String generateReplyAddress() {
 return Long.toString(replySequence.incrementAndGet());
}

这里产生从1开始的步长为1的自增数列address。

Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler); HandlerRegistration<T> registration = new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout); registration.handler(simpleReplyHandler);

里面的this是eventBusImpl,并在handler()方法里把 boolean replyHander的值置为true.

这样,eventBusImpl的handlerMap变量里,就多了<replyAddress, replyHander>。

在cuomser处调用reply()后,会在eventBusImpl的内部类ReplySendContextImpl<T> extends SendContextImpl 的参与下,走相似send()的流程。区别是最后在deliverToHandler()方法里,会判断boolean  replyHander的值,若是是true调用完毕就注销.

 

错误代码测验:

vertx.eventBus().consumer("1", ar -> { System.out.println("我不该该在这里" + ar.body()); ar.reply("对不起,其实我是阿杜。"); }); vertx.eventBus().consumer("address1", ar -> { System.out.println("consumer:" + ar.body()); ar.reply("我是高帅富"); }); vertx.eventBus().send("address1", "测试消息", ar -> { if (ar.succeeded()) { System.out.println("sender:接收收到的回应是:"+ar.result().body()); }else{ System.out.println("发送失败"); } }); 

存在consumer("1", ar -> {})的Console:

consumer:测试消息

我不该该在这里我是高帅富

20:08:56.404 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024

20:08:56.405 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096

发送失败

能够看到上面的输出彻底不是设想的结果。

 

若是不存在consumer("1", ar -> {})address为1的Console:

consumer:测试消息 sender:接收收到的回应是:我是高帅富

最后,再次提醒:使用eventBus时,不要使用纯数字做为自身业务的address。

相关文章
相关标签/搜索