使用CBrother作TCP服务器与C++客户端通讯

使用CBrother脚本作TCP服务器与C++客户端通讯python

  工做中老是会遇到一些对于服务器压力不是特别大,可是代码量比较多,用C++写起来很不方便。对于这种需求,我选择用CBrother脚本作服务器,之因此不选择Python是由于python的语法我实在是适应不了,再来CBrother的网络框架也是用C++封装的异步IO,性能仍是颇有保证的。c++

  废话很少说,先来看下服务器代码,我这里只是记录一个例子,不是所有代码,方便后面作项目的时候直接来本身博客复制代码修改。数据库

  

 1 import CBSocket.code    //加载Socket扩展
 2 
 3 var g_tcpModule = null;    //全局保存TCP模块对象
 4 
 5 const MSG_TYPE_USER = 1;    //客户端发来的消息
 6 const MSG_TYPE_CLOSE = 2;    //socket断线的消息
 7 
 8 const LOGIC_MSG_LOGIN = 1;        //登录消息全局定义
 9 const LOGIC_MSG_GETID = 2;        //获取ID消息全局定义
10 
11 function main(a)    //入口函数,cbrother从这里开始执行
12 { 13     var thread = new Thread();        //启动一个数据管理线程,串行处理全局数据,能够根据不一样业务启动多个
14     thread.setThreadAction(new ThreadAction()); 15  thread.start(); 16     
17     g_tcpModule = new TcpModule();    //启动一个TCP模块
18     var tcpAction = new TcpAction(); 19     tcpAction.thread = thread; 20     g_tcpModule.setTcpAction(tcpAction);    //设置TCP的处理类为TcpAction
21     g_tcpModule.addListenPort(6061,"0.0.0.0");    //监听6061端口
22  g_tcpModule.start(); 23     
24     print "tcpServer start!"; 25     
26     while(1) 27  { 28         Sleep(1000); 29  } 30 }

 

  TcpAction主要处理tcpmodule的消息回调数组

 1 class SocketBuf    //这个类会给每一个socket建立一个,用来拼包,由于tcp在传输过程当中并不保证每次收到都是整包数据
 2 {  3     var _byteArray = new ByteArray(1024 * 10);    //每一个socket预留10K的缓冲
 4     
 5     function SocketBuf()  6  {  7         _byteArray.setLittleEndian(true);        //c++编码为低位编址(LE) 若是要高位编码c++里面能够htonl
 8  }  9     
 10     function PushData(bytes,len)  11  {  12         print "pushdata " + len;  13         if(!_byteArray.writeBytes(bytes,len))  14  {  15             print "socket buf is full!";  16             return false;  17  }  18         
 19         return true;  20  }  21     
 22     function CheckPack()  23  {  24         print "begin CheckPack!";  25         var writePos = _byteArray.getWritePos();  26         print "checkpack " + writePos;  27         if(writePos < 4)    //前4个字节表示包的长度
 28  {  29             print "CheckPack null < 4";  30             return null;  31  }  32         
 33         var msglen = _byteArray.readInt();  34         print "checkpack " + msglen;  35         if(writePos < msglen + 4)    //缓存里的数据还不够一包数据,继续等待数据
 36  {  37             print "CheckPack null writePos < msglen + 4";  38             _byteArray.setReadPos(0);  39             return null;  40  }  41         
 42         //够一包数据了,取出数据,到另外一个线程里去处理
 43         var newBytes = _byteArray.readBytes(msglen);  44         newBytes.setLittleEndian(true);  45         
 46         var readPos = _byteArray.getReadPos();  47         
 48         _byteArray.copy(_byteArray,0,readPos,writePos - readPos);  49         _byteArray.setReadPos(0);  50         _byteArray.setWritePos(writePos - readPos);  51         print "writePos:" + writePos;  52         print "readPos:" + readPos;  53         //XORCode(newBytes); //异或解密一下,这个为了安全性考虑。我在另外一篇博客里专门写这个函数与C++加密函数的对应关系
 54         return newBytes;  55  }  56 }  57 
 58 class TcpAction  59 {  60     var thread;    //这个是逻辑线程,这个例子里我只启动一个逻辑线程
 61     var sockMap = new Map();    //保存每一个socket的消息缓冲,拼包用
 62     var lock = new Lock();        //sockMap的锁
 63     
 64     function OnAccept(sock)  65  {  66         print "accept " + sock + " " + g_tcpModule.getRemoteIP(sock);  67         
 68         //监听到客户端链接,管理起来
 69  lock.lock();  70         var socketbuf = new SocketBuf();  71  sockMap.add(sock,socketbuf);  72  lock.unlock();  73  }  74     
 75     function OnClose(sock)  76  {  77         print "onclose " + sock;  78         
 79         //断线了,移除掉,并通知逻辑线程
 80  lock.lock();  81  sockMap.remove(sock);  82  lock.unlock();  83         
 84         var newmsg = new ThreadMsg(sock,null);  85         newmsg.type = MSG_TYPE_CLOSE;  86  thread.addMsg(newmsg);  87  }  88     
 89     function OnRecv(sock,byteArray,len)  90  {  91         print "onrecv " + sock + " len:" + len;  92         
 93         //收到数据获取socket缓冲
 94  lock.lock();  95         var socketbuf = sockMap.get(sock);  96  lock.unlock();  97         
 98         if(socketbuf == null)  99  { 100             return;    //应该是被关掉了
101  } 102         
103         if(!socketbuf.PushData(byteArray,len))    //数据压进去
104  { 105  g_tcpModule.closeSocket(sock); 106             return;//buf满了都解不开包,说明数据有问题,关了它
107  } 108         
109         //把包解出来丢到逻辑线程去处理,循环是由于怕buf里同时又好几包数据
110         var newBytes = socketbuf.CheckPack(); 111         while(newBytes != null) 112  { 113             thread.addMsg(new ThreadMsg(sock,newBytes)); 114             newBytes = socketbuf.CheckPack(); 115  } 116  } 117 }

  最后是逻辑线程里,其实上面的代码写好了基本上就不动了,后续添加消息号增长处理都是在逻辑线程里去,因此上面的代码能够封装一下,我这里是为了本身好记忆,因此就这样写了。缓存

 1 //这个类是线程消息类,能够理解为一个结构体
 2 class ThreadMsg  3 {  4     function ThreadMsg(id,bytes)  5  {  6         socketid = id;  7         byteArray = bytes;  8         type = MSG_TYPE_USER;  9  }  10     
 11     var socketid;  12     var type;  13     var byteArray;  14 }  15 
 16 class ThreadAction  17 {  18     var _userMap = new Map();        //用户名索引用户信息
 19     var _socketMsp = new Map();        //Socket索引用户信息
 20     
 21     var _funcMap = new Map();        //消息对应的处理函数
 22     
 23     function onInit()  24  {  25         print "thread init" ;  26         
 27         //线程启动时读取数据库数据 由于篇幅问题不写加载数据库的部分了,固定插入两条数据作模拟数据
 28         //LoadDB();
 29         _userMap.add("zhangsan",new UserData("zhangsan","123123",1));  30         _userMap.add("lisi",new UserData("lisi","321321",2));  31         
 32         _funcMap.add(LOGIC_MSG_LOGIN,onLogin);        //注册消息号的处理函数
 33  _funcMap.add(LOGIC_MSG_GETID,onGetID);  34  }  35     
 36     function onMsg(msg)  37  {  38         switch(msg.type)  39  {  40             case MSG_TYPE_CLOSE:  41  {  42                 //断线消息
 43                 print "MSG_TYPE_CLOSE";  44  OnClose(msg.socketid);  45                 break;  46  }  47             case MSG_TYPE_USER:  48  {  49                 //客户端发来的消息,客户端发来的数据结构都是从struct MsgBase派生,因此前4个字节就是struct MsgBase的msgid
 50                 var msgid = msg.byteArray.readInt();  51                 var func = _funcMap.get(msgid);  52                 print "MSG_TYPE_USER" + msgid;  53                 if(func != null)  54  {  55  func.invoke(msg.socketid,msg.byteArray);  56  }  57                 break;  58  }  59  }  60  }  61     
 62     function onEnd()  63  {  64         print "thread end";  65  }  66     
 67     function SendData(socketid,byteArray)    //发送数据给客户端的函数
 68  {  69         //XORCode(byteArray); //异或加密一下
 70         var lenByte = new ByteArray();  71         lenByte.setLittleEndian(true);  72  lenByte.writeInt(byteArray.getWritePos());  73         
 74  g_tcpModule.sendData(socketid,lenByte);  75  g_tcpModule.sendData(socketid,byteArray);  76  }  77     
 78     function OnClose(socketid)  79  {  80         //关闭的时候从socket管理里移除,清理在线状态
 81         var userdata = _socketMsp.get(socketid);  82         if(userdata == null)  83  {  84             return;  85  }  86         userdata._IsOnLine = false;  87         userdata._SocketID = 0;  88  _socketMsp.remove(socketid);  89  }  90     
 91     function onLogin(socketid,byteArray)  92  {  93         print "onLogin";  94         
 95         var namebuf = byteArray.readBytes(32);        //这个长度要跟C++结构体对应 char name[32];
 96         var name = namebuf.readString();  97         var pwdbuf = byteArray.readBytes(32);        //这个长度要跟C++结构体对应 char pwd[32];
 98         var pwd = pwdbuf.readString();  99         
100  print name; 101  print pwd; 102         
103         var userdata = _userMap.get(name); 104         if(userdata == null) 105  { 106             //没有找到用户名,用户名错误
107             var resBytes = new ByteArray(); 108  resBytes.writeInt(LOGIC_MSG_LOGIN); 109             resBytes.writeInt(1);                //回复1表示账号或者密码错误错误
110  SendData(socketid,resBytes); 111             print "name err!"; 112             return; 113  } 114         
115         if(pwd != userdata._UserPwd) 116  { 117             var resBytes = new ByteArray(); 118  resBytes.writeInt(LOGIC_MSG_LOGIN); 119             resBytes.writeInt(1);                //回复1表示账号或者密码错误错误 
120  SendData(socketid,resBytes); 121             print "pwd err!"; 122             return; 123  } 124         
125         if(userdata._IsOnLine)    //这个账号已经登陆过了,冲掉
126  { 127  g_tcpModule.closeSocket(userdata._SocketID); 128  OnClose(userdata._SocketID); 129  } 130         
131         //登录成功,添加进socket管理,并至在线状态
132         userdata._IsOnLine = true; 133         userdata._SocketID = socketid; 134  _socketMsp.add(socketid,userdata); 135         
136         var resBytes = new ByteArray(); 137         resBytes.setLittleEndian(true); 138  resBytes.writeInt(LOGIC_MSG_LOGIN); 139         resBytes.writeInt(2);                //回复2表示登陆成功
140  SendData(socketid,resBytes); 141         print "login suc!"; 142  } 143     
144     function onGetID(socketid,byteArray) 145  { 146         var userdata = _socketMsp.get(socketid); 147         if(userdata == null) 148  { 149             return;        //该socket不在线,不处理
150  } 151         
152         var resBytes = new ByteArray(); 153         resBytes.setLittleEndian(true); 154  resBytes.writeInt(LOGIC_MSG_GETID); 155  resBytes.writeInt(userdata._UserID); 156  SendData(socketid,resBytes); 157  } 158 }

  服务器代码完成了,再来看看C++客户端代码。安全

 1 enum
 2 {  3     LOGIC_MSG_LOGIN = 1,        //登录
 4     LOGIC_MSG_GETID = 2,        //获取ID
 5 };  6 
 7 struct MsgBase    //消息基类
 8 {  9     int msgid;  10 };  11 
 12 struct MsgLogin : public MsgBase    //登录消息
 13 {  14     char name[32];  15     char pwd[32];  16 };  17 
 18 struct MsgLoginRet : public MsgBase     //登录返回
 19 {  20     int res;  21 };  22 
 23 struct MsgGetID : public MsgBase     //获取ID消息
 24 {  25 };  26 
 27 struct MsgGetIDRet : public MsgBase     //获取ID返回
 28 {  29     int userid;  30 };  31 
 32 //接收服务器消息,将数据放到recvBuf里
 33 bool RecvData(int sock,char* recvBuf)  34 {  35     int alllen = 0;  36     int len = recv(sock,recvBuf,4,0);    //先读4个字节为消息长度
 37     if (len <= 0)  38  {  39         return false;    //socket被关闭了
 40  }  41 
 42     alllen += len;  43     while (alllen < 4)  44  {  45         len = recv(sock,recvBuf + len,4 - len,0);  46         if (len <= 0)  47  {  48             return false;    //socket被关闭了
 49  }  50 
 51         alllen += len;  52  }  53 
 54     int msglen = *((int*)recvBuf);  55 
 56     //再将消息内容读入recvBuf
 57     alllen = 0;  58     len = recv(sock,recvBuf,msglen,0);  59     if (len <= 0)  60  {  61         return false; //socket被关闭了
 62  }  63 
 64     alllen += len;  65     while (alllen < msglen)  66  {  67         len = recv(sock,recvBuf + len,msglen - len,0);  68         if (len <= 0)  69  {  70             return false; //socket被关闭了
 71  }  72 
 73         alllen += len;  74  }  75 
 76     return true;  77 }  78 
 79 //发送数据
 80 bool SendData(int sock,MsgBase* pbase,int len)  81 {  82     //XORBuf((char*)pbase,sizeof(len)); //异或加密,下一篇博客专门写这个函数
 83 
 84     send(sock,(const char*)&len,4,0);        //发送长度
 85     send(sock,(const char*)pbase,len,0);            //发送数据
 86     return true;  87 }  88 
 89 int _tmain(int argc, _TCHAR* argv[])  90 {  91     WORD sockVersion = MAKEWORD(2, 2);  92  WSADATA wsaData;  93     if (WSAStartup(sockVersion, &wsaData) != 0)  94  {  95         return 0;  96  }  97 
 98     int clinetsocket = socket(PF_INET, SOCK_STREAM, 0);  99     if (clinetsocket == -1) 100  { 101         return 0; 102  } 103 
104     struct hostent *hptr = gethostbyname("127.0.0.1"); 105 
106     struct sockaddr_in address; 107     address.sin_family = AF_INET; 108     address.sin_addr.s_addr = *(u_long*)hptr->h_addr_list[0]; 109     address.sin_port = htons(6061); 110 
111     int result = connect(clinetsocket, (struct sockaddr *)&address, sizeof(address)); 112     if (result == -1) 113  { 114         return NULL; 115  } 116 
117     //定义登录消息结构
118  MsgLogin msg; 119     msg.msgid = LOGIC_MSG_LOGIN; 120     strcpy(msg.name,"lisi"); 121     strcpy(msg.pwd,"321321"); 122     int len = sizeof(msg); 123     SendData(clinetsocket,&msg,len); 124 
125     char recvBuf[1024] = {0}; 126 
127     while (true) 128  { 129         if(!RecvData(clinetsocket,recvBuf)) 130  { 131             printf("socket close!\n");    //链接断了
132             break; 133  } 134 
135         //收到的数据先转为pBase 看前4个字节的msgid
136         MsgBase* pBase = (MsgBase*)recvBuf; 137 
138         switch (pBase->msgid) 139  { 140         case LOGIC_MSG_LOGIN: 141  { 142                 //登录返回
143                 MsgLoginRet* mlr = (MsgLoginRet*)pBase; 144                 if (mlr->res == 1) 145  { 146                     printf("login err!\n"); 147  } 148                 else
149  { 150                     printf("login suc!\n"); 151 
152                     //请求ID
153  MsgGetID msggetid; 154                     msggetid.msgid = LOGIC_MSG_GETID; 155                     len = sizeof(msggetid); 156                     SendData(clinetsocket,&msggetid,len); 157  } 158                 break; 159  } 160         case LOGIC_MSG_GETID: 161  { 162                 //请求ID返回
163                 MsgGetIDRet* mgir = (MsgGetIDRet*)pBase; 164                 printf("userid : %d\n",mgir->userid); 165                 break; 166  } 167  } 168  } 169 
170     return 0; 171 }

  这样客户端和服务器就都完成了,下面再来记录一下C++消息结构序列化后的二进制流。服务器

  MsgBase为全部消息的基类,因此从它派生的结构体前4个字节确定是整形的msgid。在服务端直接readInt读取前4个字节就表示读取了MsgBase里的msgid。网络

    MsgLogin有两个成员变量都为char[32]数组,因此这个结构体的总字节大小是64,除掉前4个字节是msgid意外,readBytes(32)表示读取这个数组,再readString表示获取\0结尾的字符串数据结构

  MsgLoginRet只有一个成员变量,因此服务器第一个writeInt表示填充基类的msgid,第二个writeInt表示res。框架

  

  以后的逻辑就都是添加消息号和消息结构作逻辑了,用脚本作服务器编码效率仍是很是高的。

相关文章
相关标签/搜索