消息转发过程(下)

转载自 吕博的博客html

网址 http://blog.sina.com.cn/s/blog_48ded66a01018shx.htmljava


3.消息的转发流程node

    链接的创建是为消息传输服务的,当一个链接处于UP状态,链接两端的节点就能够传输消息。消息的传输过程一样是经过update函数来驱动的,在DTNHost中更新当前节点的路由的update()函数。       缓存

    MessageRouter类是全部路由的父类,但在ONE中几乎是个空类,其最关键的子类是ActiveRouter,几乎全部的路由都是ActiveRouter的子类。ide

    ActiveRouter()之因此称之为主动的路由,就体如今对链接的处理上,它维护着正在发送消息的链接的列表,一旦在update()函数中发现链接已经中断,则采用积极的措施来善后。这些善后措施包括:终止在该链接上的数据传输、清理该消息所占用的额外存储空间。函数

protectedArrayList sendingConnections;

    可是ActiveRouter仍然不能实现消息的转发,由于在update函数中没有涉及到消息的处理,这个功能其实是经过其各类子类来实现的,因为ONE中继承ActiveRouter的路由众多,这里只以最简单的EpidemicRouter为例来讲明消息转发的流程。this

    在EpidemicRouter类的update函数中真正实现了节点经过链接进行消息交互。若是本节点正在发送数据或者不能发送数据,则不进行消息转发。经过调用exchangeDeliverableMessages()函数查看是否有到达目的节点的消息能够发送,如今这个事件优先级是最高的。最后在全部的链接上尝试发送全部的消息。spa

 
 @Override
    public void update() {
       super.update();
       if (isTransferring() || !canStartTransfer()) {
           return; // transferring, don't try other connections yet
       }
       // Try first the messages that can be delivered to final recipient
       if (exchangeDeliverableMessages() != null) {
           return; // started a transfer, don't try others (yet)
       }
       // then try any/all message to any/all connection
       this.tryAllMessagesToAllConnections();
    }

(1)若是消息可以直接发送到目的节点htm

    exchangeDeliverableMessages()的做用是发送可以到达目的节点的消息,即当前的节点遍历全部的消息和全部UP状态的链接,一旦发现某个消息可以到达目的节点,当即发送这个消息。blog

    exchangeDeliverableMessages的返回值是一个Connection,若是没有能够发送的消息,返回值为空,若是能找到一个(注意只要找到一个就能够)能够到达目的节点的消息,则返回发送这个消息所对应的链接。

protected Connection exchangeDeliverableMessages() {
       List connections = getConnections();     //获得全部链接
       if (connections.size() == 0) {     //没有链接,返回为空
           return null;
       }
       @SuppressWarnings(value = "unchecked")
       Tuple t =tryMessagesForConnected(sortByQueueMode(getMessagesForConnected()));
       if (t != null) {
           return t.getValue(); // started transfer
       }
       // didn't start transfer to any node -> ask messages from connected
       for (Connection con : connections) {
           if (con.getOtherNode(getHost()).requestDeliverableMessages(con)) {
              return con;
           }
       }
       return null;
    }

    函数中调用了getMessagesForConnected()函数,其返回值是一个List,组成它的元组格式为,这些元组中都知足条件:(1)Message是当前节点缓存中的消息;(2)Connection是当前节点维护的UP链接;(3)Message的目的节点就是Connection的另外一端。

protected List> getMessagesForConnected() {
       if (getNrofMessages() == 0 || getConnections().size() == 0) {
          
           return new ArrayList>(0);
       } //若是当先节点根本没有消息或者根本没有可用的链接,返回的是一个空List,而不是NULL
 
       List> forTuples =     //用于储存临时的元组
           new ArrayList>();
       for (Message m : getMessageCollection()) {       //遍历全部的Message
           for (Connection con : getConnections()) {     //遍历全部的Connection
              DTNHost to = con.getOtherNode(getHost()); //提取Connection的另外一端
              if (m.getTo() == to) {                    //消息m是发往con的另外一端的
                  forTuples.add(new Tuple(m,con));
              }
           }
       }
       return forTuples;
    }

    将getMessagesForConnected()的返回值传递给sortByQueueMode函数,这个函数根据消息的优先级对消息进行排序,ONE中定义了两种优先级策略,一是随机排序(默认为0),二是先入先出策略(值为1),这个值能够在配置脚本中进行设置,语句为Group.sendQueue = 1。若是须要对缓存机制进行扩展,显然须要修改这个函数。因为在这里并无涉及缓存策略,所以不赘述。sortByQueueMode()函数返回的是排序以后的List,保持元组格式不变。

    获得了排序以后的,传递给tryMessagesForConnected函数,这个函数的做用是将元组中优先级最高的那个(最上面的那个消息)在对应的connection上发送出去。要强调的是,这里只发送一个消息,返回的一个元组,而不是元组的List。固然优先级最高的那个消息可能不会发送成功,那么则会尝试发送优先级次之的消息,若是List中全部的消息都尝试过了,那只好返回NULL了。

protected TupletryMessagesForConnected(
           List> tuples) {
       if (tuples.size() == 0) {   //元组为空,返回NULL
           return null;
       }
       for (Tuple t : tuples) {    //遍历全部的元组
           Message m = t.getKey();                       //提取出消息m  
           Connection con = t.getValue();                //提取出链接con
           if (startTransfer(m, con) == RCV_OK) {        //将消息m在con上发送
              return t;         //若是消息发送成功,直接跳出tryMessagesForConnected
           }
       }
       return null;
    }

    怎样判断消息m可否在con上发送成功?须要调用startTransfer函数。这个函数很简单,可是其返回值很关键。当函数返回TRY_LATER_BUSY=1时,说明该链接正忙,稍后再试,当返回RCV_OK=0时,说明发送成功,以上还很容易理解。但retVal还有其余的取值来代表消息发送的状况,DENIED_OLD = -1表示该消息发送失败,缘由是已经处理过(已经接收过或者已经转发过),DENIED_NO_SPACE = -2表示该消息发送失败,缘由是缓存中没有空位置,DENIED_TTL=-3表示消息发送失败,缘由是该消息超时。

protected int startTransfer(Message m, Connection con) {
       int retVal;
      
       if (!con.isReadyForTransfer()) {       //链接正忙,返回1
           return TRY_LATER_BUSY;
       }
       retVal = con.startTransfer(getHost(), m);     //调用链接的startTrransfer函数
       if (retVal == RCV_OK) { // started transfer
           addToSendingConnections(con);
       }
       else if (deleteDelivered && retVal == DENIED_OLD &&
              m.getTo() == con.getOtherNode(this.getHost())) {    
           this.deleteMessage(m.getId(), false);     //清理因DENIED_OLD被拒绝的消息
       }
       return retVal;
    }

    在这里涉及到缓存的清理机制,若是在配置脚本中设置Group.deleteDelivered = 1,则开启了已发送消息的清理机制,这样,一旦节点发现消息m因DENIED_OLD被拒绝,则会清除本地缓存中的m。

    在路由的startTransfer函数中调用了链接的startTransfer,这个函数在CBRConnection类中被重载。在CBRConnection的startTransfer中,首先调用对方节点的receiveMessage函数,看对方节点可否接收这个消息。若是能接收,则进行如下操做:(1)设置正在发送的消息msgOnFly;(2)计算消息预计完成时间;(3)返回值RCV_OK=0。

public int startTransfer(DTNHost from, Message m) {
 
       assert this.msgOnFly == null : "Already transferring " +
       this.msgOnFly + " from " + this.msgFromNode + " to " +
       this.getOtherNode(this.msgFromNode) + ". Can't "+
       "start transfer of " + m + " from " + from;
 
       this.msgFromNode = from;
       Message newMessage = m.replicate();
       int retVal = getOtherNode(from).receiveMessage(newMessage, from);
 
       if (retVal == MessageRouter.RCV_OK) {
           this.msgOnFly = newMessage;
           this.transferDoneTime = SimClock.getTime() +
           (1.0*m.getSize()) / this.speed;
       }
       return retVal;
    }

    在上面的函数中,调用的是节点的receiveMessage函数(在DTNHost.java定义),进而进入路由的receiveMessage函数,在MessageRouter.java中的receiveMessage函数进行了下面的几个工做:

    (1)将消息m放进IncomingBuffer;

    (2)当前节点存入消息m的Path中,Path是一系列DTNHost组成的列表,记录着这个消息所走过的路径;

    (3)通知全部的Listener发生了messageTransferStarted事件;

    (4)返回RCV_OK=0。

public int receiveMessage(Message m, DTNHost from) {
       Message newMessage = m.replicate();
             
       this.putToIncomingBuffer(newMessage, from);     
       newMessage.addNodeOnPath(this.host);
      
       for (MessageListener ml : this.mListeners) {
           ml.messageTransferStarted(newMessage, from, getHost());
       }
      
       return RCV_OK; // superclass always accepts messages
    }

    至此消息m被成功的发送到了目的节点,实际上工做才完成了一半,由于还有一些消息不能被发送到目的节点,须要一些中间节点进行转发。

(2) 当消息不能直接到达目的节点时 

    在update函数中还有一个函数tryAllMessagesToAllConnections()专门负责处理类事件,他的优先级是要低于exchangeDeliverableMessages()的,也就是说当前节点若是有消息可以直接发送到目的节点,就应该优先发送。不然才会执行exchangeDeliverableMessages()函数,首先对全部的消息从新排序,排序的函数还是sortByQueueMode,在前面介绍过。最后,调用tryMessagesToConnections函数遍历排序以后的全部消息和全部链接,寻找可以发送的消息。

 
 protected Connection tryAllMessagesToAllConnections(){
       List connections = getConnections();
 
       if (connections.size() == 0 || this.getNrofMessages() == 0) {
           return null;
       }
 
       List messages =  new ArrayList(this.getMessageCollection());
       this.sortByQueueMode(messages);    //将全部的消息从新排序
 
       return tryMessagesToConnections(messages, connections);
    }

    在tryMessagesToConnections中,遍历全部的链接,而后调用tryAllMessages(con, messages)依次在每一个链接上发送全部的消息。注意区别:在tryMessagesToConnections(messages, connections)中,messages指的是不少消息,connections指的是不少链接;而在tryAllMessages(con, messages)中,messages指的是不少消息,con指的是某个链接;然而在startTransfer(m, con)函数中,m是指某个消息,con是指某个链接。

    在tryAllMessages函数中,遍历全部的消息,依次尝试在con上可否发送成功,只要找到一个能发送的消息m,就终止该函数,返回消息m。不然尝试全部的消息都不能在con上发送,返回NULL。

protected Message tryAllMessages(Connection con, List messages) {
       for (Message m : messages) {
           int retVal = startTransfer(m, con);
           if (retVal == RCV_OK) {
              return m;  // accepted a message, don't try others
           }
           else if (retVal > 0) {
              return null; // should try later -> don't bother trying others
           }
       }
      
       return null; // no message was accepted      
    }

    在发送消息的函数中,仍然执行的是startTransfer函数,startTransfer函数在前面已经介绍过了。

    到这里,EpidemicRouter中发送消息的流程已经介绍清楚了,EpidemicRouter路由中鼓励节点尽量将全部的消息都转发给本身的邻居,所以只须要在ActiveRouter父类上作简单的扩展便可。接下的部分以其余路由为例研究一下路由协议是怎么扩展ActiveRouter的。


4.  路由扩展

 

(1)DirectDeliveryRouter

    这个路由的改法是最简单的,只须要在EpidemicRouter的基础上删除this.tryAllMessagesToAllConnections()函数,保留exchangeDeliverableMessages()函数便可,那么消息的转发流程中只剩下了第一部分。

 (2)FirstContactRouter

    EpidemicRouter是最简单的洪泛消息,其余路由协议与它最大的区别就在于:并非全部的消息都照单全收。回忆上面的消息传输过程,不管是到目的节点仍是到中间节点,都须要执行startTransfer,它负责在某个链接con上发送某个消息m。在这个函数中,首先找到con的另外一端DTNHost,而后调用receiveMessage函数查看对方能不能接收消息m。receiveMessage是一个不断被各类路由重载的函数,在MessageRouter中,实现了最基本的功能:

        将消息m放进IncomingBuffer;

        当前节点存入消息m的Path中;

        通知全部的Listener发生了messageTransferStarted事件;

    在ActiveRouter中增长了功能:

    调用checkReceiving函数看对方节点能不能接收消息m,不少种可能都会致使消息不能被接收,在ActiveRouter中主要有:

    (1)对方节点正在发送消息,返回TRY_LATER_BUSY=1;

    (2)对方节点缓存中已经有该消息了,返回DENIED_OLD=-1;

    (3)消息超时,返回DENIED_TTL=-2;

    (4)对方节点缓存已满,返回ENIED_NO_SPACE=-3。

    从上面的分析发现,要想消息按某种条件被转发,在checkReceiving函数中进行一些过滤是很合理的。

    FirstContactRouter路由中,区别在于节点把数据转发给第一次遇到的节点,而后就会将转发过的消息从缓存中删除。这里重载了checkReceiving函数。每次节点转发某个消息,都会将本身写在消息的path中,当节点发现本身已经处理过某个消息了,则会用DENIED_OLD的理由将其拒绝。须要指出的是顺序问题,将当前节点加入消息路径Path的操做是在receiveMessage中,这个操做是在checkReceiving以后,因此在检索path的时候当前节点尚未将本身写进Path。

@Override
    protected int checkReceiving(Message m) {
       int recvCheck = super.checkReceiving(m);
      
       if (recvCheck == RCV_OK) {
          
           if (m.getHops().contains(getHost())) {
              recvCheck = DENIED_OLD;
           }
       }
       return recvCheck;
    }

    除此以外,FirstContactRouter还重载了transferDone函数。节点将消息发送成功以后就会清理缓存。这个函数在ActiveRouter中为空,在EpidemicRouter中也没有将其重载,因此在这里是第一次出现。

@Override
    protected void transferDone(Connection con) {
      
       this.deleteMessage(con.getMessage().getId(), false);
    }

    在这里只是给出了两个路由协议最基本的例子,本文重点在于解释TheONE中消息转发的流程,所以其余的路由协议将在其余文章中进行分析。

相关文章
相关标签/搜索