Pthreads 信号量,路障,条件变量

▶ 使用信号量来进行线程间信息传递函数

● 代码post

 1 #include <stdio.h>
 2 #include <pthread.h>
 3 #include <semaphore.h>
 4 #pragma comment(lib, "pthreadVC2.lib")
 5 
 6 const int thread = 8, messageSize = 100;
 7 char messageList[thread][messageSize];  // 全局信息列表
 8 sem_t sem[thread];                      // 各线程信号量,注意每一个线程都有一个
 9 
10 void* sendMessage(void* rank)
11 {
12     const long long localRank = (long long)rank, dest = (localRank + 1) % thread;
13     int i; 
14     sprintf_s(messageList[dest], messageSize, "Hello from %2d to %2d.", localRank, dest);
15     sem_post(&sem[dest]);               // 解锁 dest 的信号量,由于上一行低吗已经完成了写入,注意每次执行完函数 sem_post() 后 sem[dest] 的值增长 1
16     sem_wait(&sem[localRank]);          // 等待本身编号的信号量解锁,注意每次执行完函数 sem_wait() 后 sem[localRank] 的值减少 1(但不会小于0)
17     printf("Thread %2d > %s\n", localRank, messageList[localRank]);
18     return nullptr;
19 }
20 
21 int main()
22 {    
23     pthread_t pth[thread];
24     int i;
25     long long list[thread];
26     
27     for (i = 0; i < thread; i++)
28     {
29         sem_init(&sem[i], 0, 0);        // 依次初始化信号量
30         list[i] = i;
31         pthread_create(&pth[i], nullptr, sendMessage, (void *)list[i]);
32     }
33     for (i = 0; i < thread; i++)
34     {
35         pthread_join(pth[i], nullptr);
36         sem_destroy(&sem[i]);           // 销毁信号量
37     }
38     printf("\nfinish.\n");
39     getchar();
40     return 0;
41 }

● 输出结果spa

 1 Thread  1 > Hello from  0 to  1.
 2 Thread  2 > Hello from  1 to  2.
 3 Thread  3 > Hello from  2 to  3.
 4 Thread  4 > Hello from  3 to  4.
 5 Thread  5 > Hello from  4 to  5.
 6 Thread  6 > Hello from  5 to  6.
 7 Thread  0 > Hello from  7 to  0.
 8 Thread  7 > Hello from  6 to  7.
 9 
10 finish.

● 用到的定义,注意信号量不是由 phread.h 提供的,而是 semaphore.h线程

 1 typedef struct sem_t_ * sem_t;
 2 
 3 PTW32_DLLPORT int __cdecl sem_init(sem_t * sem, int pshared, unsigned int value);
 4     // 初始化信号量,输入一个已经声明的信号量的指针,第二个参数不明,第三个参数为 0 表示初始化完成后信号量为上锁状态
 5 
 6 PTW32_DLLPORT int __cdecl sem_destroy(sem_t * sem);// 销毁信号量
 7 
 8 PTW32_DLLPORT int __cdecl sem_wait(sem_t * sem);   // 等待信号量为解锁状态再向下运行
 9 
10 PTW32_DLLPORT int __cdecl sem_post(sem_t * sem);   // 解锁信号量

 

▶ 使用忙等待和互斥量来实现路障指针

● 代码code

 1 #include <stdio.h>
 2 #include <pthread.h>
 3 #pragma comment(lib, "pthreadVC2.lib")
 4 
 5 const int thread = 8;
 6 int count;
 7 pthread_mutex_t pmt;
 8 
 9 void* work(void* rank)
10 {
11     const long long localRank = (long long)rank, dest = (localRank + 1) % thread;    
12     pthread_mutex_lock(&pmt);   // 进入读写区,上锁,计数器加一,解锁
13     printf("Thread %2d reached the barrier.\n", localRank); fflush(stdout);
14     count++;
15     pthread_mutex_unlock(&pmt);
16     while (count < thread);     // 使用忙等待来等全部的线程都达到栅栏
17     printf("Thread %2d passed the barrier.\n", localRank); fflush(stdout);
18     return nullptr;
19 }
20 
21 int main()
22 {    
23     pthread_t pth[thread];
24     int i;
25     long long list[thread];
26     pthread_mutex_init(&pmt, nullptr);
27     for (i = count = 0; i < thread; i++)
28     {
29         list[i] = i;
30         pthread_create(&pth[i], nullptr, work, (void *)list[i]);
31     }
32     for (i = 0; i < thread; i++)
33         pthread_join(pth[i], nullptr);
34     pthread_mutex_destroy(&pmt);
35     printf("\nfinish.\n");
36     getchar();
37     return 0;
38 }

● 输出结果blog

 1 Thread  1 reached the barrier.
 2 Thread  5 reached the barrier.
 3 Thread  2 reached the barrier.
 4 Thread  3 reached the barrier.
 5 Thread  4 reached the barrier.
 6 Thread  0 reached the barrier.
 7 Thread  6 reached the barrier.
 8 Thread  7 reached the barrier.
 9 Thread  5 passed the barrier.
10 Thread  6 passed the barrier.
11 Thread  3 passed the barrier.
12 Thread  0 passed the barrier.
13 Thread  1 passed the barrier.
14 Thread  4 passed the barrier.
15 Thread  2 passed the barrier.
16 Thread  7 passed the barrier.
17 
18 finish.

 

▶ 使用信号量来实现路障get

● 代码it

 1 #include <stdio.h>
 2 #include <pthread.h>
 3 #include <semaphore.h>
 4 #pragma comment(lib, "pthreadVC2.lib")
 5 
 6 const int thread = 8;
 7 int count;
 8 sem_t sem_count, sem_barrier;
 9 
10 void* work(void* rank)
11 {
12     const long long localRank = (long long)rank, dest = (localRank + 1) % thread;        
13     printf("Thread %2d reached the barrier.\n", localRank); fflush(stdout);
14     sem_wait(&sem_count);       // 等待容许访问计数器 count,注意执行完该语句时 sem_count 值减 1,自动上锁
15     if (count == thread - 1)    // 最后一个到达进入的线程
16     {
17         count = 0;              // 计数器清零,之后能够重复使用
18         sem_post(&sem_count);   // 计数器解锁,sem_count 值加 1
19         for (int i = 0; i < thread - 1; sem_post(&sem_barrier), i++);// 解锁整个栅栏,
20     }                                                                // 每有一个线程经过后面的语句 sem_wait(&sem_barrier);,
21     else                        // 前面到达的线程                     // sem_barrier 的值就减 1,因此这里要为该变量加上 thread - 1 
22     {
23         count++;                // 计数器加一
24         sem_post(&sem_count);   // 解锁计数器
25         sem_wait(&sem_barrier); // 等待栅栏解锁
26     }
27     printf("Thread %2d passed the barrier.\n", localRank); fflush(stdout);    
28     return nullptr;
29 }
30 
31 int main()
32 {    
33     pthread_t pth[thread];
34     int i;
35     long long list[thread];
36     
37     sem_init(&sem_count, 0, 1);     // 计数器锁初始化为 1,开锁状态
38     sem_init(&sem_barrier, 0, 0);   // 栅栏初始化为 0,关锁状态
39     for (i = count = 0; i < thread; i++)
40     {
41         list[i] = i;
42         pthread_create(&pth[i], nullptr, work, (void *)list[i]);
43     }
44     for (i = 0; i < thread; i++)
45         pthread_join(pth[i], nullptr);
46     sem_destroy(&sem_count), sem_destroy(&sem_barrier);
47     printf("\nfinish.\n");
48     getchar();
49     return 0;
50 }

● 输出结果io

Thread  0 reached the barrier.
Thread  3 reached the barrier.
Thread  4 reached the barrier.
Thread  2 reached the barrier.
Thread  1 reached the barrier.
Thread  5 reached the barrier.
Thread  7 reached the barrier.
Thread  6 reached the barrier.
Thread  4 passed the barrier.
Thread  5 passed the barrier.
Thread  2 passed the barrier.
Thread  7 passed the barrier.
Thread  3 passed the barrier.
Thread  1 passed the barrier.
Thread  6 passed the barrier.
Thread  0 passed the barrier.

finish.

 

▶ 使用条件变量来实现路障

 1 #include <stdio.h>
 2 #include <pthread.h>
 3 #include <semaphore.h>
 4 #pragma comment(lib, "pthreadVC2.lib")
 5 
 6 const int thread = 8;
 7 int count;
 8 pthread_mutex_t mutex;
 9 pthread_cond_t cond;
10 
11 void* work(void* rank)
12 {
13     const long long localRank = (long long)rank, dest = (localRank + 1) % thread;        
14     printf("Thread %2d reached the barrier.\n", localRank); fflush(stdout);
15     pthread_mutex_lock(&mutex);         // 上锁
16     count++;
17     if (count == thread)                // 最后一个进入的线程
18     {
19         count = 0;                      // 计数器清零
20         pthread_cond_broadcast(&cond);  // 广播全部线程继续向下执行
21     }
22     else
23         for (; pthread_cond_wait(&cond, &mutex) != 0;);// 等待其余线程
24     pthread_mutex_unlock(&mutex);       // 条件变量阻塞解除后会自动将互斥量上锁,须要手工解除    
25     
26     printf("Thread %2d passed the barrier.\n", localRank); fflush(stdout);    
27     return nullptr;
28 }
29 
30 int main()
31 {    
32     pthread_t pth[thread];
33     int i;
34     long long list[thread];
35     pthread_mutex_init(&mutex, nullptr);
36     pthread_cond_init(&cond, nullptr);
37     for (i = count = 0; i < thread; i++)
38     {
39         list[i] = i;
40         pthread_create(&pth[i], nullptr, work, (void *)list[i]);
41     }
42     for (i = 0; i < thread; i++)
43         pthread_join(pth[i], nullptr);
44     pthread_mutex_destroy(&mutex);
45     pthread_cond_destroy(&cond);
46     printf("\nfinish.\n");
47     getchar();
48     return 0;
49 }

● 输出结果

Thread  0 reached the barrier.
Thread  1 reached the barrier.
Thread  2 reached the barrier.
Thread  4 reached the barrier.
Thread  5 reached the barrier.
Thread  6 reached the barrier.
Thread  7 reached the barrier.
Thread  3 reached the barrier.
Thread  3 passed the barrier.
Thread  0 passed the barrier.
Thread  1 passed the barrier.
Thread  5 passed the barrier.
Thread  4 passed the barrier.
Thread  7 passed the barrier.
Thread  2 passed the barrier.
Thread  6 passed the barrier.

finish.

● 用到的定义,pthread.h

 1 typedef struct pthread_cond_t_ * pthread_cond_t;
 2 
 3 PTW32_DLLPORT int PTW32_CDECL pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t * attr);// 初始化已经声明了的条件变量,第二个参数为属性指针
 4 
 5 PTW32_DLLPORT int PTW32_CDECL pthread_cond_destroy(pthread_cond_t * cond);                              // 销毁条件变量
 6 
 7 PTW32_DLLPORT int PTW32_CDECL pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t * mutex);        // 阻塞线程以等待 signal 或 brocast
 8 
 9 PTW32_DLLPORT int PTW32_CDECL pthread_cond_signal(pthread_cond_t * cond);                               // 解锁一个线程
10 
11 PTW32_DLLPORT int PTW32_CDECL pthread_cond_broadcast(pthread_cond_t * cond);                            // 解锁全部的线程
相关文章
相关标签/搜索