Wangle中的Pipeline和Netty中的Pipeline是很类似的,既能够将它看为一种职责链模式的实现也能够看做是Handler的容器。Pipeline中的handler都是串行化执行的,前一个handler完成本身的工做以后把事件传递给下一个handler,理论上Pipeline中的全部handler都是在同一个IO线程中执行的,可是为了防止某些handler(好比序列化、编解码handler等)耗时过长,Netty中容许为某些handler指定其它线程(eventloop)异步执行,相似的功能在Wangle中也有体现,只是在实现方式上有些区别。和Netty中一个较大的区别是,Wangle中并无专门的Channel定义,Wangle中的Pipeline兼有了Channel的角色和功能。下面分别就Pipeline、Handler和Context的顺序进行源码分析。java
PipelineBase做为Pipeline的基类,提供了一些最为通用、核心的api实现,好比对handler的操做:addBack及其变体、addFront及其变体、remove及其变体等,下面看一下addBack的一个实现版本:api
template <class H> PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) { typedef typename ContextType<H>::type Context;// 声明Conetxt类型,ContextImpl<Handler>、InboundContextImpl<Handler>、OutboundContextImpl<Handler>其中之一 // 使用Context包装Handler后,将其添加到pipeline中,Context中还持有pipeline的引用 return addHelper( std::make_shared<Context>(shared_from_this(), std::move(handler)), false);// false标识添加到尾部 }
首先,会根据要添加的handler类型定义一个Context(Context能够当作是Handler的外套,后面还会单独介绍)类型,而后根据这个Context类型建立一个Context:参数为Pipeline指针和handler,最终addHelper会将Context添加到容器管理起来:app
template <class Context> PipelineBase& PipelineBase::addHelper(std::shared_ptr<Context>&& ctx,bool front) { // 先加入总的Context (std::vector<std::shared_ptr<PipelineContext>>) // 该vector种使用的是智能指针,能够保持对Context的引用 ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx); // 而后根据方向(BOTH、IN、OUT分别加入相应的vector中) // std::vector<PipelineContext*> 这里放的是Context的指针,由于引用在上面的容器中已经保持 if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) { inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get()); } if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) { outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get()); } return *this; }
Context内部包含了Pipeline、Handler,和Handler同样,Context也有方向:BOTH、IN、OUT,首先,不管Context 什么方向,都会在ctxs_容器上添加这个Context,而后会根据Context方向的不一样,分别在inCtxs_和outCtxs_上添加该Context。接下来看一下这三个容器的定义:异步
std::vector<std::shared_ptr<PipelineContext>> ctxs_; // 全部的PipelineContext std::vector<PipelineContext*> inCtxs_; // inbound 类型的PipelineContext std::vector<PipelineContext*> outCtxs_; // outbound 类型的PipelineContext
因为handler的其余操做(addFront、remove等)都是对这三个容器的增删操做,原理同样,此处再也不赘述。ide
PipelineBase中还提供了设置PipelineManager的接口,从字面理解,PipelineManager就是管理Pipeline的接口,其定义以下:函数
class PipelineManager { public: virtual ~PipelineManager() = default; virtual void deletePipeline(PipelineBase* pipeline) = 0; virtual void refreshTimeout() {}; };
其中,deletePipeline会在显示调用一个pipeline的close方法时被调用,通常用来完成该Pipeline相关的资源释放,而refreshTimeout主要在Pipeline发生读写事件时被回调,主要用来刷新Pipeline的空闲时间。所以,若是你须要监听Pipeline的delete和refresh事件,那么能够本身实现一个PipelineManager并设置到Pipeline上。oop
在Wangle中没有定义专门的Channel结构,其实Wangle中的Pipeline兼有Channel的功能,好比要判断一个Channel是否还处于链接状态,在Netty中代码以下:源码分析
channel.isConnected();
那么Wangle中的Pipeline并无此类方法可供使用,怎么办呢?其实,Wangle的Pipeline提供了一个更强大的方法:getTransport,该方法能够得到一个底层的AsyncTransport,而该AsyncTransport拥有全部的底层链接信息,好比(仅列出主要接口):ui
class AsyncTransport : public DelayedDestruction, public AsyncSocketBase { public: typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr; virtual void close() = 0; virtual void closeNow() = 0; virtual void closeWithReset() { closeNow(); } virtual void shutdownWrite() = 0; virtual void shutdownWriteNow() = 0; virtual bool good() const = 0; virtual bool readable() const = 0; virtual bool isPending() const { return readable(); } virtual bool connecting() const = 0; virtual bool error() const = 0; virtual void attachEventBase(EventBase* eventBase) = 0; virtual void detachEventBase() = 0; virtual bool isDetachable() const = 0; virtual void setSendTimeout(uint32_t milliseconds) = 0; virtual uint32_t getSendTimeout() const = 0; virtual void getLocalAddress(SocketAddress* address) const = 0; virtual void getAddress(SocketAddress* address) const { getLocalAddress(address); } virtual void getPeerAddress(SocketAddress* address) const = 0; virtual ssl::X509UniquePtr getPeerCert() const { return nullptr; } };
至此,PipelineBase中的主要功能分析完毕。this
Pipeline是PipelineBase的子类,其具体定义以下:
template <class R, class W = folly::Unit> class Pipeline : public PipelineBase { public: using Ptr = std::shared_ptr<Pipeline>; static Ptr create() { return std::shared_ptr<Pipeline>(new Pipeline()); } ~Pipeline(); // 模板方法 template <class T = R> typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type read(R msg);//front_->read(std::forward<R>(msg)); --> this->handler_->read(this, std::forward<Rin>(msg)); template <class T = R> typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type readEOF();//front_->readEOF(); template <class T = R> typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type readException(folly::exception_wrapper e);//front_->readException(std::move(e)); template <class T = R> typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type transportActive();// front_->transportActive(); template <class T = R> typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type transportInactive();//front_->transportInactive(); template <class T = W> typename std::enable_if<!std::is_same<T, folly::Unit>::value, folly::Future<folly::Unit>>::type write(W msg);//back_->write(std::forward<W>(msg)); template <class T = W> typename std::enable_if<!std::is_same<T, folly::Unit>::value, folly::Future<folly::Unit>>::type writeException(folly::exception_wrapper e);//back_->writeException(std::move(e)); template <class T = W> typename std::enable_if<!std::is_same<T, folly::Unit>::value, folly::Future<folly::Unit>>::type close();//back_->close() void finalize() override; protected: Pipeline(); explicit Pipeline(bool isStatic); private: bool isStatic_{false}; InboundLink<R>* front_{nullptr};// inbound类型Context(read) OutboundLink<W>* back_{nullptr};// outbound类型Context (write) };
能够看到,Pipeline主要定义和实现了一些和Handler对应的经常使用方法:read、readEOF、readException、transportActive、transportInactive、write、writeException、close。同时,Pipeline还定义了两个私有成员:front_和back_,从类型能够看出这是两个不一样的方向,首先看一下InboundLink定义:
template <class In> class InboundLink { public: virtual ~InboundLink() = default; virtual void read(In msg) = 0; virtual void readEOF() = 0; virtual void readException(folly::exception_wrapper e) = 0; virtual void transportActive() = 0; virtual void transportInactive() = 0; };
能够看出,InboundLink只是把Pipeline主要方法中的IN方向单独抽象出来,都是一个IN事件(输入事件),那么可想而知OutboundLink的定义:
template <class Out> class OutboundLink { public: virtual ~OutboundLink() = default; virtual folly::Future<folly::Unit> write(Out msg) = 0; virtual folly::Future<folly::Unit> writeException( folly::exception_wrapper e) = 0; virtual folly::Future<folly::Unit> close() = 0; };
的确,OutboundLink定义的都是OUT事件类型的操做。
前文在讲PipelineBase时,addBack之类的操做都是只针对那三个容器进行的,没有地方对front_链表和back_链表进行操做啊?其实,front_链表和back_链表的设置是在Pipeline的finalize中完成的:
template <class R, class W> void Pipeline<R, W>::finalize() { front_ = nullptr; if (!inCtxs_.empty()) { front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front()); for (size_t i = 0; i < inCtxs_.size() - 1; i++) { inCtxs_[i]->setNextIn(inCtxs_[i + 1]); } inCtxs_.back()->setNextIn(nullptr); } back_ = nullptr; if (!outCtxs_.empty()) { back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back()); for (size_t i = outCtxs_.size() - 1; i > 0; i--) { outCtxs_[i]->setNextOut(outCtxs_[i - 1]); } outCtxs_.front()->setNextOut(nullptr); } if (!front_) { detail::logWarningIfNotUnit<R>( "No inbound handler in Pipeline, inbound operations will throw " "std::invalid_argument"); } if (!back_) { detail::logWarningIfNotUnit<W>( "No outbound handler in Pipeline, outbound operations will throw " "std::invalid_argument"); } for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) { (*it)->attachPipeline(); } }
代码很简单,以IN方向为例,遍历inCtxs_容器,对容器中的每个Context调用其setNextIn方法将Context组成一个单向链表front_。同理,outCtxs_最终会变为back_单向链表。最后,还会遍历Context的总容器ctxs_,为每个Context调用attachPipeline方法,该方法主要工做就是把Context绑定到对应的Handler上(最终是Context和Handler都互相持有对方的引用),还会回调Handler的attachPipeline方法。
此处还有一个细节,Pipeline是一个模板类,具备两个模板参数template <class R, class W = folly::Unit>,分别表明Pipeline的 read(IN事件)的数据类型和write(out事件)数据类型,这些类型的设置要和Pipeline中的handler类型向匹配(后文还会详细讲解)。
下面就以Pipeline中的write方法来看一下事件的流动过程:
template <class R, class W> template <class T> typename std::enable_if < !std::is_same<T, folly::Unit>::value, folly::Future<folly::Unit >>::type Pipeline<R, W>::write(W msg) { if (!back_) { throw std::invalid_argument("write(): no outbound handler in Pipeline"); } return back_->write(std::forward<W>(msg)); }
Pipeline的write方法只是简单的调用back_的wirte方法,也就是OUT类型的事件会从Pipeline的最后一个Context依次向前传递(只传递给OUT类型的handler)。
Handler在继承层次上相似于Pipeline,首先有一个基类HandlerBase,其定义以下:
template <class Context> class HandlerBase { public: virtual ~HandlerBase() = default; virtual void attachPipeline(Context* /*ctx*/) {} virtual void detachPipeline(Context* /*ctx*/) {} // 获取绑定的Context Context* getContext() { if (attachCount_ != 1) { return nullptr; } CHECK(ctx_); return ctx_; } private: friend PipelineContext; // 设置PipelineContext为友元类,便于PipelineContext操做本身 uint64_t attachCount_{0}; // 绑定计数,同一个handler能够被同时绑定到不一样的pipeline中 Context* ctx_{nullptr}; // 该Handler绑定的Context };
HandlerBase内部组合了一个绑定的Context指针,并提供了getContext接口用于获取这个Handler绑定的Context。
Handler做为HandlerBase的子类,它具备四个模板参数: Rin、Rout、Win、Wout,其中Rin做为Handler和Context中read方法中消息的数据类型,Rout是做为Context中fireRead方法的参数类型。同理,Win是做为Handler和Context中wirte方法的消息参数类型,而Wout是做为Context中fireWrite的消息参数类型。能够这么理解:Xout是做为以fire开头的事件方法的参数类型。
template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin> class Handler : public HandlerBase<HandlerContext<Rout, Wout>> { public: static const HandlerDir dir = HandlerDir::BOTH; // 方向为双向 typedef Rin rin; typedef Rout rout; typedef Win win; typedef Wout wout; typedef HandlerContext<Rout, Wout> Context; // 声明该HandlerContext类型 virtual ~Handler() = default; // inbound类型事件 virtual void read(Context* ctx, Rin msg) = 0; virtual void readEOF(Context* ctx) { ctx->fireReadEOF(); } virtual void readException(Context* ctx, folly::exception_wrapper e) { ctx->fireReadException(std::move(e)); } virtual void transportActive(Context* ctx) { ctx->fireTransportActive(); } virtual void transportInactive(Context* ctx) { ctx->fireTransportInactive(); } // outbound类型事件 virtual folly::Future<folly::Unit> write(Context* ctx, Win msg) = 0; virtual folly::Future<folly::Unit> writeException(Context* ctx, folly::exception_wrapper e) { return ctx->fireWriteException(std::move(e)); } virtual folly::Future<folly::Unit> close(Context* ctx) { return ctx->fireClose(); } };
相似于Pipeline,Handler也相应的定义了inbound类型和outbound类型事件,分别对应方法:read、readEOF、readException、transportActive、transportInactive、write、writeException、close(这些方法和Pipeline中一一对应)。其中,除了read和write两个方法是纯虚接口以外,其余的方法都提供了默认实现:就是将事件进行透传(调用Context里fireXxx方法)。
同理,根据事件类型的不一样,还能够进一步细分Handler类型,好比InboundHandler类型为:
// inbound类型的Handler (默认状况下读入和读出的类型是一致) template <class Rin, class Rout = Rin> class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> { public: static const HandlerDir dir = HandlerDir::IN; // 方向为输入 typedef Rin rin; typedef Rout rout; typedef folly::Unit win; typedef folly::Unit wout; typedef InboundHandlerContext<Rout> Context; // 声明inbound类型的InboundHandlerContext virtual ~InboundHandler() = default; // 纯虚函数。由子类实现 virtual void read(Context* ctx, Rin msg) = 0; // 下面的默认实现都是事件的透传 virtual void readEOF(Context* ctx) { ctx->fireReadEOF(); } virtual void readException(Context* ctx, folly::exception_wrapper e) { ctx->fireReadException(std::move(e));// std::move } virtual void transportActive(Context* ctx) { ctx->fireTransportActive(); } virtual void transportInactive(Context* ctx) { ctx->fireTransportInactive(); } };
相应的,OutboundHandler类型定义为:
// outbound类型的Handler (默认写入类型和写出类型一致,若是不一致就会产生不少的转换) template <class Win, class Wout = Win> class OutboundHandler : public HandlerBase<OutboundHandlerContext<Wout>> { public: static const HandlerDir dir = HandlerDir::OUT; // 方向为输出 typedef folly::Unit rin; typedef folly::Unit rout; typedef Win win; typedef Wout wout; typedef OutboundHandlerContext<Wout> Context; virtual ~OutboundHandler() = default; // 纯虚函数。由子类实现 virtual folly::Future<folly::Unit> write(Context* ctx, Win msg) = 0; // 下面的默认实现都是事件的透传 virtual folly::Future<folly::Unit> writeException( Context* ctx, folly::exception_wrapper e) { return ctx->fireWriteException(std::move(e)); } virtual folly::Future<folly::Unit> close(Context* ctx) { return ctx->fireClose(); } };
前文所说,Handler全部的事件方法中只有read和write是纯虚接口,这样用户每次实现本身的Handler时都须要override这两个方法(即便只是完成简单的事件透传),所以,为了方便用户编写本身的Handler,Wangle提供了HandlerAdapter,HandlerAdapter其实很简单,就是以事件透传的方式重写(override)了read个write两个方法。代码以下:
// Handler适配器 template <class R, class W = R> class HandlerAdapter : public Handler<R, R, W, W> { public: typedef typename Handler<R, R, W, W>::Context Context; // 将read事件直接进行透传 void read(Context* ctx, R msg) override { ctx->fireRead(std::forward<R>(msg)); } // 将write事件直接进行透传 folly::Future<folly::Unit> write(Context* ctx, W msg) override { return ctx->fireWrite(std::forward<W>(msg)); } };
如前文所述,Pipeline中直接管理的并非Handler,而是Context,为了便于理解,此处再把Pipeline中的addBack源码列出来:
template <class H> PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) { typedef typename ContextType<H>::type Context;// 声明Conetxt类型,ContextImpl<Handler>、InboundContextImpl<Handler>、OutboundContextImpl<Handler>其中之一 // 使用Context包装Handler后,将其添加到pipeline中,Context中还持有pipeline的引用 return addHelper( std::make_shared<Context>(shared_from_this(), std::move(handler)), false);// false标识添加到尾部 }
其中,ContextType的定义以下,它会根据Handler的类型(具体来讲是方向)决定Context的类型,若是Handler是双向的,那么Context类型为ContextImpl<Handler>,若是Handler的方向为IN,那么Context类型为InboundContextImpl<Handler>,若是Handler的方向为OUT,那么Context类型为OutboundContextImpl<Handler>。
template <class Handler> struct ContextType { // template< bool B, class T, class F > // type T if B == true, F if B == false typedef typename std::conditional < Handler::dir == HandlerDir::BOTH, //若是是双向 ContextImpl<Handler>, //类型就是ContextImpl<Handler> typename std::conditional< //若是不是双向,那么还须要细分 Handler::dir == HandlerDir::IN, //若是是IN类型 InboundContextImpl<Handler>, //那么类型就是InboundContextImpl<Handler> OutboundContextImpl<Handler> //不然就是OutboundContextImpl<Handler> >::type >::type type; // Context类型 };
其实,InboundContextImpl和OutboundContextImpl都是ContextImpl的子类,ContextImpl的继承关系为:
template <class H> class ContextImpl : public HandlerContext<typename H::rout, typename H::wout>, public InboundLink<typename H::rin>, public OutboundLink<typename H::win>, public ContextImplBase<H, HandlerContext<typename H::rout, typename H::wout>>
能够看到,ContextImpl一个继承自四个父类:HandlerContext、InboundLink、OutboundLink和ContextImplBase,其中HandlerContext中主要定义了以fire开头的事件传递方法;InboundLink和OutboundLink分别定义了Handler中Inbound和Outbound类型的方法接口,还记得Pipeline中用于管理IN方向和OUT方向的两个链表:front_和back_,它们就分别是InboundLink和OutboundLink类型;ContextImplBase主要提供了Pipeline中Context在组装链表时的接口,好比:setNextIn、setNextOut,以及用于将Context绑定到handler上的attachPipeline方法。
首先来看HandlerContext基类:
// HandlerContext定义(集inbound和outbound类型于一身) // 以fire开始的方法都是Context中的事件方法 template <class In, class Out> class HandlerContext { public: virtual ~HandlerContext() = default; // inbound类型事件接口 virtual void fireRead(In msg) = 0; virtual void fireReadEOF() = 0; virtual void fireReadException(folly::exception_wrapper e) = 0; virtual void fireTransportActive() = 0; virtual void fireTransportInactive() = 0; // outbound类型事件接口 virtual folly::Future<folly::Unit> fireWrite(Out msg) = 0; virtual folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) = 0; virtual folly::Future<folly::Unit> fireClose() = 0; virtual PipelineBase* getPipeline() = 0; virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0; std::shared_ptr<folly::AsyncTransport> getTransport() { return getPipeline()->getTransport(); } virtual void setWriteFlags(folly::WriteFlags flags) = 0; virtual folly::WriteFlags getWriteFlags() = 0; virtual void setReadBufferSettings( uint64_t minAvailable, uint64_t allocationSize) = 0; virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0; };
HandlerContext主要定义了以fire开头的事件传播方法:fireRead、fireReadEOF、fireReadException、fireTransportActive、fireTransportInactive、fireWrite、fireWriteException、fireClose,以及getPipeline用于获取Context绑定的Pipeline、getPipelineShared以智能指针的形式获取Pipeline、getTransport用于获取Pipeline对应的Transport。
根据事件流向的不一样,Context也能够细分定义,InboundHandlerContext定义为:
// inbound 类型的InboundHandlerContext template <class In> class InboundHandlerContext { public: virtual ~InboundHandlerContext() = default; virtual void fireRead(In msg) = 0; virtual void fireReadEOF() = 0; virtual void fireReadException(folly::exception_wrapper e) = 0; virtual void fireTransportActive() = 0; virtual void fireTransportInactive() = 0; virtual PipelineBase* getPipeline() = 0; virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0; std::shared_ptr<folly::AsyncTransport> getTransport() { return getPipeline()->getTransport(); } };
同理,OutboundHandlerContext定义为:
// outbound 类型的OutboundHandlerContext template <class Out> class OutboundHandlerContext { public: virtual ~OutboundHandlerContext() = default; virtual folly::Future<folly::Unit> fireWrite(Out msg) = 0; virtual folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) = 0; virtual folly::Future<folly::Unit> fireClose() = 0; virtual PipelineBase* getPipeline() = 0; virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0; std::shared_ptr<folly::AsyncTransport> getTransport() { return getPipeline()->getTransport(); } };
如前文所述,PipelineContext主要定义了如何在Pipeline中组织Context链表的操做接口,好比setNextIn用于设置下一个IN类型的Context,setNextOut用来设置下一个OUT类型Context,具体定义以下:
class PipelineContext { public: virtual ~PipelineContext() = default; // 依附到一个pipeline中 virtual void attachPipeline() = 0; // 从pipeline中分离 virtual void detachPipeline() = 0; // 将一个HandlerContext绑定到handler上 template <class H, class HandlerContext> void attachContext(H* handler, HandlerContext* ctx) { // 只有第一次绑定的时候才会设置 if (++handler->attachCount_ == 1) { handler->ctx_ = ctx; } else { // 为什么在此设置的时候就为nullptr handler->ctx_ = nullptr; } } // 设置下一个inbound类型的Context virtual void setNextIn(PipelineContext* ctx) = 0; // 设置下一个outbound类型的Context virtual void setNextOut(PipelineContext* ctx) = 0; // 获取方向(Context方向依赖于Handler方向) virtual HandlerDir getDirection() = 0; };
ContextImplBase主要实现了PipelineContext接口方法,同时它的两个成员:nextIn_和nextOut_就是链表的指针,用来串联起整个Context。
template <class H, class Context> class ContextImplBase : public PipelineContext { public: ~ContextImplBase() = default; // 获取Context绑定的Handler H* getHandler() { return handler_.get(); } // Context初始化,参数为Context所属的Pipeline weak_ptr,Context要绑定的Handler shared_ptr void initialize(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) { pipelineWeak_ = pipeline; pipelineRaw_ = pipeline.lock().get();//裸指针 handler_ = std::move(handler); } // PipelineContext overrides void attachPipeline() override { // 若是该Context尚未被绑定 if (!attached_) { this->attachContext(handler_.get(), impl_);// 将该Context绑定到handler上 handler_->attachPipeline(impl_); // 调用Handler的attachPipeline,有具体的Handler实现 attached_ = true;//标记Context已经attached到一个pipeline中 } } // 从pipeline中分离 void detachPipeline() override { handler_->detachPipeline(impl_);// 调用Handler的detachPipeline,有具体的Handler实现 // 依附标志位为false attached_ = false; } void setNextIn(PipelineContext* ctx) override { if (!ctx) { nextIn_ = nullptr; return; } // 转成InboundLink,由于Context是InboundLink子类 auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx); if (nextIn) { nextIn_ = nextIn; } else { throw std::invalid_argument(folly::sformat( "inbound type mismatch after {}", folly::demangle(typeid(H)))); } } void setNextOut(PipelineContext* ctx) override { if (!ctx) { nextOut_ = nullptr; return; } auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx); if (nextOut) { nextOut_ = nextOut; } else { throw std::invalid_argument(folly::sformat( "outbound type mismatch after {}", folly::demangle(typeid(H)))); } } // 获取Context的方向 HandlerDir getDirection() override { return H::dir; } protected: Context* impl_; // 具体的Context实现 std::weak_ptr<PipelineBase> pipelineWeak_; // PipelineBase* pipelineRaw_; // 该Context绑定的pipeline std::shared_ptr<H> handler_; // 该Context包含的Handler InboundLink<typename H::rout>* nextIn_{nullptr}; // 下一个inbound类型的Context地址 OutboundLink<typename H::wout>* nextOut_{nullptr}; // 下一个outbound类型的Context地址 private: bool attached_{false}; // 这个Context是否已经被绑定 };
ContextImpl就是最终的Context实现,也就是要被添加到Pipeline中(好比使用addBack)的容器(ctxs_,inCtxs_,outCtxs_)的最终Context,在最后的finalize方法中还会进一步将容器中的Context组装成front_和back_单向链表。
ContextImpl的主要功能就是实现了各类事件传递方法(以fire开头的方法),以fireRead为例,这是一个IN类型的事件,因为Context中持有的Pipeline是一个weak类型的指针,所以先尝试lock,保证在事件传播阶段这个Pipeline不会销毁,而后会去调用下一个IN类型的Context的read方法。read方法是InboundLink中定义的接口(注意这里的read不是Handler中的也不是Pipeline中的),ContextImpl的也实现了这个read方法,它的功能很简单,首先仍是先lock住这个Pipeline,而后直接调用Context内部包含的Handler的read方法。
template <class H> class ContextImpl : public HandlerContext<typename H::rout, typename H::wout>, public InboundLink<typename H::rin>, public OutboundLink<typename H::win>, public ContextImplBase<H, HandlerContext<typename H::rout, typename H::wout>> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; typedef typename H::win Win; typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::BOTH; explicit ContextImpl(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) { this->impl_ = this;//实现就是本身 this->initialize(pipeline, std::move(handler));//初始化 } // For StaticPipeline ContextImpl() { this->impl_ = this; } ~ContextImpl() = default; // HandlerContext overrides // Inbound类型的事件:read事件 void fireRead(Rout msg) override { auto guard = this->pipelineWeak_.lock();// 锁住,确保一旦锁住成功,在操做期间,pipeline不会被销毁 // 若是尚未到最后 if (this->nextIn_) { // 将事件继续向下传播(传给下一个Inbound类型的Context) // 注意:这里调用的是下一个Contex的read而不是fireRead // 即调用下一个Context里面的Handler方法 this->nextIn_->read(std::forward<Rout>(msg)); } else { LOG(WARNING) << "read reached end of pipeline"; } } void fireReadEOF() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readEOF(); } else { LOG(WARNING) << "readEOF reached end of pipeline"; } } void fireReadException(folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readException(std::move(e)); } else { LOG(WARNING) << "readException reached end of pipeline"; } } void fireTransportActive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportActive(); } } void fireTransportInactive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportInactive(); } } //Outbound类型的事件传播 folly::Future<folly::Unit> fireWrite(Wout msg) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->write(std::forward<Wout>(msg)); } else { LOG(WARNING) << "write reached end of pipeline"; // 若是到了最后,返回一个future return folly::makeFuture(); } } folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->writeException(std::move(e)); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } folly::Future<folly::Unit> fireClose() override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->close(); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } // 获取Context绑定的pipeline指针 PipelineBase* getPipeline() override { return this->pipelineRaw_; } // 获取Context绑定的pipeline引用 std::shared_ptr<PipelineBase> getPipelineShared() override { return this->pipelineWeak_.lock(); } // 设置和获取wirte标志位 void setWriteFlags(folly::WriteFlags flags) override { this->pipelineRaw_->setWriteFlags(flags); } folly::WriteFlags getWriteFlags() override { return this->pipelineRaw_->getWriteFlags(); } // 设置read缓冲区参数 minAvailable、allocationSize void setReadBufferSettings( uint64_t minAvailable, uint64_t allocationSize) override { this->pipelineRaw_->setReadBufferSettings(minAvailable, allocationSize); } std::pair<uint64_t, uint64_t> getReadBufferSettings() override { return this->pipelineRaw_->getReadBufferSettings(); } // InboundLink overrides void read(Rin msg) override { // 保证pipeline不会被删除 auto guard = this->pipelineWeak_.lock(); // 调用该Context绑定的Handler的read方法,至于事件是都须要继续传播,彻底受read中的实现 this->handler_->read(this, std::forward<Rin>(msg)); } void readEOF() override { auto guard = this->pipelineWeak_.lock(); this->handler_->readEOF(this); } void readException(folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); this->handler_->readException(this, std::move(e)); } void transportActive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportActive(this); } void transportInactive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportInactive(this); } // OutboundLink overrides folly::Future<folly::Unit> write(Win msg) override { auto guard = this->pipelineWeak_.lock(); return this->handler_->write(this, std::forward<Win>(msg)); } folly::Future<folly::Unit> writeException( folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); return this->handler_->writeException(this, std::move(e)); } folly::Future<folly::Unit> close() override { auto guard = this->pipelineWeak_.lock(); return this->handler_->close(this); } };
一样,Context也能够根据传输方向进行细分,首先是InboundContextImpl:
template <class H> class InboundContextImpl : public InboundHandlerContext<typename H::rout>, public InboundLink<typename H::rin>, public ContextImplBase<H, InboundHandlerContext<typename H::rout>> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; typedef typename H::win Win; typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::IN; explicit InboundContextImpl( std::weak_ptr<PipelineBase> pipeline, std::shared_ptr<H> handler) { this->impl_ = this; this->initialize(pipeline, std::move(handler)); } // For StaticPipeline InboundContextImpl() { this->impl_ = this; } ~InboundContextImpl() = default; // InboundHandlerContext overrides void fireRead(Rout msg) override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->read(std::forward<Rout>(msg)); } else { LOG(WARNING) << "read reached end of pipeline"; } } void fireReadEOF() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readEOF(); } else { LOG(WARNING) << "readEOF reached end of pipeline"; } } void fireReadException(folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readException(std::move(e)); } else { LOG(WARNING) << "readException reached end of pipeline"; } } void fireTransportActive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportActive(); } } void fireTransportInactive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportInactive(); } } PipelineBase* getPipeline() override { return this->pipelineRaw_; } std::shared_ptr<PipelineBase> getPipelineShared() override { return this->pipelineWeak_.lock(); } // InboundLink overrides void read(Rin msg) override { auto guard = this->pipelineWeak_.lock(); this->handler_->read(this, std::forward<Rin>(msg)); } void readEOF() override { auto guard = this->pipelineWeak_.lock(); this->handler_->readEOF(this); } void readException(folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); this->handler_->readException(this, std::move(e)); } void transportActive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportActive(this); } void transportInactive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportInactive(this); } };
其次是OutboundContextImpl:
template <class H> class OutboundContextImpl : public OutboundHandlerContext<typename H::wout>, public OutboundLink<typename H::win>, public ContextImplBase<H, OutboundHandlerContext<typename H::wout>> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; typedef typename H::win Win; typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::OUT; explicit OutboundContextImpl( std::weak_ptr<PipelineBase> pipeline, std::shared_ptr<H> handler) { this->impl_ = this; this->initialize(pipeline, std::move(handler)); } // For StaticPipeline OutboundContextImpl() { this->impl_ = this; } ~OutboundContextImpl() = default; // OutboundHandlerContext overrides folly::Future<folly::Unit> fireWrite(Wout msg) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->write(std::forward<Wout>(msg)); } else { LOG(WARNING) << "write reached end of pipeline"; return folly::makeFuture(); } } folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->writeException(std::move(e)); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } folly::Future<folly::Unit> fireClose() override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->close(); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } PipelineBase* getPipeline() override { return this->pipelineRaw_; } std::shared_ptr<PipelineBase> getPipelineShared() override { return this->pipelineWeak_.lock(); } // OutboundLink overrides folly::Future<folly::Unit> write(Win msg) override { auto guard = this->pipelineWeak_.lock(); return this->handler_->write(this, std::forward<Win>(msg)); } folly::Future<folly::Unit> writeException( folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); return this->handler_->writeException(this, std::move(e)); } folly::Future<folly::Unit> close() override { auto guard = this->pipelineWeak_.lock(); return this->handler_->close(this); } };
按照惯例,仍是来一张图总结一下吧:
Wangle源码分析:EventBaseHandler、AsyncSocketHandler