thrift之TTransport层的分帧传输类TFramedTransport

        帧传输类就是按照一帧的固定大小来传输数据,全部的写操做首先都是在内存中完成的直到调用了flush操做,而后传输节点在flush操做以后将全部数据根据数据的有效载荷写入数据的长度的二进制块发送出去,容许在接收的另外一端按照固定的长度来读取。
  帧传输类一样仍是从缓存基类TBufferBase继承而来,实现的接口固然也基本相同,只是实现的方式不一样而已,下面就来看看具体的实现过程和原理。
  这个类所采用的默认缓存长度是512(static const int DEFAULT_BUFFER_SIZE = 512;),两个基本构造函数一个采用默认的缓存长度,另外一个能够指定一个须要的缓存长度。下面仍是重点分析慢读、读帧等操做的实现过程:
  (1)慢读实现以下:缓存

  uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) {
    uint32_t want = len;//想要读取的长度
    uint32_t have = rBound_ - rBase_;//内存缓存中已经有的数据的长度
    assert(have < want);//若是之后数据长度知足须要读的长度就不须要采用慢读
  
    // 若是咱们有一些数据在缓存,拷贝出来而且返回它。
    // 咱们没有试图读取更多的数据而是不得不返回它,由于咱们不能保证在下面的  
    // 传输层实际上有更多的数据,所以应该尝试阻塞式读它。
    if (have > 0) {
      memcpy(buf, rBase_, have);//拷贝出缓存中已有的数据
      setReadBuffer(rBuf_.get(), 0);//从新设置缓存基地址
      return have;//返回
    }
    // 读取另外一帧。
    if (!readFrame()) {
      // EOF.  No frame available.
      return 0;
    }
    // 处理咱们已有的数据
    uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));//已有数据想要读取长度取短的
    memcpy(buf, rBase_, give);//拷贝
    rBase_ += give;//调整缓存基地址
    want -= give;//计算还有多少想要的数据没有获得
  
    return (len - want);//返回实际读取长度
  }

  缓存中没有数据的时候就会调用读取帧的函数readFrame,这个函数实现以下:安全

  bool TFramedTransport::readFrame() {
    //首先读下一帧数据的长度
    int32_t sz;//存放长度的变量
    uint32_t size_bytes_read = 0;//读取长度数据的字节数
    while (size_bytes_read < sizeof(sz)) {//表示长度的数据小于存放长度数据的字节数
      uint8_t* szp = reinterpret_cast<uint8_t*>(&sz) + size_bytes_read;//长度变量转换为指针
      uint32_t bytes_read = transport_->read(szp, sizeof(sz) - size_bytes_read);//读取
      if (bytes_read == 0) {//若是返回为0表示没有数据了
        if (size_bytes_read == 0) {//没有任何数据读到,返回false
          return false;
        } else {
          // 部分的帧头部,抛出异常。
          throw TTransportException(TTransportException::END_OF_FILE,
                                    "No more data to read after "
                                    "partial frame header.");
        }
      }
      size_bytes_read += bytes_read;//以读取的长度
    }
    sz = ntohl(sz);//长整数的网络字节序转换为主机字节序
    if (sz < 0) {//帧的长度不能是负数涩,抛出异常
      throw TTransportException("Frame size has negative value");
    }
    // 读取有效数据负载,从新设置缓存标记。
    if (sz > static_cast<int32_t>(rBufSize_)) {
      rBuf_.reset(new uint8_t[sz]);//接收基地址
      rBufSize_ = sz;//缓存大小
    }
    transport_->readAll(rBuf_.get(), sz);//调用readAll读取sz长度的数据
    setReadBuffer(rBuf_.get(), sz);//设置读缓存基地址
    return true;
  }

  从上面实现代码看出,在按帧读取的过程当中,首先须要读取这一帧的头部信息,而这个头部信息就是这一帧的长度,后面就根据头部信息中给定的长度来读取数据部分,读出来的数据放入缓存中。读取头部信息时注意处理异常的状况,还有就是读出来的数据须要通过网络字节序到主机字节序的转换。下面继续看慢写函数和flush刷新函数的实现过程,慢写函数实现以下(快读和快写基类TBufferBase的实现已经知足要求了,因此不须要再去单独实现了):网络

  void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
    // 直到有足够的双缓冲大小
    uint32_t have = wBase_ - wBuf_.get();//缓存空间已经有多少数据
    uint32_t new_size = wBufSize_;
    if (len + have < have /* overflow */ || len + have > 0x7fffffff) {//若是长度溢出或大于2GB了
      throw TTransportException(TTransportException::BAD_ARGS,
          "Attempted to write over 2 GB to TFramedTransport.");//抛出异常
    }
    while (new_size < len + have) {//缓存空间的长度小于已有数据的长度和须要写入数据长度的和
      new_size = new_size > 0 ? new_size * 2 : 1;若是缓存空间长度是大于0的话就扩容一倍的空间
    }
    
    uint8_t* new_buf = new uint8_t[new_size];// 分配新空间
    memcpy(new_buf, wBuf_.get(), have);// 拷贝已有的数据到新空间.
    
    wBuf_.reset(new_buf);// 缓存地址从新设置
    wBufSize_ = new_size;// 缓存新长度
    wBase_ = wBuf_.get() + have;//新的开始写入地址
    wBound_ = wBuf_.get() + wBufSize_;//写入界限
    memcpy(wBase_, buf, len);//拷贝数据到新缓存地址
    wBase_ += len;//更新缓存基地址
  }

  上面代码就是实现把从上层传输的数据写入缓存中以供下层发送使用,这段代码须要注意的是while循环,这个while循环保证有足够的缓存来存放写入的数据到缓存中,每次增加的长度是上次的一倍;还须要注意的是,分配了新的空间须要把原来尚未真正写入的数据拷贝到新缓存中来,否则就会形成内容丢失;最后就是更新缓存的基地址和长度等描述缓存的信息。继续看flush函数的实现代码:函数

  void TFramedTransport::flush()  {
    int32_t sz_hbo, sz_nbo;
    assert(wBufSize_ > sizeof(sz_nbo));//断言缓存长度应该大于个字节sizeof(int32_t)
    sz_hbo = wBase_ - (wBuf_.get() + sizeof(sz_nbo));// 滑动到第一帧数据的开始位置。
    sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));//主机字节序转换为网络字节序
    memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));//头部长度拷贝写缓存
  
    if (sz_hbo > 0) {//保证缓存有须要写入的数据
      //若是底层传输写抛出了异常注意确保咱们处于安全的状态
      //(例如内部缓冲区清理),重置咱们写入前的状态(由于底层没有传输成功)
      wBase_ = wBuf_.get() + sizeof(sz_nbo);//获得
      // 写入长度和帧
      transport_->write(wBuf_.get(), sizeof(sz_nbo)+sz_hbo);
    }
    // 刷新底层传输.
    transport_->flush();
  }

  刷新函数就是把缓存中的数据真正的发送出去,可是在写入到底层时,底层传输可能不会真正成功(如网络忽然断了),这个时候底层会抛出异常,那么咱们须要捕获异常,以便从新处理这些数据,只有数据真正写入成功的时候咱们才计算咱们写如数据的长度。因此还有写结束和读结束函数writeEnd、readEnd,它们都只有简单的一句代码就是计算真正完成读写数据的长度。
  整个按帧传输的类的功能介绍完毕了,主要须要注意的就是缓存的操做,保证数据不丢失。未来实现考虑分配内存使用c语言的malloc类函数,而不是使用new操做,这样也能提升很多的效率。ui

相关文章
相关标签/搜索