1. 基础socket库
socket.h:
/**
 * Network socket library: a thin wrapper around the BSD socket API.
 */
#ifndef Socket_h
#define Socket_h

#include <stdio.h>
#include <string>

#ifdef WIN32
// windows
#include <winsock.h>
typedef int socklen_t;
#else
// linux, MacOS
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <errno.h>
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
typedef int SOCKET;
#endif

// Default backlog passed to listen().
#define SOCKET_BACKLOG 100

namespace avalon {

/// Prints the last socket error (WSAGetLastError on WIN32, errno otherwise)
/// and returns its numeric code.
int socket_error();

/// Initializes the platform socket layer (WSAStartup on WIN32).
/// @return 0 on success, -1 on failure.
int socket_init();

/// Releases the platform socket layer (WSACleanup on WIN32).
int socket_clean();

/// printf-style diagnostic output prefixed with the library tag.
void socket_debug(const char* message, ...);

/**
 * Owns one socket descriptor and closes it on destruction.
 *
 * NOTE(review): the class is copyable by default, and a copy would close the
 * same descriptor twice. Pass instances by pointer, as the factories below do.
 */
class Socket {
public:
    /// Wraps an existing descriptor. Returns nullptr when socket_fd is negative.
    static Socket* create(SOCKET socket_fd);

    /// Opens a new socket. Returns nullptr when the descriptor cannot be created.
    static Socket* create(int family, int type, int protocal = IPPROTO_IP);

public:
    Socket(SOCKET socket_fd);
    Socket(int family, int type, int protocal = IPPROTO_IP);

    /// Replaces the wrapped descriptor without closing the previous one.
    Socket& operator = (SOCKET socket_fd);

    virtual ~Socket();

    /// Connects to host:port. @return true on success.
    bool connect(const char* host, unsigned short port);

    /// Binds to the given port on all local interfaces. @return true on success.
    bool bind(unsigned short port);

    /// Starts listening. @return true on success.
    bool listen(int backlog = SOCKET_BACKLOG);

    /// Accepts one connection and returns it as a new heap-allocated Socket,
    /// or nullptr on failure. When client_host is non-null, the peer's
    /// dotted-decimal IPv4 address is written into it.
    Socket* accept(char* client_host = nullptr);

    /// Sends the whole buffer with ::send, looping until done or an error occurs.
    ssize_t send(const char* buffer, size_t len, int flag = 0);

    /// Single ::recv call; returns its result unchanged.
    ssize_t recv(char* buffer, size_t len, int flag = 0);

    // write()/read() are defined in the implementation file and used by the
    // poll example, but were missing from the class declaration.

    /// Writes the whole buffer with ::write, looping until done or an error occurs.
    ssize_t write(const char* buffer, size_t len);

    /// Single ::read call; returns its result unchanged.
    ssize_t read(char* buffer, size_t len);

    /// Closes the descriptor and marks this object as closed.
    int close();

    SOCKET getSocketFD();

    /// Switches the descriptor between blocking and non-blocking mode.
    void set_blocking(const bool blocking);

private:
    SOCKET _socket_fd;
    int _family;
    int _type;
    int _protocal;
};

}

#endif
socket.cpp:
/**
 * Network socket library: implementation.
 */
#include "AvalonSocket.h"

#include <stdarg.h>  // va_list / va_start / va_end
#include <string.h>  // strerror, memset
#include <new>       // std::nothrow

#ifdef WIN32
#pragma comment(lib, "wsock32")
#endif

// Non-zero makes socket_debug() throw the current error code after logging.
#define SOCKET_DEBUG_LEVEL 0

namespace avalon {

/// Prints the last socket error and returns its numeric code.
int socket_error() {
    int error = 0;
#ifdef WIN32
    error = WSAGetLastError();
#else
    error = errno;
#endif
    printf("Avalon socket error: %d %s \n", error, strerror(error));
    return error;
}

/// printf-style diagnostic output. Preserves errno so that a following
/// socket_error() call still reports the failure that triggered the message.
void socket_debug(const char* message, ...) {
#ifndef WIN32
    const int saved_errno = errno;
#endif
    char buf[1024] = "";
    va_list args;
    va_start(args, message);
    vsnprintf(buf, sizeof(buf), message, args);
    va_end(args);

    // The formatted text is data, not a format string: it may contain '%'.
    printf("Avalon sokcet: %s\n", buf);

#ifndef WIN32
    errno = saved_errno;
#endif

    if (SOCKET_DEBUG_LEVEL) {
        int error_no = socket_error();
        if (error_no != -1) {
            throw error_no;
        }
    }
}

/// Initializes the platform socket layer. @return 0 on success, -1 on failure.
int socket_init() {
#ifdef WIN32
    WSADATA wsadata;
    WORD version = MAKEWORD(2, 0);
    int ret = WSAStartup(version, &wsadata);
    if (ret) {
        socket_debug("Initilize winsock error");
        return -1;
    }
#endif
    return 0;
}

/// Releases the platform socket layer.
int socket_clean() {
#ifdef WIN32
    return WSACleanup();
#endif
    return 0;
}

/// Wraps an existing descriptor; returns nullptr for a negative descriptor or
/// when the wrapper cannot be allocated.
Socket* Socket::create(SOCKET socket_fd) {
    if (socket_fd < 0) {
        socket_debug("socket_fd(%d) is invailed.", socket_fd);
        return nullptr;
    }

    // nothrow makes the nullptr check below reachable (plain new would throw).
    Socket* socket = new (std::nothrow) Socket(socket_fd);
    if (!socket) {
        socket_debug("Create avalon socket failed.");
        return nullptr;
    }
    return socket;
}

/// Opens a new socket; returns nullptr when allocation or ::socket() fails.
Socket* Socket::create(int family, int type, int protocal) {
    Socket* socket = new (std::nothrow) Socket(family, type, protocal);
    if (!socket) {
        socket_debug("Create avalon socket failed.");
        return nullptr;
    }

    if (socket->getSocketFD() == INVALID_SOCKET) {
        delete socket;
        socket_debug("Create socket failed.");
        return nullptr;
    }

    socket_debug("Create socket(%d) successfully.", socket->getSocketFD());
    return socket;
}

Socket::Socket(SOCKET socket_fd)
: _family(AF_INET)
, _type(SOCK_STREAM)
, _protocal(IPPROTO_IP)
{
    _socket_fd = socket_fd;
}

Socket::Socket(int family, int type, int protocal)
: _family(AF_INET)
, _type(SOCK_STREAM)
, _protocal(IPPROTO_IP)
{
    _socket_fd = socket(family, type, protocal);
    if (_socket_fd != INVALID_SOCKET) {
        _family = family;
        _type = type;
        _protocal = protocal;
    }
}

/// Replaces the wrapped descriptor. The previous descriptor is NOT closed.
Socket& Socket::operator = (SOCKET socket_fd) {
    _socket_fd = socket_fd;
    return *this;
}

Socket::~Socket() {
    if (_socket_fd != INVALID_SOCKET) {
        this->close();
    }
}

/// Connects to an IPv4 address given in dotted-decimal form.
/// @return true on success, false on a bad address or a failed connect.
bool Socket::connect(const char* host, unsigned short port) {
    if (!host) {
        socket_debug("Connect failed: host is null.");
        return false;
    }
    // sockaddr_in can only hold an IPv4 address; any other family would make
    // inet_pton write past sin_addr.
    if (_family != AF_INET) {
        socket_debug("Connect %s:%d failed: only AF_INET is supported.", host, port);
        return false;
    }

    struct sockaddr_in remote_addr;
    memset(&remote_addr, 0, sizeof(remote_addr));
    remote_addr.sin_family = _family;
    remote_addr.sin_port = htons(port);

    // inet_pton returns 1 on success, 0 for an unparsable string and -1 for an
    // unsupported family. Check the return value, not a possibly stale errno.
    if (inet_pton(_family, host, &remote_addr.sin_addr) != 1) {
        socket_debug("Connect failed: invalid address %s.", host);
        return false;
    }

    int ret = ::connect(_socket_fd, (struct sockaddr*)(&remote_addr), sizeof(remote_addr));
    if (ret == SOCKET_ERROR) {
        socket_debug("Connect %s:%d failed.", host, port);
        socket_error();
        return false;
    }

    socket_debug("Connect %s:%d successfully.", host, port);
    return true;
}

/// Enables SO_REUSEADDR and binds to `port` on all local interfaces.
bool Socket::bind(unsigned short port) {
    int opt = 1;
    if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt)) < 0)
        return false;

    struct sockaddr_in local_addr;
    memset(&local_addr, 0, sizeof(local_addr));
    local_addr.sin_family = _family;
    local_addr.sin_addr.s_addr = INADDR_ANY;
    local_addr.sin_port = htons(port);

    int ret = ::bind(_socket_fd, (struct sockaddr*)(&local_addr), sizeof(local_addr));
    if (ret == SOCKET_ERROR) {
        socket_debug("Socket(%d) bind port(%d) failed.", _socket_fd, port);
        return false;
    }

    socket_debug("Socket(%d) bind port(%d) successfully.", _socket_fd, port);
    return true;
}

bool Socket::listen(int backlog) {
    int ret = ::listen(_socket_fd, backlog);
    if (ret == SOCKET_ERROR) {
        socket_debug("Socket(%d) listen failed.", _socket_fd);
        return false;
    }

    socket_debug("Socket(%d) Listen successfully.", _socket_fd);
    return true;
}

/// Accepts one connection, retrying when interrupted by a signal.
/// @param client_host optional output buffer for the peer's dotted-decimal
///        IPv4 address; the caller must provide at least 16 bytes.
/// @return a new heap-allocated Socket owned by the caller, or nullptr.
Socket* Socket::accept(char* client_host) {
    struct sockaddr_in peer_addr;
    SOCKET client_fd = INVALID_SOCKET;

    for (;;) {
        // accept() overwrites the length, so reset it before every attempt.
        socklen_t len = sizeof(peer_addr);
        client_fd = ::accept(_socket_fd, (struct sockaddr*)(&peer_addr), &len);
        if (client_fd != INVALID_SOCKET)
            break;
        if (errno == EINTR)
            continue;

        socket_debug("Socket(%d) accept failed.", _socket_fd);
        socket_error();
        return nullptr;
    }

    avalon::Socket* socket = avalon::Socket::create(client_fd);
    if (!socket) {
        // No wrapper owns the descriptor, so release it here.
#ifdef WIN32
        closesocket(client_fd);
#else
        ::close(client_fd);
#endif
        return nullptr;
    }

    if (client_host) {
        sprintf(client_host, "%s", inet_ntoa(peer_addr.sin_addr));
    }

    socket_debug("Socket(%d) accept successfully, client socket: %d ip: %s",
                 _socket_fd, socket->getSocketFD(), inet_ntoa(peer_addr.sin_addr));
    return socket;
}

/// Sends the whole buffer, looping over partial sends.
/// @return the number of bytes sent; -1 (or 0) when the very first ::send
///         fails, so a failure never reduces the count of bytes already sent.
ssize_t Socket::send(const char* buffer, size_t len, int flag) {
    size_t sent = 0;
    while (sent < len) {
        const ssize_t bytes = ::send(_socket_fd, buffer + sent, len - sent, flag);
        if (bytes <= 0) {
            socket_error();
            return sent > 0 ? static_cast<ssize_t>(sent) : bytes;
        }
        sent += static_cast<size_t>(bytes);
    }
    return static_cast<ssize_t>(sent);
}

/// Single ::recv call; returns its result unchanged.
ssize_t Socket::recv(char* buffer, size_t len, int flag) {
    return ::recv(_socket_fd, buffer, len, flag);
}

/// Writes the whole buffer, looping over partial writes.
/// @return the number of bytes written; -1 (or 0) when the very first ::write
///         fails.
ssize_t Socket::write(const char* buffer, size_t len) {
    size_t written = 0;
    while (written < len) {
        const ssize_t bytes = ::write(_socket_fd, buffer + written, len - written);
        if (bytes <= 0) {
            socket_error();
            return written > 0 ? static_cast<ssize_t>(written) : bytes;
        }
        written += static_cast<size_t>(bytes);
    }
    return static_cast<ssize_t>(written);
}

/// Single ::read call; returns its result unchanged.
ssize_t Socket::read(char* buffer, size_t len) {
    return ::read(_socket_fd, buffer, len);
}

/// Sets or clears O_NONBLOCK on the descriptor; silently does nothing when the
/// current flags cannot be read.
void Socket::set_blocking(const bool blocking) {
    int opts = fcntl(_socket_fd, F_GETFL);
    if (opts < 0)
        return;

    if (!blocking)
        opts = (opts | O_NONBLOCK);
    else
        opts = (opts & ~O_NONBLOCK);

    fcntl(_socket_fd, F_SETFL, opts);
}

/// Closes the descriptor and marks the object as closed.
/// @return the result of the underlying close call.
int Socket::close() {
    int ret = -1;
#ifdef WIN32
    ret = closesocket(_socket_fd);
#else
    ret = ::close(_socket_fd);
#endif
    if (ret == SOCKET_ERROR) {
        socket_debug("Socket(%d) close failed.", _socket_fd);
    }
    _socket_fd = INVALID_SOCKET;
    return ret;
}

SOCKET Socket::getSocketFD() {
    return _socket_fd;
}

}
2. 多线程的模型:
在accept成功以后,为每一个通讯socket创建新的进程或线程,单独用于处理服务器和客户端的通讯。但是系统对可创建的进程数量有限制;在linux下,线程也叫轻量级进程,因此即使创建的是线程也会受到系统的限制,一般这个默认限制是2048个。并且进程或者线程数量过多,也会导致进程或者线程切换的开销增大:
客户端:
// Client side: connect to the local server, send one greeting, then print
// every reply until the peer closes the connection or an error occurs.
// NOTE(review): `i` and the loop targeted by `continue` live outside this
// snippet; `i` is assumed to be an int, as the "%d" conversions imply.
avalon::Socket* socket = avalon::Socket::create(AF_INET, SOCK_STREAM);
if (socket) {
    if (!socket->connect("127.0.0.1", 6666)) {
        // `socket` goes out of scope on `continue`, so release it here.
        delete socket;
        continue;
    }

    char buf[1024] = "";
    snprintf(buf, sizeof(buf), "%d I am a client socket!", i);
    ssize_t bytes = socket->send(buf, strlen(buf), 0);

    char recvBuf[1024];
    while (true) {
        memset(recvBuf, 0, sizeof(recvBuf));
        // Leave one byte free so the "%s" below always finds a terminator.
        bytes = socket->recv(recvBuf, sizeof(recvBuf) - 1);
        if (bytes > 0) {
            // ssize_t needs %zd; %d is undefined where ssize_t is wider than int.
            printf("%d recv data from remote: %zd %s \n", i, bytes, recvBuf);
        }
        else if (bytes == 0) {
            printf("remote socket %d cloese. \n", socket->getSocketFD());
            break;
        }
        else {
            int error = avalon::socket_error();
            printf("%d socket error: %d %s \n", i, error, strerror(error));
            break;
        }
    }
    // NOTE(review): `socket` is still allocated here; delete it once no later
    // code in the enclosing loop body needs it.
}
服务端:
void communiction_handler(avalon::Socket* socket) { char buffer[1024]; while (true) { if (!socket) continue; printf("thread %ld \n", std::this_thread::get_id()); ssize_t bytes = socket->recv(buffer, 1024, 0); if (bytes > 0) { buffer[bytes] = '\0'; printf("recv msg from client: %s \n", buffer); const char* data = "I am remote.0123456789abcdefg!wwwwwer"; ssize_t sendedBytes = socket->send(data, strlen(data), 0); } else if (bytes == 0) { printf("client socket(%d) closed. thread(%ld) \n", socket->getSocketFD(), std::this_thread::get_id()); socket->close(); break; } else { int error_no = avalon::socket_error(); printf("recv msg from client failed %d %s \n", error_no, strerror(error_no)); socket->close(); break; } } } int main(int argc, const char * argv[]) { // 多线程 std::vector<std::thread> threads; avalon::Socket* listen_socket = avalon::Socket::create(AF_INET, SOCK_STREAM); do { if (!listen_socket->bind(6666)) break; if (!listen_socket->listen(10)) break; while (true) { // 多线程 avalon::Socket* clientSocket = listen_socket->accept(); if (clientSocket) { threads.push_back( std::move( std::thread(communiction_handler, clientSocket) ) ); } } while (false); for(std::thread& thread : threads){ thread.join(); } delete listen_socket; }
3. I/O多路复用
内核一旦检测到某个I/O的读取条件就绪,就通知用户进程进行响应;
多路复用通常用于需要同时处理多个文件描述符、多个套接字、多种协议的情况;
相比使用多进程和多线程的机制,I/O多路复用具备系统开销小的优点;
(1)select模型:
最大的问题就是连接数限制,一般是1024或者2048个,不过可以修改内核配置达到更多的连接数。但是因为select模型需要线性遍历fd集合,所以如果连接数改得过大,例如10万个,会导致线性遍历的性能问题,最后的结果可能是导致超时。其次,就是内存拷贝问题,select模型在把fd消息通知给用户的时候,采用的是将内核中的数据拷贝到用户空间的方式:
服务端:
// // main.cpp // SocketServer // // Created by avl-showell on 16/8/8. // Copyright © 2016年 avl-showell. All rights reserved. // #include <iostream> #include "socket/AvalonSocket.h" #include <mutex> #include <condition_variable> #include <chrono> #include <thread> #include <vector> // select #include <sys/select.h> #include <sys/time.h> #define MAX_CLIENT_SOCKET_COUNT 10000 #define RECV_BUFFER_LEN 10 int main(int argc, const char * argv[]) { std::vector<avalon::Socket*> socket_fds(MAX_CLIENT_SOCKET_COUNT, nullptr); fd_set read_fds, write_fds; struct timeval timeout; char recvBuf[RECV_BUFFER_LEN]; avalon::Socket* listen_socket = avalon::Socket::create(AF_INET, SOCK_STREAM); do { if (!listen_socket->bind(6666)) break; if (!listen_socket->listen(10)) break; while (true) { // select int listen_socket_fd = listen_socket->getSocketFD(); int max_socket_fd = listen_socket_fd; FD_ZERO(&read_fds); FD_SET(listen_socket_fd, &read_fds); for (int i = 0; i < MAX_CLIENT_SOCKET_COUNT; ++i) { avalon::Socket* socket = socket_fds[i]; if (!socket) continue; SOCKET socket_fd = socket->getSocketFD(); if (socket_fd > 0) { FD_SET(socket_fd, &read_fds); if (socket_fd > max_socket_fd) max_socket_fd = socket_fd; } } timeout.tv_sec = 5; timeout.tv_usec = 0; int ret = select(max_socket_fd + 1, &read_fds, NULL, NULL, &timeout); if (ret == SOCKET_ERROR) { avalon::socket_error(); break; } else if (ret == 0) { printf("select socket timeout. 
\n"); continue; } else { printf("_______________ \n"); for (int i = 0; i < MAX_CLIENT_SOCKET_COUNT; ++i) { avalon::Socket* socket = socket_fds[i]; if (!socket) continue; SOCKET socket_fd = socket->getSocketFD(); if (socket_fd > 0 && FD_ISSET(socket_fd, &read_fds)) { int recvedBytes = 0; while (true) { memset(recvBuf, 0, RECV_BUFFER_LEN); int bytes = socket->recv(recvBuf, RECV_BUFFER_LEN); if (bytes > 0) { recvedBytes += bytes; socket->send(recvBuf, RECV_BUFFER_LEN); break; } else { avalon::socket_error(); delete socket; socket_fds[i] = nullptr; break; } } recvBuf[recvedBytes] = '\0'; printf("select: recv data from client: %s \n", recvBuf);
// 处理数据... } } if (FD_ISSET(listen_socket_fd, &read_fds)) { printf("select: new client connection. \n"); bool found = false; for (int i = 0; i < MAX_CLIENT_SOCKET_COUNT; ++i) { avalon::Socket* socket = socket_fds[i]; if (!socket) { avalon::Socket* clientSocket = listen_socket->accept(); if (clientSocket) { // clientSocket->set_blocking(false); socket_fds[i] = clientSocket; found = true; break; } } } if (!found) { printf("select: out of max sockets limit. \n"); } } } } } while (false); delete listen_socket; return 0; }
(2) poll模型:
poll模型和select模型类似,但是poll没有最大文件描述符数量的限制,不过依然存在将消息从内核空间拷贝到用户空间的问题:
服务端:
#include <poll.h> #define MAX_CLIENT_SOCKET_COUNT 10 #define RECV_BUFFER_LEN 10 int main(int argc, const char * argv[]) { char recvBuf[RECV_BUFFER_LEN]; avalon::Socket* listen_socket = avalon::Socket::create(AF_INET, SOCK_STREAM); // poll struct pollfd client_fds[MAX_CLIENT_SOCKET_COUNT]; client_fds[0].fd = listen_socket->getSocketFD(); client_fds[0].events = POLLIN; for (int i = 1; i < MAX_CLIENT_SOCKET_COUNT; ++i) { client_fds[i].fd = -1; } int max_socket = 0; do { if (!listen_socket->bind(6666)) break; if (!listen_socket->listen(10)) break; while (true) { int ready = poll(client_fds, max_socket + 1, 3000); if (ready == -1) { avalon::socket_error(); break; } else if (ready == 0) { printf("select socket timeout. \n"); continue; } printf("_______________ \n"); if (client_fds[0].revents & POLLIN) { printf("select: new client connection. \n"); bool found = false; int i = 0; for (i = 1; i < MAX_CLIENT_SOCKET_COUNT; ++i) { avalon::Socket* socket = socket_fds[i]; if (!socket) { avalon::Socket* clientSocket = listen_socket->accept(); if (clientSocket) { client_fds[i].fd = clientSocket->getSocketFD(); client_fds[i].events = POLLIN; socket_fds[i] = clientSocket; found = true; break; } } } if (!found) { printf("select: out of max sockets limit. 
\n"); } else { if (i > max_socket) max_socket = i; } } for (int j = 1; j <= max_socket; ++j) { avalon::Socket* socket = socket_fds[j]; if (!socket) continue; if (client_fds[j].revents & (POLLIN | POLLERR)) { int recvedBytes = 0; while (true) { memset(recvBuf, 0, RECV_BUFFER_LEN); int bytes = socket->read(recvBuf, RECV_BUFFER_LEN); if (bytes > 0) { recvedBytes += bytes; int writedBytes = socket->write(recvBuf, RECV_BUFFER_LEN); recvBuf[bytes] = '\0'; printf("select: recv data from client: %s \n", recvBuf); if (bytes < RECV_BUFFER_LEN) break; } else { avalon::socket_error(); delete socket; client_fds[j].fd = -1; socket_fds[j] = nullptr; break; } } } } } } while (false); for(std::thread& thread : threads){ thread.join(); } delete listen_socket; return 0; }
(3) epoll模型
epoll模型是poll模型的改进版本,没有文件描述符的限制,epoll只处理活跃的文件描述符,不会遍历整个集合,并且epoll使用了内核中的“共享内存”,减小了内存的拷贝:
/*
 * Purpose: serve multiple sockets with epoll.
 * Listens on one port; every accepted connection is added to the epoll set.
 */
#include "select.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <poll.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <netinet/in.h>

typedef struct _CLIENT {
    int fd;
    struct sockaddr_in addr; /* client's address information */
} CLIENT;

#define MYPORT 59000

/* Maximum number of events returned by one epoll_wait call. */
#define MAX_EVENTS 500

/* Current number of connections. */
int currentClient = 0;

/* Receive buffer; one extra byte keeps it NUL-terminated for "%s". */
#define REVLEN 10
char recvBuf[REVLEN + 1];

/* epoll descriptor. */
int epollfd;
/* Event array filled by epoll_wait. */
struct epoll_event eventList[MAX_EVENTS];

void AcceptConn(int srvfd);
void RecvData(int fd);

/*
 * Creates the listening socket, registers it with epoll and dispatches
 * events until epoll_wait fails or the listening socket reports an error.
 * Returns 0 on normal shutdown, -1 on a setup or listening-socket failure.
 */
int main()
{
    int sockListen;
    struct sockaddr_in server_addr;

    /* socket */
    if ((sockListen = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        printf("socket error\n");
        return -1;
    }

    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(MYPORT);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    /* bind */
    if (bind(sockListen, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0)
    {
        printf("bind error\n");
        close(sockListen);
        return -1;
    }

    /* listen */
    if (listen(sockListen, 5) < 0)
    {
        printf("listen error\n");
        close(sockListen);
        return -1;
    }

    /* 1. create the epoll instance */
    epollfd = epoll_create(MAX_EVENTS);
    if (epollfd < 0)
    {
        printf("epoll create fail\n");
        close(sockListen);
        return -1;
    }

    /*
     * Level-triggered (no EPOLLET): the sockets are blocking and each event
     * triggers a single accept/recv. With edge-triggered mode, anything left
     * unread after that single call would never be reported again.
     */
    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = sockListen;

    /* 2. register the listening socket */
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockListen, &event) < 0)
    {
        printf("epoll add fail : fd = %d\n", sockListen);
        close(epollfd);
        close(sockListen);
        return -1;
    }

    /* epoll loop */
    while (1)
    {
        const int timeout = 3000;

        /* 3. wait for events */
        int ret = epoll_wait(epollfd, eventList, MAX_EVENTS, timeout);
        if (ret < 0)
        {
            if (errno == EINTR)
                continue; /* interrupted by a signal, not a real failure */
            printf("epoll error\n");
            break;
        }
        else if (ret == 0)
        {
            printf("timeout ...\n");
            continue;
        }

        /* epoll_wait returns only the active descriptors; this is the key
         * difference from poll. */
        int n = 0;
        for (n = 0; n < ret; n++)
        {
            const int fd = eventList[n].data.fd;
            const unsigned int events = eventList[n].events;

            if ((events & EPOLLERR) || (events & EPOLLHUP) || !(events & EPOLLIN))
            {
                printf("epoll error\n");
                /* close() also drops the descriptor from the epoll set,
                 * provided it has not been duplicated. */
                close(fd);
                if (fd == sockListen)
                {
                    /* Nothing left to serve without the listening socket. */
                    close(epollfd);
                    return -1;
                }
                /* A failing or hung-up client must not stop the server. */
                continue;
            }

            if (fd == sockListen)
            {
                AcceptConn(sockListen);
            }
            else
            {
                RecvData(fd);
            }
        }
    }

    close(epollfd);
    close(sockListen);

    printf("test\n");
    return 0;
}

/**************************************************
 Function: AcceptConn
 Purpose:  accept one client connection and add it to the epoll set
 Param:    srvfd - listening socket
 ***************************************************/
void AcceptConn(int srvfd)
{
    struct sockaddr_in sin;
    socklen_t len = sizeof(struct sockaddr_in);
    bzero(&sin, len);

    int confd = accept(srvfd, (struct sockaddr*)&sin, &len);
    if (confd < 0)
    {
        printf("bad accept\n");
        return;
    }
    printf("Accept Connection: %d", confd);

    //setnonblocking(confd);

    /* 4. add the new connection to the epoll set (level-triggered, to match
     * the single blocking recv performed in RecvData). */
    struct epoll_event event;
    event.data.fd = confd;
    event.events = EPOLLIN;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, confd, &event) < 0)
    {
        printf("epoll add fail : fd = %d\n", confd);
        close(confd); /* an unmonitored connection would leak */
    }
}

/*
 * Reads up to REVLEN bytes from fd into recvBuf and prints them.
 * Closes fd when the peer has closed the connection or recv fails.
 */
void RecvData(int fd)
{
    memset(recvBuf, 0, sizeof(recvBuf));

    printf("RecvData function\n");

    /* One recv per readiness notification. Looping until REVLEN bytes arrive
     * would block the whole server on a client that sends fewer bytes. */
    const ssize_t ret = recv(fd, recvBuf, REVLEN, 0);
    if (ret <= 0)
    {
        /* 0: peer closed; <0: error. Either way the connection is finished;
         * leaving it open would leak the descriptor. */
        close(fd);
    }
    else if (ret == REVLEN)
    {
        /* The buffer was filled completely. */
        printf("buf = %s\n", recvBuf);
    }

    printf("content is %s", recvBuf);
}
epoll_wait int ret = epoll_wait(epollfd, eventList, MAX_EVENTS, timeout); if(ret < 0) { printf("epoll error\n"); break; } else if(ret == 0) { printf("timeout ...\n"); continue; } //直接获取了事件数量,给出了活动的流,这里是和poll区别的关键 int n = 0; for(n=0; n<ret; n++) { //错误退出 if ((eventList[n].events & EPOLLERR) || (eventList[n].events & EPOLLHUP) || !(eventList[n].events & EPOLLIN)) { printf ( "epoll error\n"); close (eventList[n].data.fd); return -1; } if (eventList[n].data.fd == sockListen) { AcceptConn(sockListen); }else{ RecvData(eventList[n].data.fd); //不删除 // epoll_ctl(epollfd, EPOLL_CTL_DEL, pEvent->data.fd, pEvent); } } } close(epollfd); close(sockListen); printf("test\n"); return 0; } /************************************************** 函数名:AcceptConn 功能:接受客户端的连接 参数:srvfd:监听SOCKET ***************************************************/ void AcceptConn(int srvfd) { struct sockaddr_in sin; socklen_t len = sizeof(struct sockaddr_in); bzero(&sin, len); int confd = accept(srvfd, (struct sockaddr*)&sin, &len); if (confd < 0) { printf("bad accept\n"); return; }else { printf("Accept Connection: %d", confd); } //setnonblocking(confd); //4. epoll_wait //将新创建的链接添加到EPOLL的监听中 struct epoll_event event; event.data.fd = confd; event.events = EPOLLIN|EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, confd, &event); } //读取数据 void RecvData(int fd) { int ret; int recvLen = 0; memset(recvBuf, 0, REVLEN); printf("RecvData function\n"); if(recvLen != REVLEN) { while(1) { //recv数据 ret = recv(fd, (char *)recvBuf+recvLen, REVLEN-recvLen, 0); if(ret == 0) { recvLen = 0; break; } else if(ret < 0) { recvLen = 0; break; } //数据接受正常 recvLen = recvLen+ret; if(recvLen<REVLEN) { continue; } else { //数据接受完毕 printf("buf = %s\n", recvBuf); recvLen = 0; break; } } } printf("content is %s", recvBuf); }