欢迎你们前往腾讯云社区,获取更多腾讯海量技术实践干货哦~php
做者:LBD git
了解分布式系统的童鞋确定听过Paxos算法的大名。Paxos算法以晦涩难懂著称,其工程实现更难。目前,号称在工程上实现了Paxos算法的应该只有Google、阿里和腾讯。然而,只有腾讯的微信团队真正将代码开源出来,他们将Paxos算法的实现封装成了一个Paxos库,你们能够基于该库实现本身想要的功能,好比用于master选举,或者甚至利用它来实现一个分布式KV数据库等。github
以前就对Paxos很感兴趣,可是一直没看过实现的代码,此次微信开源了PhxPaxos后终于有机会深刻地了解Paxos的实现细节。在这里感谢微信团队。感谢PhxPaxos的做者。让咱们一块儿来领略Paxos的魅力吧。算法
本次的源码分析先从网络部分开始。由于在分布式系统中不可避免会涉及到不一样节点以及相同节点上不一样进程之间的通讯。所以网络部分也是相当重要,因此就先把网络单独拿出来看,接下来再去看Paxos算法的实现部分。数据库
源码的include/phxpaxos目录下是公共头文件。include/phpaxos/network.h 是网络模块的抽象函数,若是用户想使用本身的网络协议,能够经过重写这些函数实现网络模块的自定义。api
咱们先来看下network.h的内容:数组
namespace phxpaxos { //You can use your own network to make paxos communicate. :) class Node; class NetWork { public: NetWork(); virtual ~NetWork() {} //Network must not send/recieve any message before paxoslib called this funtion. virtual void RunNetWork() = 0; //If paxoslib call this function, network need to stop receive any message. virtual void StopNetWork() = 0; virtual int SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) = 0; virtual int SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) = 0; //When receive a message, call this funtion. //This funtion is async, just enqueue an return. int OnReceiveMessage(const char * pcMessage, const int iMessageLen); private: friend class Node; Node * m_poNode; }; }
这几个函数的做用从名字就能够看出来。并且都是虚函数,即须要重写这些函数。在PhxPaxos中,提供了一个默认的网络模块,就是继承了NetWork类。该类的名字叫DFNetWork,DF应该就是default的缩写了。以下:安全
namespace phxpaxos { class DFNetWork : public NetWork { public: DFNetWork(); virtual ~DFNetWork(); int Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount); void RunNetWork(); void StopNetWork(); int SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage); int SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage); private: UDPRecv m_oUDPRecv; UDPSend m_oUDPSend; TcpIOThread m_oTcpIOThread; }; }
该类的私有成员里有UDPRecv、UDPSend和TcpIOThread三个类的对象,这三个类分别用于接收UDP消息、发送UDP消息以及收发TCP消息。微信
Init方法就是将UDPRecv、UDPSend和TcpIOThread分别初始化:网络
int DFNetWork :: Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount) { //初始化UDPSend int ret = m_oUDPSend.Init(); if (ret != 0) { return ret; } //初始化UDPRecv ret = m_oUDPRecv.Init(iListenPort); if (ret != 0) { return ret; } //初始化TCP ret = m_oTcpIOThread.Init(sListenIp, iListenPort, iIOThreadCount); if (ret != 0) { PLErr("m_oTcpIOThread Init fail, ret %d", ret); return ret; } return 0; }
具体的初始化过程就是调用socket的api。以UDPRecv为例,就是建立socket、设定端口、设置socket属性(如端口可重用)最后绑定端口。以下:
int UDPRecv :: Init(const int iPort) { //建立socket,得到socket fd if ((m_iSockFD = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { return -1; } struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(iPort); //设定端口 addr.sin_addr.s_addr = htonl(INADDR_ANY); int enable = 1; //设定socket属性,端口可重用 setsockopt(m_iSockFD, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)); //绑定,用于监听 if (bind(m_iSockFD, (struct sockaddr *)&addr, sizeof(addr)) < 0) { return -1; } return 0; }
RunNetWork就是将UDPRecv、UDPSend和TcpIOThread分别运行起来:
void DFNetWork :: RunNetWork() { //UDPSend和UDPRecv都是调用Thread的start方法 m_oUDPSend.start(); m_oUDPRecv.start(); //TCP的Start是封装过的 m_oTcpIOThread.Start(); }
TcpIOThread的Start()实际执行的代码以下,分别启动了TcpAcceptor、TcpWrite和TcpRead:
void TcpIOThread :: Start() { m_oTcpAcceptor.start(); for (auto & poTcpWrite : m_vecTcpWrite) { poTcpWrite->start(); } for (auto & poTcpRead : m_vecTcpRead) { poTcpRead->start(); } m_bIsStarted = true; }
StopNetWork就是将UDPRecv、UDPSend和TcpIOThread中止。
SendMessageTCP就是将消息用TCP发送:
int DFNetWork :: SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) { return m_oTcpIOThread.AddMessage(iGroupIdx, sIp, iPort, sMessage); }
SendMessageUDP就是将消息用UDP发送:
int DFNetWork :: SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) { return m_oUDPSend.AddMessage(sIp, iPort, sMessage); }
前面SendMessageUDP调用了m_oUDPSend.AddMessage。这里的UDPSend维护了一个发送队列,以下:
Queue<QueueData *> m_oSendQueue;
m_oUDPSend.AddMessage就是将消息加入到UDP的m_oSendQueue中。
而后UDPSend在run方法中一直循环将m_oSendQueue中的消息发送出去:
void UDPSend :: run() { m_bIsStarted = true; while(true) { QueueData * poData = nullptr; //同步,线程安全 m_oSendQueue.lock(); bool bSucc = m_oSendQueue.peek(poData, 1000); if (bSucc) { //取出队头消息 m_oSendQueue.pop(); } m_oSendQueue.unlock(); if (poData != nullptr) { //将消息发送出去 SendMessage(poData->m_sIP, poData->m_iPort, poData->m_sMessage); delete poData; } if (m_bIsEnd) { PLHead("UDPSend [END]"); return; } } }
所以UDPSend就是把消息加入到消息队列,而后循环将消息队列里的消息发送出去。
接下来看看UDPRecv。UDPRecv的初始化前面已经看过了,就是简单的得到socket fd,设定sockaddr_in,设置socket属性最后将socket fd和sockaddr_in绑定用于监听。
主要来看看UDPRecv的run方法。这里主要用了I/O多路复用中的poll,注册了一个pollfd,该pollfd的fd即以前建立的绑定了端口的socket fd,events为POLLIN,表示监听数据可读事件,若是有数据可读了,则调用recvfrom读入数据。最后调用OnReceiveMessage将消息添加到当前instance的IoLoop中:
void UDPRecv :: run() { m_bIsStarted = true; char sBuffer[65536] = {0}; struct sockaddr_in addr; socklen_t addr_len = sizeof(struct sockaddr_in); memset(&addr, 0, sizeof(addr)); while(true) { if (m_bIsEnd) { PLHead("UDPRecv [END]"); return; } struct pollfd fd; int ret; fd.fd = m_iSockFD; //注册POLLIN事件 fd.events = POLLIN; //调用poll检查是否有数据可读 ret = poll(&fd, 1, 500); if (ret == 0 || ret == -1) { continue; } //将接收到的数据放入sBuffer中 int iRecvLen = recvfrom(m_iSockFD, sBuffer, sizeof(sBuffer), 0, (struct sockaddr *)&addr, &addr_len); BP->GetNetworkBP()->UDPReceive(iRecvLen); if (iRecvLen > 0) { //这里会依次调用Node和Instance的OnReceiveMessage方法,最后将消息加入到Instance的IoLoop中 m_poDFNetWork->OnReceiveMessage(sBuffer, iRecvLen); } } }
接下来看看收发TCP消息的TcpIOThread:
class TcpIOThread { public: TcpIOThread(NetWork * poNetWork); ~TcpIOThread(); //用于初始化TcpAcceptor以及iIOThreadCount个m_vecTcpRead和m_vecTcpWrite int Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount); //启动TcpAcceptor用于监听以及全部的m_vecTcpRead和m_vecTcpWrite用于读写消息 void Start(); //中止TcpAcceptor和全部的m_vecTcpRead及m_vecTcpWrite void Stop(); //将消息加入到特定TcpWrite的消息队列中 int AddMessage(const int iGroupIdx, const std::string & sIP, const int iPort, const std::string & sMessage); private: NetWork * m_poNetWork; TcpAcceptor m_oTcpAcceptor; std::vector<TcpRead *> m_vecTcpRead; std::vector<TcpWrite *> m_vecTcpWrite; bool m_bIsStarted; };
TcpRead相似于前面讲的UDPRecv,TcpWrite相似于于UDPSend。严格来说,TcpAcceptor + TcpRead才是UDPRecv。这里把TcpAcceptor单独抽出来,专门用于监听链接请求并创建链接。TcpRead只须要负责读消息就行。
咱们来看看TcpAcceptor:
class TcpAcceptor : public Thread { public: TcpAcceptor(); ~TcpAcceptor(); //监听端口 void Listen(const std::string & sListenIP, const int iListenPort); //一直while循环,监听链接事件并创建链接得到fd,而后添加事件到EventLoop中 void run(); void Stop(); void AddEventLoop(EventLoop * poEventLoop); void AddEvent(int iFD, SocketAddress oAddr); private: //服务端的socket,用于监听 ServerSocket m_oSocket; std::vector<EventLoop *> m_vecEventLoop; private: bool m_bIsEnd; bool m_bIsStarted; };
这里主要来看下run方法:
void TcpAcceptor :: run() { m_bIsStarted = true; PLHead("start accept..."); m_oSocket.setAcceptTimeout(500); m_oSocket.setNonBlocking(true); while (true) { struct pollfd pfd; int ret; pfd.fd = m_oSocket.getSocketHandle(); //注册事件 pfd.events = POLLIN; //等待事件到来 ret = poll(&pfd, 1, 500); if (ret != 0 && ret != -1) { SocketAddress oAddr; int fd = -1; try { //创建链接,得到fd。这里的acceptfd对accept进行了简单的封装 fd = m_oSocket.acceptfd(&oAddr); } catch(...) { fd = -1; } if (fd >= 0) { BP->GetNetworkBP()->TcpAcceptFd(); PLImp("accepted!, fd %d ip %s port %d", fd, oAddr.getHost().c_str(), oAddr.getPort()); //添加事件 AddEvent(fd, oAddr); } } if (m_bIsEnd) { PLHead("TCP.Acceptor [END]"); return; } } }
再看看AddEvent方法:
void TcpAcceptor :: AddEvent(int iFD, SocketAddress oAddr) { EventLoop * poMinActiveEventLoop = nullptr; int iMinActiveEventCount = 1 << 30; for (auto & poEventLoop : m_vecEventLoop) { int iActiveCount = poEventLoop->GetActiveEventCount(); if (iActiveCount < iMinActiveEventCount) { iMinActiveEventCount = iActiveCount; poMinActiveEventLoop = poEventLoop; } } oAddr.getPort()); poMinActiveEventLoop->AddEvent(iFD, oAddr); }
即找到活跃数最少的EventLoop,将事件添加到该EventLoop中。这里应该是为了负载均衡,防止有些线程工做量很大,有些则很空闲。
具体EventLoop的AddEvent就是将事件加入到FDQueue中,以下:
void EventLoop :: AddEvent(int iFD, SocketAddress oAddr) { std::lock_guard<std::mutex> oLockGuard(m_oMutex); m_oFDQueue.push(make_pair(iFD, oAddr)); }
到这里TcpAcceptor的做用及实现基本就很清晰了。
先来看看TcpRead类的定义:
class TcpRead : public Thread { public: TcpRead(NetWork * poNetWork); ~TcpRead(); int Init(); void run(); void Stop(); EventLoop * GetEventLoop(); private: EventLoop m_oEventLoop; };
这里的成员变量是一个EventLoop对象。经过源码发现,Init、run、Stop方法其实都是调用了m_oEventLoop相应的方法,以下:
int TcpRead :: Init() { return m_oEventLoop.Init(20480); } void TcpRead :: run() { m_oEventLoop.StartLoop(); } void TcpRead :: Stop() { m_oEventLoop.Stop(); join(); PLHead("TcpReadThread [END]"); }
所以主要来看下EventLoop。
首先说下Event。PhxPaxos在TCP这块主要用了I/O多路复用中的epoll。这里主要将数据和通知等都封装成Event,而后由TcpWrite和TcpRead的EventLoop去执行。PhxPaxos中的Event包含两个子类,分别是MessageEvent和Notify。其中MessageEvent主要用于数据的读写;而Notify主要用于通知事件发生。这里的Notify基于管道pipe和EPOLLIN事件来实现,能够经过Notify的Init方法看出:
int Notify :: Init() { //m_iPipeFD是一个长度为2的int数组,用于存放管道两端的socket fd int ret = pipe(m_iPipeFD); if (ret != 0) { PLErr("create pipe fail, ret %d", ret); return ret; } fcntl(m_iPipeFD[0], F_SETFL, O_NONBLOCK); fcntl(m_iPipeFD[1], F_SETFL, O_NONBLOCK); AddEvent(EPOLLIN); return 0; }
继续回到EventLoop。首先看下EventLoop的Init方法:
int EventLoop :: Init(const int iEpollLength) { //建立epoll句柄,iEpollLength为监听的fd数 m_iEpollFd = epoll_create(iEpollLength); if (m_iEpollFd == -1) { PLErr("epoll_create fail, ret %d", m_iEpollFd); return -1; } m_poNotify = new Notify(this); assert(m_poNotify != nullptr); //初始化Notify:建立pipe,设置m_iPipeFD并添加EPOLLIN事件 int ret = m_poNotify->Init(); if (ret != 0) { return ret; } return 0; }
接着来看下最重要的StartLoop:
void EventLoop :: StartLoop() { m_bIsEnd = false; while(true) { BP->GetNetworkBP()->TcpEpollLoop(); int iNextTimeout = 1000; DealwithTimeout(iNextTimeout); //PLHead("nexttimeout %d", iNextTimeout); OneLoop(iNextTimeout); CreateEvent(); if (m_poTcpClient != nullptr) { m_poTcpClient->DealWithWrite(); } if (m_bIsEnd) { PLHead("TCP.EventLoop [END]"); break; } } }
主循环是OneLoop:
void EventLoop :: OneLoop(const int iTimeoutMs) { //调用epoll_wait等待事件发生 int n = epoll_wait(m_iEpollFd, m_EpollEvents, MAX_EVENTS, 1); if (n == -1) { if (errno != EINTR) { PLErr("epoll_wait fail, errno %d", errno); return; } } //逐一处理发生的epoll事件 for (int i = 0; i < n; i++) { int iFd = m_EpollEvents[i].data.fd; auto it = m_mapEvent.find(iFd); if (it == end(m_mapEvent)) { continue; } int iEvents = m_EpollEvents[i].events; Event * poEvent = it->second.m_poEvent; int ret = 0; if (iEvents & EPOLLERR) { OnError(iEvents, poEvent); continue; } try { //若是是EPOLLIN事件,代表由数据可读,则调用poEvent的OnRead方法处理 if (iEvents & EPOLLIN) { ret = poEvent->OnRead(); } //若是是EPOLLOUT事件,代表由数据可写,则调用poEvent的OnWrite方法处理 if (iEvents & EPOLLOUT) { ret = poEvent->OnWrite(); } } catch (...) { ret = -1; } if (ret != 0) { OnError(iEvents, poEvent); } } }
其余具体的细节这里就再也不赘述了,有兴趣的能够本身去看看源码。
看完了TcpRead,再来看看TcpWrite。首先仍是看它的定义:
class TcpWrite : public Thread { public: TcpWrite(NetWork * poNetWork); ~TcpWrite(); int Init(); void run(); void Stop(); int AddMessage(const std::string & sIP, const int iPort, const std::string & sMessage); private: TcpClient m_oTcpClient; EventLoop m_oEventLoop; };
Init、run、Stop跟TcpRead中对应方法的做用一致。AddMessage则是调用了m_oTcpClient的AddMessage方法。发现TcpWrite的成员变量比TcpRead多了一个TcpClient对象,所以主要来看看这个TcpClient是干吗的。
刚刚说TcpWrite的AddMessage调用了m_oTcpClient的AddMessage方法。在m_oTcpClient的AddMessage方法中,则是先建立了一个指向MessageEvent对象的指针poEvent,而后再调用poEvent的AddMessage方法:
int TcpClient :: AddMessage(const std::string & sIP, const int iPort, const std::string & sMessage) { //PLImp("ok"); MessageEvent * poEvent = GetEvent(sIP, iPort); if (poEvent == nullptr) { PLErr("no event created for this ip %s port %d", sIP.c_str(), iPort); return -1; } return poEvent->AddMessage(sMessage); }
所以继续看看MessageEvent的AddMessage方法:
int MessageEvent :: AddMessage(const std::string & sMessage) { m_llLastActiveTime = Time::GetSteadyClockMS(); std::unique_lock<std::mutex> oLock(m_oMutex); if ((int)m_oInQueue.size() > TCP_QUEUE_MAXLEN) { BP->GetNetworkBP()->TcpQueueFull(); //PLErr("queue length %d too long, can't enqueue", m_oInQueue.size()); return -2; } if (m_iQueueMemSize > MAX_QUEUE_MEM_SIZE) { //PLErr("queue memsize %d too large, can't enqueue", m_iQueueMemSize); return -2; } QueueData tData; //将消息封装成QueueData后放入队列 tData.llEnqueueAbsTime = Time::GetSteadyClockMS(); tData.psValue = new string(sMessage); m_oInQueue.push(tData); m_iQueueMemSize += sMessage.size(); oLock.unlock(); //退出EpollWait,实际是调用SendNotify发送了一个通知 JumpoutEpollWait(); return 0; }
能够看到这里将消息加上入队时间后封装成一个QueueDate,而后放入m_oInQueue队列中。最后调用EventLoop的SendNotify发送了一个通知(利用以前建立的pipe)退出EpollWait。
说完了消息怎么入队,那消息是怎么发送出去的呢?
这里主要涉及到MessageEvent的OnWrite函数:
int MessageEvent :: OnWrite() { int ret = 0; //只要发送队列不为空或者还有上次未发送完的数据,就调用DoOnWrite执行真正的发送操做 while (!m_oInQueue.empty() || m_iLeftWriteLen > 0) { ret = DoOnWrite(); if (ret != 0 && ret != 1) { return ret; } else if (ret == 1) { //need break, wait next write return 0; } } WriteDone(); return 0; }
DoOnWrite:
int MessageEvent :: DoOnWrite() { //上一次的消息还未发送完毕,将剩下的发送完 if (m_iLeftWriteLen > 0) { return WriteLeft(); } m_oMutex.lock(); if (m_oInQueue.empty()) { m_oMutex.unlock(); return 0; } //从队列中取出一条新消息,准备发送 QueueData tData = m_oInQueue.front(); m_oInQueue.pop(); m_iQueueMemSize -= tData.psValue->size(); m_oMutex.unlock(); std::string * poMessage = tData.psValue; //若是该消息入队过久没有被处理,则抛弃,不发送 uint64_t llNowTime = Time::GetSteadyClockMS(); int iDelayMs = llNowTime > tData.llEnqueueAbsTime ? (int)(llNowTime - tData.llEnqueueAbsTime) : 0; BP->GetNetworkBP()->TcpOutQueue(iDelayMs); if (iDelayMs > TCP_OUTQUEUE_DROP_TIMEMS) { //PLErr("drop request because enqueue timeout, nowtime %lu unqueuetime %lu", //llNowTime, tData.llEnqueueAbsTime); delete poMessage; return 0; } //计算发送缓冲区长度,须要加上4字节用于表示消息长度 int iBuffLen = poMessage->size(); int niBuffLen = htonl(iBuffLen + 4); int iLen = iBuffLen + 4; //申请缓冲区 m_oWriteCacheBuffer.Ready(iLen); //将消息长度及消息内容拷贝到缓冲区 memcpy(m_oWriteCacheBuffer.GetPtr(), &niBuffLen, 4); memcpy(m_oWriteCacheBuffer.GetPtr() + 4, poMessage->c_str(), iBuffLen); m_iLeftWriteLen = iLen; m_iLastWritePos = 0; delete poMessage; //PLImp("write len %d ip %s port %d", iLen, m_oAddr.getHost().c_str(), m_oAddr.getPort()); //开始发送消息,有可能消息太大一次发送不完 int iWriteLen = m_oSocket.send(m_oWriteCacheBuffer.GetPtr(), iLen); if (iWriteLen < 0) { PLErr("fail, write len %d ip %s port %d", iWriteLen, m_oAddr.getHost().c_str(), m_oAddr.getPort()); return -1; } //须要下次再发送 if (iWriteLen == 0) { //need wait next write AddEvent(EPOLLOUT); return 1; } //PLImp("real write len %d", iWriteLen); //发送成功 if (iWriteLen == iLen) { m_iLeftWriteLen = 0; m_iLastWritePos = 0; //write done } //没有一次性所有发送完,剩下的须要下次发送 else if (iWriteLen < iLen) { //m_iLastWritePos和m_iLeftWriteLen分别用来表示上次写的位置以及剩下须要发送的长度 m_iLastWritePos = iWriteLen; m_iLeftWriteLen = iLen - iWriteLen; PLImp("write buflen %d smaller than expectlen %d", iWriteLen, iLen); } else { PLErr("write buflen %d large than expectlen %d", iWriteLen, iLen); } return 0; }
先介绍这么多吧,接下去会有更多相关的文章,特别是PhxPaxos中实现Paxos算法的那部分,相信看过Paxos相关论文的童鞋会对这块很感兴趣。
最后,附上PhxPaxos源码的地址:https://github.com/Tencent/phxpaxos
可进入个人博客查看原文
欢迎关注公众号: FullStackPlan 获取更多干货
基于腾讯开源 Angel 的 LDA* 入选国际顶级学术会议 VLDB
此文已由做者受权腾讯云技术社区发布,转载请注明原文出处
原文连接:https://cloud.tencent.com/community/article/363266