原文做者:aircrafthtml
原文连接:https://www.cnblogs.com/DOMLX/p/9661012.html前端
先讲Linux下(windows下在后面能够直接跳到后面看):python
前面咱们讲过多进程服务器,但咱们知道它开销很大,所以咱们才引入线程,咱们能够把它当作是一种轻量级进程。它相比进程有以下几个优势:react
进程:在操做系统构成单独执行流的单位。
线程:在进程构成单独执行流的单位。
它们的包含关系是,操做系统 > 进程 > 线程。进程与线程具体差别实际上是这样的,每一个进程都有独立的完整内存空间,它包括全局数据区,堆区,栈区,而多进程服务器之因此开销大是由于只是为了区分栈区里的不一样函数流执行而把数据区,堆区,栈区内存所有复制了一份。而多线程就高效多了,它只把栈区分离出来,进程中的数据区,堆区则共享。具体内存结构示例图以下:
linux
下面的程序,咱们能够用它来建立一个线程:ios
#include <pthread.h> pthread_create (thread, attr, start_routine, arg)
在这里,pthread_create 建立一个新的线程,并让它可执行。下面是关于参数的说明:c++
参数 | 描述 |
---|---|
thread | 指向线程标识符指针。 |
attr | 一个不透明的属性对象,能够被用来设置线程属性。您能够指定线程属性对象,也可使用默认值 NULL。 |
start_routine | 线程运行函数起始地址,一旦线程被建立就会执行。 |
arg | 运行函数的参数。它必须经过把引用做为指针强制转换为 void 类型进行传递。若是没有传递参数,则使用 NULL。 |
建立线程成功时,函数返回 0,若返回值不为 0 则说明建立线程失败。编程
使用下面的程序,咱们能够用它来终止一个线程:windows
#include <pthread.h> pthread_exit (status)
在这里,pthread_exit 用于显式地退出一个线程。一般状况下,pthread_exit() 函数是在线程完成工做后无需继续存在时被调用。后端
若是 main() 是在它所建立的线程以前结束,并经过 pthread_exit() 退出,那么其余线程将继续执行。不然,它们将在 main() 结束时自动被终止。
如下简单的实例代码使用 pthread_create() 函数建立了 5 个线程,每一个线程输出"Hello Runoob!":
#include <iostream> // 必须的头文件 #include <pthread.h> using namespace std; #define NUM_THREADS 5 // 线程的运行函数 void* say_hello(void* args) { cout << "Hello Runoob!" << endl; return 0; } int main() { // 定义线程的 id 变量,多个变量使用数组 pthread_t tids[NUM_THREADS]; for(int i = 0; i < NUM_THREADS; ++i) { //参数依次是:建立的线程id,线程参数,调用的函数,传入的函数参数 int ret = pthread_create(&tids[i], NULL, say_hello, NULL); if (ret != 0) { cout << "pthread_create error: error_code=" << ret << endl; } } //等各个线程退出后,进程才结束,不然进程强制结束了,线程可能还没反应过来; pthread_exit(NULL); }
linux下编译运行后结果为:
Hello Runoob!
Hello Runoob!
Hello Runoob!
Hello Runoob!
Hello Runoob!
如下简单的实例代码使用 pthread_create() 函数建立了 5 个线程,并接收传入的参数。每一个线程打印一个 "Hello Runoob!" 消息,并输出接收的参数,而后调用 pthread_exit() 终止线程。
//文件名:test.cpp #include <iostream> #include <cstdlib> #include <pthread.h> using namespace std; #define NUM_THREADS 5 void *PrintHello(void *threadid) { // 对传入的参数进行强制类型转换,由无类型指针变为整形数指针,而后再读取 int tid = *((int*)threadid); cout << "Hello Runoob! 线程 ID, " << tid << endl; pthread_exit(NULL); } int main () { pthread_t threads[NUM_THREADS]; int indexes[NUM_THREADS];// 用数组来保存i的值 int rc; int i; for( i=0; i < NUM_THREADS; i++ ){ cout << "main() : 建立线程, " << i << endl; indexes[i] = i; //先保存i的值 // 传入的时候必须强制转换为void* 类型,即无类型指针 rc = pthread_create(&threads[i], NULL, PrintHello, (void *)&(indexes[i])); if (rc){ cout << "Error:没法建立线程," << rc << endl; exit(-1); } } pthread_exit(NULL); }
linux下编译运行后结果为:
main() : 建立线程, 0 main() : 建立线程, 1 Hello Runoob! 线程 ID, 0 main() : 建立线程, Hello Runoob! 线程 ID, 21 main() : 建立线程, 3 Hello Runoob! 线程 ID, 2 main() : 建立线程, 4 Hello Runoob! 线程 ID, 3
这个实例演示了如何经过结构传递多个参数。您能够在线程回调中传递任意的数据类型,由于它指向 void,以下面的实例所示:
#include <iostream> #include <cstdlib> #include <pthread.h> using namespace std; #define NUM_THREADS 5 struct thread_data{ int thread_id; char *message; }; void *PrintHello(void *threadarg) { struct thread_data *my_data; my_data = (struct thread_data *) threadarg; cout << "Thread ID : " << my_data->thread_id ; cout << " Message : " << my_data->message << endl; pthread_exit(NULL); } int main () { pthread_t threads[NUM_THREADS]; struct thread_data td[NUM_THREADS]; int rc; int i; for( i=0; i < NUM_THREADS; i++ ){ cout <<"main() : creating thread, " << i << endl; td[i].thread_id = i; td[i].message = (char*)"This is message"; rc = pthread_create(&threads[i], NULL, PrintHello, (void *)&td[i]); if (rc){ cout << "Error:unable to create thread," << rc << endl; exit(-1); } } pthread_exit(NULL); }
linux下编译运行后结果为:
main() : creating thread, 0 main() : creating thread, 1 Thread ID : 0 Message : This is message main() : creating thread, Thread ID : 21 Message : This is message main() : creating thread, 3 Thread ID : 2 Message : This is message main() : creating thread, 4 Thread ID : 3 Message : This is message Thread ID : 4 Message : This is message
咱们可使用如下两个函数来链接或分离线程:
pthread_join (threadid, status) pthread_detach (threadid)
pthread_join() 子程序阻碍调用程序,直到指定的 threadid 线程终止为止。当建立一个线程时,它的某个属性会定义它是不是可链接的(joinable)或可分离的(detached)。只有建立时定义为可链接的线程才能够被链接。若是线程建立时被定义为可分离的,则它永远也不能被链接。
用途:有的人没有在main 函数最后调用 pthread_exit(NULL); 函数等待,而是选择sleep,这里就能够用pthread_join()代替sleep的不可控制,,而有时候线程结束的时候你想作某一些事情须要知道线程是否结束了,也能够调用这个函数。
这个实例演示了如何使用 pthread_join() 函数来等待线程的完成。
#include <iostream> #include <cstdlib> #include <pthread.h> #include <unistd.h> using namespace std; #define NUM_THREADS 5 void *wait(void *t) { int i; long tid; tid = (long)t; sleep(1); cout << "Sleeping in thread " << endl; cout << "Thread with id : " << tid << " ...exiting " << endl; pthread_exit(NULL); } int main () { int rc; int i; pthread_t threads[NUM_THREADS]; pthread_attr_t attr; void *status; // 初始化并设置线程为可链接的(joinable) pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for( i=0; i < NUM_THREADS; i++ ){ cout << "main() : creating thread, " << i << endl; rc = pthread_create(&threads[i], NULL, wait, (void *)&i ); if (rc){ cout << "Error:unable to create thread," << rc << endl; exit(-1); } } // 删除属性,并等待其余线程 pthread_attr_destroy(&attr); for( i=0; i < NUM_THREADS; i++ ){ rc = pthread_join(threads[i], &status); if (rc){ cout << "Error:unable to join," << rc << endl; exit(-1); } cout << "Main: completed thread id :" << i ; cout << " exiting with status :" << status << endl; } cout << "Main: program exiting." << endl; pthread_exit(NULL); }
linux下编译运行结果:
main() : creating thread, 0 main() : creating thread, 1 main() : creating thread, 2 main() : creating thread, 3 main() : creating thread, 4 Sleeping in thread Thread with id : 4 ...exiting Sleeping in thread Thread with id : 3 ...exiting Sleeping in thread Thread with id : 2 ...exiting Sleeping in thread Thread with id : 1 ...exiting Sleeping in thread Thread with id : 0 ...exiting Main: completed thread id :0 exiting with status :0 Main: completed thread id :1 exiting with status :0 Main: completed thread id :2 exiting with status :0 Main: completed thread id :3 exiting with status :0 Main: completed thread id :4 exiting with status :0 Main: program exiting.
前面咱们知道了怎么建立线程,下面咱们再来看看这样一个实例,建立100个线程,它们都访问了同一变量,其中一半对这个变量进行加1操做,一半进行减1操做,按道理其结果会等于0.
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> #define NUM_THREAD 100 void * thread_inc(void * arg); void * thread_des(void * arg); long long num = 0; //long long类型是64位整数型,多线程共同访问 int main(int argc, char *argv[]) { pthread_t thread_id[NUM_THREAD]; int i; //建立100个线程,一半执行thread_inc,一半执行thread_des for(i = 0; i < NUM_THREAD; i++) { if(i %2) pthread_create(&(thread_id[i]), NULL, thread_inc, NULL); else pthread_create(&(thread_id[i]), NULL, thread_des, NULL); } //等待线程返回 for (i = 0; i < NUM_THREAD; i++) pthread_join(thread_id[i], NULL); printf("result: %lld \n", num); //+1,-1按道理结果是0 return 0; } //线程入口函数1 void * thread_inc(void * arg) { for (int i = 0; i < 50000000; i++) num += 1;//临界区(引发问题的语句就是临界区位置) return NULL; } //线程入口函数2 void * thread_des(void * arg) { for (int i = 0; i < 50000000; i++) num -= 1;//临界区 return NULL; }
从运行结果看并非0,并且每次运行的结果都不一样。那这是什么缘由引发的呢? 是由于每一个线程访问一个变量是这样一个过程:先从内存取出这个变量值到CPU,而后CPU计算获得改变后的值,最后再将这个改变后的值写回内存。所以,咱们能够很容易看出,多个线程访问同一变量,若是某个线程还只刚从内存取出数据,还没来得及写回内存,这时其它线程又访问了这个变量,因此这个值就会不正确了。
为何会出现这种状况呢,来举个例子:
如上图所示:两个线程都要将某一个共同访问的变量加1,
就像上面说的这个运算过程是:线程1先拿到值而后通过cpu的运算在赋值回去,而后线程2在取值运算放回,上图实现的是最理想的状况,假如这时候线程一拿到了值99,同时线程二没间隔的也拿了99,这时候就要出问题了。线程一运算后赋值100回去,而后线程二运算后又赋值100回去,,,注意了哈,这里两个线程都是为了Num++服务,他们这样搞事情不就表明一个作了无用功吗?(我胖虎要是还拿的动刀还不打死你!!!)
这些看完应该就理解了为何须要线程同步!!!!以及线程同步的重要性了吧!!
接下来咱们再来说讲怎么解决这个问题:线程同步
线程同步用于解决线程访问顺序引起的问题,通常是以下两种状况:
针对这两种可能引起的状况,咱们分别使用的同步技术是:互斥量和信号量。
#include <pthread.h> int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr); //建立互斥量 int pthread_mutex_destroy(pthread_mutex_t *mutex);//销毁互斥量 int pthread_mutex_lock(pthread_mutex_t *mutex);//加锁 int pthread_mutex_unlock(pthread_mutex_t *mutex);//释放锁
简言之,就是利用lock和unlock函数围住临界区的两端。当某个线程调用pthread_mutex_lock进入临界区后,若是没有调用pthread_mutex_unlock释放锁退出,那么其它线程就会一直阻塞在临界区以外,咱们把这种状况称之为死锁。因此临界区围住必定要lock和unlock一一对应。
接下来看一下代码示例:
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> #define NUM_THREAD 100 void * thread_inc(void * arg); void * thread_des(void * arg); long long num = 0; pthread_mutex_t mutex; int main(int argc, char *argv[]) { pthread_t thread_id[NUM_THREAD]; int i; //互斥量的建立 pthread_mutex_init(&mutex, NULL); for(i = 0; i < NUM_THREAD; i++) { if(i %2) pthread_create(&(thread_id[i]), NULL, thread_inc, NULL); else pthread_create(&(thread_id[i]), NULL, thread_des, NULL); } for (i = 0; i < NUM_THREAD; i++) pthread_join(thread_id[i], NULL); printf("result: %lld \n", num); pthread_mutex_destroy(&mutex); //互斥量的销毁 return 0; } /*扩展临界区,减小加锁,释放锁调用次数,但这样变量必须加满到50000000次后其它线程才能访问. 这样是延长了线程的等待时间,但缩短了加锁,释放锁函数调用的时间,这里没有定论,本身酌情考虑*/ void * thread_inc(void * arg) { pthread_mutex_lock(&mutex); //互斥量锁住 for (int i = 0; i < 1000000; i++) num += 1; pthread_mutex_unlock(&mutex); //互斥量释放锁 return NULL; } /*缩短了线程等待时间,但循环建立,释放锁函数调用时间增长*/ void * thread_des(void * arg) { for (int i = 0; i < 1000000; i++) { pthread_mutex_lock(&mutex); num -= 1; pthread_mutex_unlock(&mutex); } return NULL; }
编译运行能够获得结果为:0
信号量
信号量与互斥量相似,只是互斥量是用锁来控制线程访问而信号量是用二进制0,1来完成控制线程顺序。sem_post信号量加1,sem_wait信号量减1,当信号量为0时,sem_wait就会阻断,所以经过这样让信号量加1减1就能控制线程的执行顺序了。
注释:mac上测试信号量函数返回-1失败,之后仍是Linux上整吧,也许这些接口已通过时了…
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value);//建立信号量 int sem_destroy(sem_t *sem);//销毁信号量 int sem_post(sem_t *sem);//信号量加1 int sem_wait(sem_t *sem);//信号量减1,为0时阻塞
实例代码:线程A从用户输入获得值后存入全局变量num,此时线程B将取走该值并累加。该过程共进行5次,完成后输出总和并退出程序。
#include <stdio.h> #include <pthread.h> #include <semaphore.h> void * read(void * arg); void * accu(void * arg); static sem_t sem_one; static sem_t sem_two; static int num; int main(int argc, char *argv[]) { pthread_t id_t1, id_t2; sem_init(&sem_one, 0, 0); sem_init(&sem_two, 0, 1); pthread_create(&id_t1, NULL, read, NULL); pthread_create(&id_t2, NULL, accu, NULL); pthread_join(id_t1, NULL); pthread_join(id_t2, NULL); sem_destroy(&sem_one); sem_destroy(&sem_two); return 0; } void * read(void * arg) { int i; for (i = 0; i < 5; i++) { fputs("Input num: ", stdout); sem_wait(&sem_two); scanf("%d", &num); sem_post(&sem_one); } return NULL; } void * accu(void * arg) { int sum = 0 , i; for (i = 0; i < 5; i++) { sem_wait(&sem_one); sum+= num; sem_post(&sem_two); } printf("Result: %d \n", sum); return NULL; }
补充:线程的销毁,线程建立后并非其入口函数返回后就会自动销毁,须要手动销毁,否则线程建立的内存空间将一直存在。通常手动销毁有以下两种方式:1,调用pthread_join函数,其返回后同时销毁线程 ,是一个阻断函数,服务端通常不用它销毁,由于服务端主线程不宜阻断,还要实时监听客服端链接。2,调用pthread_detach函数,不会阻塞,线程返回自动销毁线程,不过要注意调用它后不能再调用pthread_join函数,它与pthread_join主要区别就是一个是阻塞函数,一个不阻塞。
使用多线程实现了一个简单的聊天程序,并对临界区(clnt_cnt,clnt_socks)进行加锁访问.
// // main.cpp // hello_server // // Created by app05 on 15-10-22. // Copyright (c) 2015年 app05. All rights reserved. //临界区是:clnt_cnt和clnt_socks访问处 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/socket.h> #include <pthread.h> #define BUF_SIZE 100 #define MAX_CLNT 256 void * handle_clnt(void * arg); void send_msg(char *msg, int len); void error_handling(char * msg); int clnt_cnt = 0; int clnt_socks[MAX_CLNT]; pthread_mutex_t mutx; int main(int argc, char *argv[]) { int serv_sock, clnt_sock; struct sockaddr_in serv_adr, clnt_adr; socklen_t clnt_adr_sz; pthread_t t_id; if (argc != 2) { printf("Usage : %s <port> \n", argv[0]); exit(1); } //建立互斥量 pthread_mutex_init(&mutx, NULL); serv_sock = socket(PF_INET, SOCK_STREAM, 0); memset(&serv_adr, 0, sizeof(serv_adr)); serv_adr.sin_family = AF_INET; serv_adr.sin_addr.s_addr = htonl(INADDR_ANY); serv_adr.sin_port = htons(atoi(argv[1])); if(bind(serv_sock, (struct sockaddr *) &serv_adr, sizeof(serv_adr)) == -1) error_handling("bind() error"); if(listen(serv_sock, 5) == -1) error_handling("listen() error"); while (1) { clnt_adr_sz = sizeof(clnt_adr); clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz); //阻断,监听客服端链接请求 //临界区 pthread_mutex_lock(&mutx); //加锁 clnt_socks[clnt_cnt++] = clnt_sock; //新链接的客服端保存到clnt_socks数组里 pthread_mutex_unlock(&mutx); //释放锁 //建立线程 pthread_create(&t_id, NULL, handle_clnt, (void*) &clnt_sock); pthread_detach(t_id); //销毁线程,线程return后自动调用销毁,不阻断 printf("Connected client IP: %s \n", inet_ntoa(clnt_adr.sin_addr)); } close(serv_sock); return 0; } //线程执行 void * handle_clnt(void * arg) { int clnt_sock = *((int *)arg); int str_len = 0, i; char msg[BUF_SIZE]; while ((str_len = read(clnt_sock, msg, sizeof(msg))) != 0) send_msg(msg, str_len); //从数组中移除当前客服端 pthread_mutex_lock(&mutx); for (i = 0; i < clnt_cnt; i++) { if (clnt_sock == clnt_socks[i]) { while (i++ < clnt_cnt - 1) clnt_socks[i] = clnt_socks[i + 1]; break; } } clnt_cnt--; pthread_mutex_unlock(&mutx); close(clnt_sock); return NULL; } //向全部链接的客服端发送消息 void send_msg(char * msg, int len) { int i; pthread_mutex_lock(&mutx); for (i = 0; i < clnt_cnt; i++) write(clnt_socks[i], msg, len); pthread_mutex_unlock(&mutx); } void error_handling(char *message) { fputs(message, stderr); fputc('\n', stderr); exit(1); }
客户端:
// // main.cpp // hello_client // // Created by app05 on 15-10-22. // Copyright (c) 2015年 app05. All rights reserved. // // #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/socket.h> #include <pthread.h> #define BUF_SIZE 100 #define NAME_SIZE 20 void * send_msg(void * arg); void * recv_msg(void * arg); void error_handling(char *message); char name[NAME_SIZE] = "[DEFAULT]"; char msg[BUF_SIZE]; int main(int argc, const char * argv[]) { int sock; struct sockaddr_in serv_addr; pthread_t snd_thread, rcv_thread; void * thread_return; if(argc != 4) { printf("Usage: %s <IP> <port> \n", argv[0]); exit(1); } sprintf(name, "[%s]", argv[3]); //聊天人名字,配置到编译器参数里 sock = socket(PF_INET, SOCK_STREAM, 0); if(sock == -1) error_handling("socket() error"); memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = inet_addr(argv[1]); serv_addr.sin_port = htons(atoi(argv[2])); if (connect(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) error_handling("connect() error"); //多线程分离输入和输出 pthread_create(&snd_thread, NULL, send_msg, (void *)&sock); pthread_create(&rcv_thread, NULL, recv_msg, (void *)&sock); //阻塞,等待返回 pthread_join(snd_thread, &thread_return); pthread_join(rcv_thread, &thread_return); close(sock); return 0; } //发送消息 void * send_msg(void * arg) { int sock = *((int *)arg); char name_msg[NAME_SIZE + BUF_SIZE]; while (1) { fgets(msg, BUF_SIZE, stdin); if (!strcmp(msg, "q\n") || !strcmp(msg, "Q \n")) { close(sock); exit(0); } sprintf(name_msg, "%s %s", name, msg); write(sock, name_msg, strlen(name_msg)); } return NULL; } //接收消息 void * recv_msg(void * arg) { int sock = *((int *)arg); char name_msg[NAME_SIZE + BUF_SIZE]; int str_len; while (1) { str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1); if(str_len == -1) return (void *)-1; name_msg[str_len] = 0; fputs(name_msg, stdout); } return NULL; } void error_handling(char *message) { fputs(message, stderr); fputc('\n', stderr); exit(1); }
windows下:
线程是系统内核对象之一。在学习线程以前,应先了解一下内核对象。内核对象是系统内核分配的一个内存块,该内存块描述的是一个数据结构,其成员负责维护对象的各类信息。内核对象的数据只能由系统内核来访问,应用程序没法在内存中找到这些数据结构并直接改变他们的内容。
经常使用的系统内核对象有事件对象、文件对象、做业对象、互斥对象、管道对象、进程对象和线程对象等。不一样类型的内核对象,其数据结构各有不一样。
进程被认为是一个正在运行的程序的实例,它也属于系统内核对象。能够将进程简单的理解为一个容器,它只是提供空间,执行程序的代码是由线程来实现的。线程存在于进程中,它负责执行进程地址空间中的代码。当一个进程建立时,系统会自动为其建立一个线程,该线程被称为主线程。在主线程中用户能够经过代码建立其余线程,当进程中的主线程结束时,进程也就结束了。
Windows下,建立线程有多种方式,如下将逐一介绍。注意它们的区别。
Windows API函数。该函数在主线程的基础上建立一个新线程。微软在Windows API中提供了创建新的线程的函数CreateThread。
HANDLECreateThread( LPSECURITY_ATTRIBUTES lpThreadAttributes,//线程安全属性 DWORD dwStackSize,//堆栈大小 LPTHREAD_START_ROUTINE lpStartAddress,//线程函数 LPVOID lpParameter,//线程参数 DWORD dwCreationFlags,//线程建立属性 LPDWORD lpThreadId//线程ID );
示例代码:
#include "stdafx.h" #include<iostream> #include<Windows.h> using namespace std; DWORD WINAPI Fun1Proc(LPVOID lpParameter) { cout << "thread function Fun1Proc!\n"; return 0; } int main() { HANDLE hThread1 = CreateThread(NULL, 0, Fun1Proc, NULL, 0, NULL); CloseHandle(hThread1); Sleep(1000); cout << "main end!\n"; system("pause"); return 0; }
结果图:
除了使用CreateThread API函数建立线程外,还能够用C++语言提供的_beginthreadex函数来建立线程。
uintptr_t _beginthreadex( // NATIVE CODE void *security, //线程安全属性 unsigned stack_size, //线程的栈大小 unsigned ( *start_address )( void * ),//线程函数 void *arglist, //传递到线程函数中的参数 unsigned initflag, //线程初始化标记 unsigned *thrdaddr //线程ID );
示例代码:
#include "stdafx.h" #include<iostream> #include<Windows.h> #include<process.h> using namespace std; unsigned int _stdcall ThreadProc(LPVOID lpParameter) { cout << "thread function ThreadProc!\n"; return 0; } int main() { _beginthreadex(NULL, 0, ThreadProc, 0, 0, NULL); Sleep(1000); cout << "main end!\n"; system("pause"); return 0; }
二.线程同步
在程序中使用多线程时,通常不多有多个线程能在其生命期内进行彻底独立的操做。更多的状况是一些线程进行某些处理操做,而其余的线程必须对其处理结果进行了解。正常状况下对这种处理结果的了解应当在其处理任务完成后进行。
若是不采起适当的措施,其余线程每每会在线程处理任务结束前就去访问处理结果,这就颇有可能获得有关处理结果的错误了解。例如,多个线程同时访问同一个全局变量,若是都是读取操做,则不会出现问题。若是一个线程负责改变此变量的值,而其余线程负责同时读取变量内容,则不能保证读取到的数据是通过写线程修改后的。
为了确保读线程读取到的是通过修改的变量,就必须在向变量写入数据时禁止其余线程对其的任何访问,直至赋值过程结束后再解除对其余线程的访问限制。这种保证线程能了解其余线程任务处理结束后的处理结果而采起的保护措施即为线程同步。
代码示例:
两个线程同时对一个全局变量进行加操做,演示了多线程资源访问冲突的状况。
#include "stdafx.h" #include<windows.h> #include<iostream> using namespace std; int number = 1; unsigned long __stdcall ThreadProc1(void* lp) { while (number < 100) { cout << "thread 1 :"<<number << endl; ++number; _sleep(100); } return 0; } unsigned long __stdcall ThreadProc2(void* lp) { while (number < 100) { cout << "thread 2 :"<<number << endl; ++number; _sleep(100); } return 0; } int main() { CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL); CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL); Sleep(10*1000); system("pause"); return 0; }
能够看到有时两个线程计算的值相同,这就跟上面Linux下建立一百个线程将数字加减为0没成功同样的道理,都是访问内存的时候冲突了。
为何会出现这种状况呢,来举个例子:
如上图所示:两个线程都要将某一个共同访问的变量加1,
就像上面说的这个运算过程是:线程1先拿到值而后通过cpu的运算在赋值回去,而后线程2在取值运算放回,上图实现的是最理想的状况,假如这时候线程一拿到了值99,同时线程二没间隔的也拿了99,这时候就要出问题了。线程一运算后赋值100回去,而后线程二运算后又赋值100回去,,,注意了哈,这里两个线程都是为了Num++服务,他们这样搞事情不就表明一个作了无用功吗?(我胖虎要是还拿的动刀还不打死你!!!)
这些看完应该就理解了为何须要线程同步!!!!以及线程同步的重要性了吧!!
线程之间通讯的两个基本问题是互斥和同步。
线程同步是指线程之间所具备的一种制约关系,一个线程的执行依赖另外一个线程的消息,当它没有获得另外一个线程的消息时应等待,直到消息到达时才被唤醒。
线程互斥是指对于共享的操做系统资源(指的是广义的”资源”,而不是Windows的.res文件,譬如全局变量就是一种共享资源),在各线程访问时的排它性。当有若干个线程都要使用某一共享资源时,任什么时候刻最多只容许一个线程去使用,其它要使用该资源的线程必须等待,直到占用资源者释放该资源。
线程互斥是一种特殊的线程同步。实际上,互斥和同步对应着线程间通讯发生的两种状况:
当有多个线程访问共享资源而不使资源被破坏时;
当一个线程须要将某个任务已经完成的状况通知另一个或多个线程时。
从大的方面讲,线程的同步可分用户模式的线程同步和内核对象的线程同步两大类。
用户模式中线程的同步方法主要有原子访问和临界区等方法。其特色是同步速度特别快,适合于对线程运行速度有严格要求的场合。
内核对象的线程同步则主要由事件、等待定时器、信号量以及信号灯等内核对象构成。因为这种同步机制使用了内核对象,使用时必须将线程从用户模式切换到内核模式,而这种转换通常要耗费近千个CPU周期,所以同步速度较慢,但在适用性上却要远优于用户模式的线程同步方式。
在WIN32中,同步机制主要有如下几种:
(1)事件(Event);
(2)信号量(semaphore);
(3)互斥量(mutex);
(4)临界区(Critical section)。
临界区(Critical Section)是一段独占对某些共享资源访问的代码,在任意时刻只容许一个线程对共享资源进行访问。若是有多个线程试图同时访问临界区,那么在有一个线程进入后其余全部试图访问此临界区的线程将被挂起,并一直持续到进入临界区的线程离开。临界区在被释放后,其余线程能够继续抢占,并以此达到用原子方式操做共享资源的目的。
临界区在使用时以CRITICAL_SECTION结构对象保护共享资源,并分别用EnterCriticalSection()和LeaveCriticalSection()函数去标识和释放一个临界区。所用到的CRITICAL_SECTION结构对象必须通过InitializeCriticalSection()的初始化后才能使用,并且必须确保全部线程中的任何试图访问此共享资源的代码都处在此临界区的保护之下。不然临界区将不会起到应有的做用,共享资源依然有被破坏的可能。
代码示例:
#include "stdafx.h" #include<windows.h> #include<iostream> using namespace std; int number = 1; //定义全局变量 CRITICAL_SECTION Critical; //定义临界区句柄 unsigned long __stdcall ThreadProc1(void* lp) { while (number < 100) { EnterCriticalSection(&Critical); cout << "thread 1 :"<<number << endl; ++number; _sleep(100); LeaveCriticalSection(&Critical); } return 0; } unsigned long __stdcall ThreadProc2(void* lp) { while (number < 100) { EnterCriticalSection(&Critical); cout << "thread 2 :"<<number << endl; ++number; _sleep(100); LeaveCriticalSection(&Critical); } return 0; } int main() { InitializeCriticalSection(&Critical); //初始化临界区对象 CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL); CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL); Sleep(10*1000); system("pause"); return 0; }
问题解决!!!
事件(Event)是WIN32提供的最灵活的线程间同步方式,事件能够处于激发状态(signaled or true)或未激发状态(unsignal or false)。根据状态变迁方式的不一样,事件可分为两类:
(1)手动设置:这种对象只可能用程序手动设置,在须要该事件或者事件发生时,采用SetEvent及ResetEvent来进行设置。
(2)自动恢复:一旦事件发生并被处理后,自动恢复到没有事件状态,不须要再次设置。
使用”事件”机制应注意如下事项:
(1)若是跨进程访问事件,必须对事件命名,在对事件命名的时候,要注意不要与系统命名空间中的其它全局命名对象冲突;
(2)事件是否要自动恢复;
(3)事件的初始状态设置。
因为event对象属于内核对象,故进程B能够调用OpenEvent函数经过对象的名字得到进程A中event对象的句柄,而后将这个句柄用于ResetEvent、SetEvent和WaitForMultipleObjects等函数中。此法能够实现一个进程的线程控制另外一进程中线程的运行,例如:
HANDLE hEvent=OpenEvent(EVENT_ALL_ACCESS,true,"MyEvent"); ResetEvent(hEvent);
示例代码:
#include "stdafx.h" #include<windows.h> #include<iostream> using namespace std; int number = 1; //定义全局变量 HANDLE hEvent; //定义事件句柄 unsigned long __stdcall ThreadProc1(void* lp) { while (number < 100) { WaitForSingleObject(hEvent, INFINITE); //等待对象为有信号状态 cout << "thread 1 :"<<number << endl; ++number; _sleep(100); SetEvent(hEvent); } return 0; } unsigned long __stdcall ThreadProc2(void* lp) { while (number < 100) { WaitForSingleObject(hEvent, INFINITE); //等待对象为有信号状态 cout << "thread 2 :"<<number << endl; ++number; _sleep(100); SetEvent(hEvent); } return 0; } int main() { CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL); CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL); hEvent = CreateEvent(NULL, FALSE, TRUE, "event"); Sleep(10*1000); system("pause"); return 0; }
运行结果都同样就不来显示出来了。
信号量是维护0到指定最大值之间的同步对象。信号量状态在其计数大于0时是有信号的,而其计数是0时是无信号的。信号量对象在控制上能够支持有限数量共享资源的访问。
信号量的特色和用途可用下列几句话定义:
(1)若是当前资源的数量大于0,则信号量有效;
(2)若是当前资源数量是0,则信号量无效;
(3)系统决不容许当前资源的数量为负值;
(4)当前资源数量决不能大于最大资源数量。
函数原型为:
HANDLE CreateSemaphore ( PSECURITY_ATTRIBUTE psa, //信号量的安全属性 LONG lInitialCount, //开始时可供使用的资源数 LONG lMaximumCount, //最大资源数 PCTSTR pszName); //信号量的名称
经过调用ReleaseSemaphore函数,线程就可以对信标的当前资源数量进行递增,该函数原型为:
BOOL WINAPI ReleaseSemaphore( HANDLE hSemaphore, //要增长的信号量句柄 LONG lReleaseCount, //信号量的当前资源数增长lReleaseCount LPLONG lpPreviousCount //增长前的数值返回 );
和其余核心对象同样,信号量也能够经过名字跨进程访问,打开信号量的API为:
HANDLE OpenSemaphore ( DWORD fdwAccess, //access BOOL bInherithandle, //若是容许子进程继承句柄,则设为TRUE PCTSTR pszName //指定要打开的对象的名字 );
代码示例:
#include "stdafx.h" #include<windows.h> #include<iostream> using namespace std; int number = 1; //定义全局变量 HANDLE hSemaphore; //定义信号量句柄 unsigned long __stdcall ThreadProc1(void* lp) { long count; while (number < 100) { WaitForSingleObject(hSemaphore, INFINITE); //等待信号量为有信号状态 cout << "thread 1 :"<<number << endl; ++number; _sleep(100); ReleaseSemaphore(hSemaphore, 1, &count); } return 0; } unsigned long __stdcall ThreadProc2(void* lp) { long count; while (number < 100) { WaitForSingleObject(hSemaphore, INFINITE); //等待信号量为有信号状态 cout << "thread 2 :"<<number << endl; ++number; _sleep(100); ReleaseSemaphore(hSemaphore, 1, &count); } return 0; } int main() { hSemaphore = CreateSemaphore(NULL, 1, 100, "sema"); CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL); CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL); Sleep(10*1000); system("pause"); return 0; }
结果同样。
采用互斥对象机制。 只有拥有互斥对象的线程才有访问公共资源的权限,由于互斥对象只有一个,因此能保证公共资源不会同时被多个线程访问。互斥不只能实现同一应用程序的公共资源安全共享,还能实现不一样应用程序的公共资源安全共享。
代码示例:
#include "stdafx.h" #include<windows.h> #include<iostream> using namespace std; int number = 1; //定义全局变量 HANDLE hMutex; //定义互斥对象句柄 unsigned long __stdcall ThreadProc1(void* lp) { while (number < 100) { WaitForSingleObject(hMutex, INFINITE); cout << "thread 1 :"<<number << endl; ++number; _sleep(100); ReleaseMutex(hMutex); } return 0; } unsigned long __stdcall ThreadProc2(void* lp) { while (number < 100) { WaitForSingleObject(hMutex, INFINITE); cout << "thread 2 :"<<number << endl; ++number; _sleep(100); ReleaseMutex(hMutex); } return 0; } int main() { hMutex = CreateMutex(NULL, false, "mutex"); //建立互斥对象 CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL); CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL); Sleep(10*1000); system("pause"); return 0; }
结果同样的。
三.多线程+IOCP实现服务端
(1)为何使用IOCP模型。
socket是内核对象句柄,每次对socket执行操做,须要用户对象到内核对象的转换,执行完成返回结果,须要内核对象到用户对象的转换。
IOCP的中文名称是完成端口,目前是Windows下最高效的网络模型。特色:半异步,非阻塞。(我理解的彻底异步是回调式,不须要人工参与,可是IOCP的异步须要轮询)。
其余模型的缺点:
1)select模型:最低效,每次检索对长度有限制(默认是64个连接),能够经过修改头文件的方式修改上限,须要手动循环查询是否有操做可执行,因此很低效;
2)WSAEvent,事件模型,缺点也是有上限,每次最多监听64个事件,在收到事件通知后,去手动recv数据,效率比select高许多,由于操做是系统消息通知的,能够实现异步;
3)完成例程模型,是对事件模型的改进,去掉了64个事件的上限
以上模型还有个缺点,就是每次有操做可执行时,须要手动去执行recv或者accept等操做,涉及到内核对象<->用户对象的两次切换(订制获取消息时一次,recv/accept操做一次),并且对于accept来讲,每次手动调用,都会产生一个socket,当大量accept来到时,产生socket的过程会很是耗时。
知道其余模型的缺点,就知道了完成端口的优势:1)没有监听上限;2)对于accept来讲,socket是提早创建准备好的,收到链接时直接返回以前传入的socket;3)只涉及到一次内核对象<->用户对象切换(订制消息时一次),由于在订制消息的时候,已经把数据缓存地址给了内核对象,内核对象在收到数据、写入缓存后,才切换回用户对象,让用户拿走数据。总的来讲,完成端口是proactor模型,其余的是reactor模型。
(2)IOCP理解与应用。
扯远点。首先传统服务器的网络IO流程以下:
接到一个客户端链接->建立一个线程负责这个链接的IO操做->持续对新线程进行数据处理->所有数据处理完毕->终止线程。
可是这样的设计代价是:
此时咱们能够考虑使用线程池解决其中3和4的问题。这种传统的服务器网络结构称之为会话模型。
后来咱们为防止大量线程的维护,建立了I/O模型,它被但愿要求能够:
1:容许一个线程在不一样时刻给多个客户端进行服务。
2:容许一个客户端在不一样时间被多个线程服务。
这样作的话,咱们的线程则会大幅度减小,这就要求如下两点:
1:客户端状态的分离,以前会话模式咱们能够经过线程状态得知客户端状态,但如今客户端状态要经过其余方式获取。
2:I/O请求的分离。一个线程再也不服务于一个客户端会话,则要求客户端对这个线程提交I/O处理请求。
那么就产生了这样一个模式,分为三部分:
上面的作法,则将网络链接 和I/O工做线程分离为三个部分,相互通信仅依靠 I/O请求。此时可知有如下一些建议:
它是一种WIN32的网络I/O模型,既包括了网络链接部分,也负责了部分的I/O操做功能,用于方便咱们控制有并发性的网络I/O操做。它有以下特色:
使用IOCP的基本步骤很简单:
(1) 完成端口实现的API
CreateIoCompletionPort
HANDLE WINAPI CreateIoCompletionPort( _In_ HANDLE FileHandle, _In_opt_ HANDLE ExistingCompletionPort, _In_ ULONG_PTR CompletionKey, _In_ DWORD NumberOfConcurrentThreads );
返回值:若是函数成功,则返回值是I / O完成端口的句柄:若是函数失败,则返回值为NULL。
功能:两个功能,建立完成端口句柄与将新的文件句柄(套接字)绑定到完成端口(咱们也能够理解为完成队列,只是这个队列由操做系统本身维护)
FileHandle:文件句柄或INVALID_HANDLE_VALUE。建立完成端口的时候,该值设置为INVALID_HANDLE_VALUE,Ghost里面时候的是一个临时的socket句柄,不过咱们不用必定要这样。
ExistingCompletionPort:现有I / O完成端口的句柄或NULL。若是此参数为现有I / O完成端口,则该函数将其与FileHandle参数指定的句柄相关联。若是成功则函数返回现有I / O完成端口的句柄。若是此参数为NULL,则该函数将建立一个新的I / O完成端口,若是FileHandle参数有效,则将其与新的I / O完成端口相关联。不然,不会发生文件句柄关联。若是成功,该函数将把句柄返回给新的I / O完成端口。
CompletionKey:该值就是相似线程里面传递的一个参数,咱们在GetQueuedCompletionStatus中第三个参数得到的就是这个值。
NumberOfConcurrentThreads:若是此参数为NULL,则系统容许与系统中的处理器同样多的并发运行的线程。若是ExistingCompletionPort参数不是NULL,则忽略此参数。
GetQueuedCompletionStatus
BOOL WINAPI GetQueuedCompletionStatus( _In_ HANDLE CompletionPort, _Out_ LPDWORD lpNumberOfBytes, _Out_ PULONG_PTR lpCompletionKey, _Out_ LPOVERLAPPED *lpOverlapped, _In_ DWORD dwMilliseconds );
返回值:成功返回TRUE,失败返回FALSE,若是设置了超时时间,超时返回FALSE
功能:从完成端口中获取已经完成的消息
CompletionPort:完成端口的句柄。
lpNumberOfBytes:该变量接收已完成的I / O操做期间传输的字节数。
lpCompletionKey:该变量及时咱们 CreateIoCompletionPort中传递的第三个参数
lpOverlapped:接收完成的I / O操做启动时指定的OVERLAPPED结构的地址。咱们能够经过CONTAINING_RECORD这个宏获取以该重叠结构为首地址的结构体信息,也就是该重叠结构为何必须放在结构体的首地址的缘由。
dwMilliseconds:超时时间(毫秒),若是为INFINITE则一直等待直到有消息到来。
备注: CreateIoCompletionPort 提供这个功能:I/O系统能够被用来向列队的I/O完成端口发送I/O完成通知包。当 你执行一个已经关联一个完成端口的文件I/O操做,I/O系统将会在这个I/O操做完成的时候向I/O完成端口发送一个完成通知包,I/O完成端口将以先 进先出的方式放置这个I/O完成通知包,并使用GetQueuedCompletionStatus 接收I/O完成通知包。
虽然容许任何数量的 线程来调用 GetQueuedCompletionStatus 等待一个I/O完成端口,但每一个线程只能同时间内关联一个I/O完成端口,且此端口是线程最后检查的那个端口。
当一个包被放入队列中,系统首先会 检查有多少个关联此端口的线程在运行,若是运行的线程的数量少于NumberOfConcurrentThreads的值,那么容许其中的一个等 待线程去处理包。当一个运行的线程完成处理,将再次调用GetQueuedCompletionStatus ,此时系统容许另外一个等待线程去处理包。
系 统也容许一个等待的线程处理包若是运行的线程进入任何形式的等待状态,当这个线程从等待状态进入运行状态,可能会有一个很短的时期活动线程的数量会超过 NumberOfConcurrentThreads 的值,此时,系统会经过不容许任何新的活动线程快速的减小线程个数,直到活动线程少于NumberOfConcurrentThreads 的值。
PostQueuedCompletionStatus
BOOL WINAPI PostQueuedCompletionStatus( _In_ HANDLE CompletionPort, _In_ DWORD dwNumberOfBytesTransferred, _In_ ULONG_PTR dwCompletionKey, _In_opt_ LPOVERLAPPED lpOverlapped );
返回值:成功,返回非零,失败返回零。使用GetLasrError获取最后的错误码
功能:手动向完成端口投递一个异步消息。就相似咱们Win32中的PostMessage
CompletionPort:完成端口的句柄。
dwNumberOfBytesTransferred:经过GetQueuedCompletionStatus函数的lpNumberOfBytesTransferred参数返回的值。
dwCompletionKey:经过GetQueuedCompletionStatus函数的lpCompletionKey参数返回的值。
lpOverlapped:经过GetQueuedCompletionStatus函数的lpOverlapped参数返回的值。
能够看到上面后三个参数均可以传递给
GetQueuedCompletionStatus
,这样—来。—个工做者线程收到传递过来的三个GetQueuedCompletionStatus函数参数后,即可根据由这三个参数的某一个设置的特殊值,决定什么时候应该退出。例如,可用dwCompletionPort参数传递0值,而—个工做者线程会将其解释成停止指令。一旦全部工做者线程都已关闭,即可使用CloseHandle函数,关闭完成端口。最终安全退出程序。
PostQueuedCompletionStatus函数提供了一种方式来与线程池中的全部线程进行通讯。如,当用户终止服务应用程序时,咱们想要全部线程都彻底利索地退出。可是若是各线程还在等待完成端口而又没有已完成的I/O 请求,那么它们将没法被唤醒。
经过为线程池中的每一个线程都调用一次PostQueuedCompletionStatus,咱们能够将它们都唤醒。每一个线程会对GetQueuedCompletionStatus的返回值进行检查,若是发现应用程序正在终止,那么它们就能够进行清理工做并正常地退出。
CONTAINING_RECORD
PCHAR CONTAINING_RECORD( [in] PCHAR Address, [in] TYPE Type, [in] PCHAR Field );
功能:返回给定结构类型的结构实例的基地址和包含结构中字段的地址。
返回值:返回包含Field的结构的基地址。
Address:咱们经过GetQueuedCompletionStatus获取的重叠结构
Type:以重叠结构为首地址的结构体
Field:Type结构体的重叠结构变量
(2)相关其余函数
AcceptEx
BOOL AcceptEx( _In_ SOCKET sListenSocket, _In_ SOCKET sAcceptSocket, _In_ PVOID lpOutputBuffer, _In_ DWORD dwReceiveDataLength, _In_ DWORD dwLocalAddressLength, _In_ DWORD dwRemoteAddressLength, _Out_ LPDWORD lpdwBytesReceived, _In_ LPOVERLAPPED lpOverlapped );
返回值:成功返回TRUE,失败返回FALSE
功能:投递异步的接收操做,相似于实现了一个网络内存池,这个池中存放的是已经创造好的套接字(因为要进行异步操做,因此该套接字也要使用WSASocket建立),当有用户链接的时候,操做系统会直接从这个网络内存池中拿出一个来给链接的客户端,这个过程咱们少去了链接时才创造套接字的过程(建立一个套接字的过程内部是很复杂的),这也是这个函数优异的地方。
该函数的参数很明确,只是有些其他的话还须要提醒,AcceptEx该函数还须要经过函数指针得到,由于该函数不是windows自身的API。具体的获取过程也只是循序渐进,MSDN有详细的例子,示例代码中也有详细的过程,笔者就不赘述了。
AcceptEx函数
使用Accept(或WSAAccept)接受链接,当并发链接数超过大概30000(这取决于系统资源)的时候,容易出现WSAENOBUFS(10055)错误。这种错误主要是由于系统不能及时为新链接进来的客户端分配socket资源。所以咱们应该找到一种的使用以前可以分配socket资源的方法。AcceptEx 就是咱们寻找的答案,它的主要优点就是在使用socket资源以前就会分分配好资源,它的其余方面的特色就比较麻烦使人费解了。(参见MSDN库。)
服务端代码:
#define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <stdlib.h> #include <process.h> #include <winsock2.h> #include <windows.h> #pragma comment(lib,"ws2_32.lib");//加载ws2_32.dll #define BUF_SIZE 100 #define READ 3 #define WRITE 5 typedef struct // socket info { SOCKET hClntSock; SOCKADDR_IN clntAdr; } PER_HANDLE_DATA, *LPPER_HANDLE_DATA; typedef struct // buffer info { OVERLAPPED overlapped; WSABUF wsaBuf; char buffer[BUF_SIZE]; int rwMode; // READ or WRITE 读写模式 } PER_IO_DATA, *LPPER_IO_DATA; unsigned int WINAPI EchoThreadMain(LPVOID CompletionPortIO); void ErrorHandling(char *message); SOCKET ALLCLIENT[100]; int clientcount = 0; HANDLE hMutex;//互斥量 int main(int argc, char* argv[]) { hMutex = CreateMutex(NULL, FALSE, NULL);//建立互斥量 WSADATA wsaData; HANDLE hComPort; SYSTEM_INFO sysInfo; LPPER_IO_DATA ioInfo; LPPER_HANDLE_DATA handleInfo; SOCKET hServSock; SOCKADDR_IN servAdr; int i; DWORD recvBytes = 0,flags = 0; if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) ErrorHandling("WSAStartup() error!"); hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);//建立CP对象 GetSystemInfo(&sysInfo);//获取当前系统的信息 for (i = 0; i < sysInfo.dwNumberOfProcessors; i++) _beginthreadex(NULL, 0, EchoThreadMain, (LPVOID)hComPort, 0, NULL);//建立=CPU个数的线程数 hServSock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);//不是非阻塞套接字,可是重叠IO套接字。 memset(&servAdr, 0, sizeof(servAdr)); servAdr.sin_family = AF_INET; servAdr.sin_addr.s_addr = htonl(INADDR_ANY); servAdr.sin_port = htons(1234); bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr)); listen(hServSock, 5); while (1) { SOCKET hClntSock; SOCKADDR_IN clntAdr; int addrLen = sizeof(clntAdr); hClntSock = accept(hServSock, (SOCKADDR*)&clntAdr, &addrLen); handleInfo = (LPPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA));//和重叠IO同样 handleInfo->hClntSock = hClntSock;//存储客户端套接字 WaitForSingleObject(hMutex, INFINITE);//线程同步 ALLCLIENT[clientcount++] = hClntSock;//存入套接字队列 ReleaseMutex(hMutex); memcpy(&(handleInfo->clntAdr), &clntAdr, addrLen); CreateIoCompletionPort((HANDLE)hClntSock, hComPort, (DWORD)handleInfo, 0);//链接套接字和CP对象 //已完成信息将写入CP对象 ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//存储接收到的信息 memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED)); ioInfo->wsaBuf.len = BUF_SIZE; ioInfo->wsaBuf.buf = ioInfo->buffer;//和重叠IO同样 ioInfo->rwMode = READ;//读写模式 WSARecv(handleInfo->hClntSock, &(ioInfo->wsaBuf),//非阻塞模式 1, &recvBytes, &flags, &(ioInfo->overlapped), NULL); } CloseHandle(hMutex);//销毁互斥量 return 0; } unsigned int WINAPI EchoThreadMain(LPVOID pComPort)//线程的执行 { HANDLE hComPort = (HANDLE)pComPort; SOCKET sock; DWORD bytesTrans; LPPER_HANDLE_DATA handleInfo; LPPER_IO_DATA ioInfo; DWORD flags = 0; while (1)//大循环 { GetQueuedCompletionStatus(hComPort, &bytesTrans,//确认“已完成”的I/O!! (LPDWORD)&handleInfo, (LPOVERLAPPED*)&ioInfo, INFINITE);//INFINITE使用时,程序将阻塞,直到已完成的I/O信息写入CP对象 sock = handleInfo->hClntSock;//客户端套接字 if (ioInfo->rwMode == READ)//读写模式(此时缓冲区有数据) { puts("message received!"); if (bytesTrans == 0) // 链接结束 { WaitForSingleObject(hMutex, INFINITE);//线程同步 closesocket(sock); int i = 0; while (ALLCLIENT[i] == sock){ i++; } ALLCLIENT[i] = 0;//断开置0 ReleaseMutex(hMutex); free(handleInfo); free(ioInfo); continue; } int i = 0; for (; i < clientcount;i++) { if (ALLCLIENT[i] != 0)//判断是否为已链接的套接字 { if (ALLCLIENT[i] != sock) { LPPER_IO_DATA newioInfo; newioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//动态分配内存 memset(&(newioInfo->overlapped), 0, sizeof(OVERLAPPED)); strcpy(newioInfo->buffer, ioInfo->buffer);//从新构建新的内存,防止屡次释放free newioInfo->wsaBuf.buf = newioInfo->buffer; newioInfo->wsaBuf.len = bytesTrans; newioInfo->rwMode = WRITE; WSASend(ALLCLIENT[i], &(newioInfo->wsaBuf),//回声 1, NULL, 0, &(newioInfo->overlapped), NULL); } else { memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED)); ioInfo->wsaBuf.len = bytesTrans; ioInfo->rwMode = WRITE; WSASend(ALLCLIENT[i], &(ioInfo->wsaBuf),//回声 1, NULL, 0, &(ioInfo->overlapped), NULL); } } } ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//动态分配内存 memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED)); ioInfo->wsaBuf.len = BUF_SIZE; ioInfo->wsaBuf.buf = ioInfo->buffer; ioInfo->rwMode = READ; WSARecv(sock, &(ioInfo->wsaBuf),//再非阻塞式接收 1, NULL, &flags, &(ioInfo->overlapped), NULL); } else { puts("message sent!"); free(ioInfo); } } return 0; } void ErrorHandling(char *message) { fputs(message, stderr); fputc('\n', stderr); exit(1); }
客户端:
#define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <stdlib.h> #include <string.h> #include <windows.h> #include <process.h> #define BUF_SIZE 1000 #define NAME_SIZE 20 #pragma comment(lib, "ws2_32.lib") //加载 ws2_32.dll unsigned WINAPI SendMsg(void * arg);//发送信息函数 unsigned WINAPI RecvMsg(void * arg);//接受信息函数 void ErrorHandling(char * msg);//错误返回函数 int haveread = 0; char NAME[50];//[名字] char ANAME[50]; char msg[BUF_SIZE];//信息 int main(int argc, char *argv[]) { printf("请输入网名:"); scanf("%s", NAME); WSADATA wsaData; SOCKET hSock; SOCKADDR_IN servAdr; HANDLE hSndThread, hRcvThread; if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) ErrorHandling("WSAStartup() error!"); hSock = socket(PF_INET, SOCK_STREAM, 0); memset(&servAdr, 0, sizeof(servAdr)); servAdr.sin_family = AF_INET; servAdr.sin_addr.s_addr = inet_addr("127.0.0.1"); servAdr.sin_port = htons(1234); if (connect(hSock, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR) ErrorHandling("connect() error"); int resultsend; puts("Welcome to joining our chatting room!\n"); sprintf(ANAME, "[%s]", NAME); hSndThread = (HANDLE)_beginthreadex(NULL, 0, SendMsg, (void*)&hSock, 0, NULL);//写线程 hRcvThread = (HANDLE)_beginthreadex(NULL, 0, RecvMsg, (void*)&hSock, 0, NULL);//读线程 WaitForSingleObject(hSndThread, INFINITE);//等待线程结束 WaitForSingleObject(hRcvThread, INFINITE); closesocket(hSock); WSACleanup(); system("pause"); return 0; } unsigned WINAPI SendMsg(void * arg) // send thread main { SOCKET sock = *((SOCKET*)arg); char name_msg[NAME_SIZE + BUF_SIZE]; char padd[2]; fgets(padd, 2, stdin);//多余的'\n' printf("\n send message:"); while (1) { { fgets(msg, BUF_SIZE, stdin); if (!strcmp(msg, "q\n") || !strcmp(msg, "Q\n")) { closesocket(sock); exit(0); } sprintf(name_msg, "[%s] %s", NAME, msg); char numofmsg = strlen(name_msg) + '0'; char newmsg[100]; newmsg[0] = numofmsg; newmsg[1] = 0;//第一个字符表示消息的长度 strcat(newmsg, name_msg); int result = send(sock, newmsg, strlen(newmsg), 0); if (result == -1)return -1;//发送错误 } } return NULL; } unsigned WINAPI RecvMsg(void * arg) // read thread main { SOCKET sock = *((SOCKET*)arg); char name_msg[NAME_SIZE + BUF_SIZE]; int str_len = 0; while (1) { { char lyfstr[1000] = { 0 }; int totalnum = 0; str_len = recv(sock, name_msg, 1, 0);//读取第一个字符!获取消息的长度 if (str_len == -1)//读取错误 { printf("return -1\n"); return -1; } if (str_len == 0)//读取结束 { printf("return 0\n"); return 0;//读取结束 } totalnum = name_msg[0] - '0'; int count = 0; do { str_len = recv(sock, name_msg, 1, 0); name_msg[str_len] = 0; if (str_len == -1)//读取错误 { printf("return -1\n"); return -1; } if (str_len == 0) { printf("return 0\n"); return 0;//读取结束 } strcat(lyfstr, name_msg); count = str_len + count; } while (count < totalnum); lyfstr[count] = '\0'; printf("\n"); strcat(lyfstr, "\n"); fputs(lyfstr, stdout); printf(" send message:"); fflush(stdout); memset(name_msg, 0, sizeof(char)); } } return NULL; } void ErrorHandling(char * msg) { fputs(msg, stderr); fputc('\n', stderr); exit(1); }
最后说一句啦。本网络编程入门系列博客是连载学习的,有兴趣的能够看我博客其余篇。。。。c++ 网络编程课设入门超详细教程 ---目录
参考博客:https://blog.csdn.net/kaida1234/article/details/79465713
参考博客:http://www.runoob.com/cplusplus/cpp-multithreading.html
参考博客:https://blog.csdn.net/u010223072/article/details/49335867
参考博客:https://blog.csdn.net/wxf2012301351/article/details/73504281
参考书籍:《TCP/IP网络编程 ---尹圣雨》
如有兴趣交流分享技术,可关注本人公众号,里面会不按期的分享各类编程教程,和共享源码,诸如研究分享关于c/c++,python,前端,后端,opencv,halcon,opengl,机器学习深度学习之类有关于基础编程,图像处理和机器视觉开发的知识