负载均衡通信转发分发器(G5)源代码分析

负载均衡通信转发分发器(G5)源代码分析 (以版本v1.1.0为准)     G5源代码文件只有.c(2400行)和.h(260行)两个源文件,行数虽然很少,可是技术密集度较高,分析源码主要从基于epoll(ET)事件处理应用层框架和转发会话结构管理两方面入手。     先来看数据结构。主要的数据结构有服务器环境大结构,里面包含了全部参数配置和运行时状态数据,是全部内部函数传递的第一个参数。 [code=c] /* 服务器环境大结构 */ struct ServerEnv {     struct CommandParam        cmd_para ;          struct ForwardRule        *forward_rule ;     unsigned long            forward_rule_count ;          int                event_env ;     struct ForwardSession        *forward_session ;     unsigned long            forward_session_maxcount ;     unsigned long            forward_session_count ;     unsigned long            forward_session_use_offsetpos ;          struct ServerCache        server_cache ; } ; [/code]     包含的众多成员中最应关注的是转发规则结构和转发会话结构 [code=c] /* 转发规则结构 */ struct ForwardRule {     char                rule_id[ RULE_ID_MAXLEN + 1 ] ;     char                rule_mode[ RULE_MODE_MAXLEN + 1 ] ;          unsigned long            timeout ;          struct ClientNetAddress        client_addr[ RULE_CLIENT_MAXCOUNT ] ;     unsigned long            client_count ;          struct ForwardNetAddress    forward_addr[ RULE_FORWARD_MAXCOUNT ] ;     unsigned long            forward_count ;          struct ServerNetAddress        server_addr[ RULE_SERVER_MAXCOUNT ] ;     unsigned long            server_count ;     unsigned long            select_index ;          union     {         struct         {             unsigned long    server_unable ;         } RR[ RULE_SERVER_MAXCOUNT ] ;         struct         {             unsigned long    server_unable ;         } LC[ RULE_SERVER_MAXCOUNT ] ;         struct         {             unsigned long    server_unable ;             struct timeval    tv1 ;             struct timeval    tv2 ;             struct timeval    dtv ;         } RT[ RULE_SERVER_MAXCOUNT ] ;     } status ; } ; [/code]     转发规则结构里的数据主要来源于配置文件,还有一些是由配置处理获得。     最后一块数据status则是用于负载均衡的内部状态跟踪。 [code=c] /* 转发会话结构 */ struct ForwardSession {     char                forward_session_type ;          struct ClientNetAddress        client_addr ;     struct ListenNetAddress        listen_addr ;     struct ServerNetAddress        server_addr ;     unsigned long            client_session_index ;     unsigned long            server_session_index ;          struct ForwardRule        *p_forward_rule ;     unsigned long            client_index ;          char                connect_status ;     unsigned long            try_connect_count ;          unsigned long            active_timestamp ;          char                io_buffer[ IO_BUFSIZE + 1 ] ;     long                io_buflen ; } ; [/code]     转发会话结构在转发链接创建时建立,链接断开时销毁。     一个数据传输方向对应一个会话,通常一个链接请求对应两段TCP链接也对应两个会话结构。成员forward_session_type表示是客户端到服务端数据传输方向仍是服务端到客户端数据传输方向。获得一个会话结构能够经过成员client_session_index和server_session_index获得另外一个会话结构。     成员p_forward_rule指向该会话使用的转发规则结构。     成员client_addr、listen_addr和分别对应客户端、转发端(本地侦听端)和服务端信息。     成员connect_status用于跟踪异步链接模式下的创建状态。     成员io_buffer用于会话数据传输时的缓冲区,便于异步处理。          了解了几个主要的数据结构后咱们开始分析源代码。          main函数主要解析命令行参数和显示语法,调用G5函数          G5函数主要是建立epoll池、装载配置文件、调用服务器主工做循环(ServerLoop函数)、销毁epoll池     pse->event_env = epoll_create( pse->forward_session_maxcount ) ;     nret = LoadConfig( pse ) ;     nret = ServerLoop( pse ) ;     close( pse->event_env );          ServerLoop函数负责服务器主循环,即批量等待epoll事件,有epoll事件发生时,处理之     sock_count = epoll_wait( pse->event_env , events , WAIT_EVENTS_COUNT , 1000 ) ;         若是是侦听端口事件         if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_LISTEN )             管理端口事件,调用函数AcceptManageSocket             if( strcmp( p_forward_session->listen_addr.rule_mode , FORWARD_RULE_MODE_G ) == 0 )                 nret = AcceptManageSocket( pse , p_event , p_forward_session ) ;             转发端口事件,调用函数AcceptForwardSocket             else                 nret = AcceptForwardSocket( pse , p_event , p_forward_session ) ;         若是是输入事件         else if( p_event->events & EPOLLIN )             管理端口输入事件,调用函数ReceiveOrProcessManageData             if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_MANAGE )                 nret = ReceiveOrProcessManageData( pse , p_event , p_forward_session ) ;             转发端口输入事件,调用函数TransferSocketData             else if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_CLIENT || p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_SERVER )                 nret = TransferSocketData( pse , p_event , p_forward_session ) ;         若是是输出事件         else if( p_event->events & EPOLLOUT )             异步链接创建响应事件,调用函数SetSocketConnected             if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_SERVER && p_forward_session->connect_status == CONNECT_STATUS_CONNECTING )                 nret = SetSocketConnected( pse , p_event , p_forward_session ) ;             异步发送sock可写事件,调用函数ContinueToWriteSocketData             else if( p_forward_session->connect_status == CONNECT_STATUS_CONNECTED )                 nret = ContinueToWriteSocketData( pse , p_event , p_forward_session ) ;         若是是错误事件         else if( p_event->events & EPOLLERR )             调用函数ResolveSocketError             nret = ResolveSocketError( pse , p_event , p_forward_session ) ;          函数AcceptForwardSocket用于在有客户端接入时,查询转发规则,转连到对应服务端上         循环接受转发端口链接,epoll(ET)边缘触发时必须把一口气把全部事件都处理掉,包含客户端接入事件             接受客户端接入             client_addr.sock = accept( p_forward_session->listen_addr.sock , (struct sockaddr *) & (client_addr.netaddr.sockaddr) , & addr_len ) ;             查询转发规则             nret = MatchForwardRule( pse , & client_addr , & (p_forward_session->listen_addr) , & p_forward_rule , & client_index ) ;             链接服务端(函数ConnectToRemote)             nret = ConnectToRemote( pse , p_event , p_forward_session , p_forward_rule , client_index , & client_addr , TRY_CONNECT_MAXCOUNT ) ;          函数ConnectToRemote用于有客户端接入时转连服务端         建立转连的本地客户端sock         server_addr.sock = socket( AF_INET , SOCK_STREAM , IPPROTO_TCP );         设置非堵塞模式         SetNonBlocking( server_addr.sock );         根据转发规则,选择目标网络地址(函数SelectServerAddress)。若是有多个服务端地址的话按负载均衡算法选择         nret = SelectServerAddress( pse , p_client_addr , p_forward_rule , server_addr.netaddr.ip , server_addr.netaddr.port ) ;         链接目标网络地址         nret = connect( server_addr.sock , ( struct sockaddr *) & (server_addr.netaddr.sockaddr) , addr_len );         若是链接创建中         if( nret < 0 ) if( errno != EINPROGRESS )             登记服务端转发会话到会话池中(会话链接状态为正在链接中),登记sock到epoll池             epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_server->server_addr.sock , & server_event );         若是链接创建完成(本地链接本身时大几率发生)             登记客户端转发会话到会话池中(会话链接状态为链接成功),登记sock到epoll池             epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_client->client_addr.sock , & client_event );             登记服务端转发会话到会话池中(会话链接状态为链接成功),登记sock到epoll池             epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_server->server_addr.sock , & server_event );          函数SetSocketConnected用于以前正在链接服务端的会话当链接成功事件发生时处理         登记客户端转发会话到会话池中(会话链接状态为链接成功),登记sock到epoll池         p_forward_session_client->connect_status = CONNECT_STATUS_CONNECTED ;         epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_client->client_addr.sock , & client_event );         更新服务端转发会话链接状态链接成功         p_forward_session_server->connect_status = CONNECT_STATUS_CONNECTED ;         epoll_ctl( pse->event_env , EPOLL_CTL_MOD , p_forward_session_server->server_addr.sock , & server_event );         (等链接创建后,一对会话进入等待数据输入事件EPOLLIN)          函数TransferSocketData用于从一个准备好数据输入的会话中接收数据,并转发给服务端         循环从客户端sock中接收数据,epoll(ET)边缘触发时必须把一口气把全部事件都处理掉             接收通信数据             p_out_forward_session->io_buflen = recv( in_sock , p_out_forward_session->io_buffer , IO_BUFSIZE , 0 ) ;             若是没有接收数据了,跳出循环             if( p_out_forward_session->io_buflen < 0 ) if( errno == EAGAIN )             若是接收失败,关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );             SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );             若是接收到输入端断开链接事件,关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );             SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );             循环发送数据到服务端sock                 发送通信数据                 len = send( out_sock , p_out_forward_session->io_buffer , p_out_forward_session->io_buflen , 0 ) ;                 若是底层发送缓冲区满了,则暂停客户端sock输入事件EPOLLIN监控,启用服务端sock输出事件EPOLLOUT监控,跳出循环发送                 if( len < 0 ) if( errno == EAGAIN )                     epoll_ctl( pse->event_env , EPOLL_CTL_MOD , in_sock , & in_event );                     epoll_ctl( pse->event_env , EPOLL_CTL_MOD , out_sock , & out_event );                 若是发送出错,关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话                 epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );                 epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );                 SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );                 若是发送完了,跳出循环                 else if( len == p_out_forward_session->io_buflen )                     break;                 不然继续发送未发送数据                 else                     p_out_forward_session->io_buflen -= len ;                     memmove( p_out_forward_session->io_buffer , p_out_forward_session->io_buffer + len , p_out_forward_session->io_buflen );          函数ContinueToWriteSocketData用于处理以前底层发送缓冲区满了引起的异步发送机制         循环继续发送未发送数据到服务端sock             发送通信数据             len = send( out_sock , p_out_forward_session->io_buffer , p_out_forward_session->io_buflen , 0 ) ;             若是底层发送缓冲区满了,跳出循环发送             if( len < 0 ) if( errno == EAGAIN )                 break;             若是发送出错,关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话                 epoll_ctl( pse->event_env , EPOLL_CTL_DEL , in_sock , NULL );                 epoll_ctl( pse->event_env , EPOLL_CTL_DEL , out_sock , NULL );                 SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );             若是发送完了,恢复客户端sock输入事件EPOLLIN监控,暂停服务端sock输出事件EPOLLOUT监控,跳出循环             else if( len == p_out_forward_session->io_buflen )                 epoll_ctl( pse->event_env , EPOLL_CTL_MOD , in_sock , & in_event );                 epoll_ctl( pse->event_env , EPOLL_CTL_MOD , out_sock , & out_event );                 break;             不然继续发送             else                 p_out_forward_session->io_buflen -= len ;                 memmove( p_out_forward_session->io_buffer , p_out_forward_session->io_buffer + len , p_out_forward_session->io_buflen );          函数ResolveSocketError用于全部sock出错时,关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话         若是是异步链接事件         if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_SERVER && p_forward_session->connect_status == CONNECT_STATUS_CONNECTING )             设置以前选择目标服务器不可用状态             nret = OnServerUnable( pse , p_forward_session->p_forward_rule ) ;             选择新目标服务器,链接之             nret = ConnectToRemote( pse , p_event , p_forward_session , p_forward_session->p_forward_rule , p_forward_session->client_index , & (p_forward_session->client_addr) , --p_forward_session->try_connect_count ) ;             清理以前临时建立的服务端会话结构             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );             SetForwardSessionUnitUnused( p_forward_session );         若是是数据交换报错事件             关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );             SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );          epoll(ET)事件处理应用层框架源代码主线分析完成,但愿经过分析其源代码,能够帮助读者理解其设计思路和代码结构,审核做者编码缺陷,提升使用准确性,也有助于读者在许可证容许条件下fork本身的版本。     若有意见或建议欢迎及时联系我     开源项目首页 : http://git.oschina.net/calvinwilliams/G5     做者邮箱 : calvinwilliams.c@gmail.com
相关文章
相关标签/搜索