【MPI学习4】MPI并行程序设计模式:非阻塞通讯MPI程序设计

这一章讲了MPI非阻塞通讯的原理和一些函数接口,最后再用非阻塞通讯方式实现Jacobi迭代,记录学习中的一些知识。程序员

 

(1)阻塞通讯与非阻塞通讯缓存

阻塞通讯调用时,整个程序只能执行通讯相关的内容,而没法执行计算相关的内容;函数

非阻塞调用的初衷是尽可能让通讯和计算重叠进行,提升程序总体执行效率。学习

总体对比见下图:优化

 

(2)非阻塞通讯的要素spa

非阻塞通讯调用返回意味着通讯开始启动;而非阻塞通讯完成则须要调用其余的接口来查询。翻译

要素1:非阻塞通讯的调用接口设计

要素2:非阻塞通讯的完成查询接口code

理想的非阻塞通讯设计应该以下:对象

非阻塞通讯的 发送 和 接受 过程都须要同时具有以上两个要素,“调用+完成”

“调用”按照通讯方式的不一样(标准、缓存、同步、就绪),有各类函数接口,具体用到哪一个就查手册的性质。

这里“完成”是重点,由于程序员须要知道非阻塞调用是否执行完成了,来作下一步的操做。

MPI为“完成”定义了一个内部变量MPI_Request request,每一个request与一个在非阻塞调用发生时与该调用发生关联(这里的调用包括发送和接收)。

“完成”不区分通讯方式的不一样,统一用MPI_Wait系列函数来完成,这里对MPI_Wait函数作一点说明:

1)MPI_Wait(MPI_Request *request),均等着request执行完毕了,再往下进行

2)对于非重复非阻塞通讯,MPI_Wait系列函数调用的返回,还意味着request对象被释放了,程序员不用再显式释放request变量。

3)对于重复非阻塞通讯,MPI_Wait系列函数调用的返回,意味着将于request对象关联的非阻塞通讯处于不激活状态,并不释放request

关于2)3)看后面的代码示例就了解了

 

(3)非阻塞调用实现Jacobi迭代

有了非阻塞调用的技术,能够再将Jacobi迭代的程序效率提高,其整体的实现思路以下:

1)先计算Jacobi迭代下次计算所须要的边界数据,这些数据与每一个计算节点中的计算无关,能够先独立计算好

2)启动非阻塞通讯,将边界数据在进程间传递

3)计算每一个计算节点能够独立计算的部分;此时,2)中启动的非阻塞通讯也在进行中,这时通讯和计算就重叠了

4)等着非阻塞通讯完成,再进行下一次迭代

再回顾一下以前用阻塞通讯实现Jacobi迭代的思路:

1)先传递边界数据

2)等着数据都传递完了,再进行计算

3)等着计算完成了,进行下一次迭代

能够看到阻塞通讯中实现Jacobi迭代的程序中,在同一计算节点下,通讯和计算是分别进行的,效率不如非阻塞通讯。

总结起来:

“单机程序 → 阻塞通讯MPI程序” 实现单机计算到多机计算,用并行代替串行提升效率。

“阻塞MPI程序 → 非阻塞MPI程序” 不只将多台机器之间的并行,并且还能将每台机器的通讯与计算过程并行,实现更高效的并行。 

 

(4)非阻塞通讯实现Jacobi迭代的代码

书上的源代码是Fortan的,数据存储是列优先的,矩阵按列分块;下面的代码是我翻译的C的代码,数据存储是行优先的,矩阵按行分块。

  1 #include "mpi.h"
  2 #include <stdio.h>
  3 #include <stdlib.h>
  4 
  5 #define N 8
  6 #define SIZE N/4
  7 #define T 2
  8 
  9 void print_matrix(int myid, float myRows[][N]);
 10 
 11 int main(int argc, char *argv[])
 12 {
 13     float matrix1[SIZE+2][N], matrix2[SIZE+2][N];
 14     int myid;
 15     MPI_Status status[4];
 16     MPI_Request request[4];
 17 
 18     MPI_Init(&argc, &argv);
 19     MPI_Comm_rank(MPI_COMM_WORLD, &myid);
 20 
 21     // 初始化
 22     int i,j;
 23     for(i=0; i<SIZE+2; i++)
 24     {
 25         for(j=0; j<N; j++)
 26         {
 27             matrix1[i][j] = matrix2[i][j] = 0;
 28         }
 29     }
 30     if(0==myid) // 按行划分 上面第一分块矩阵 上边界
 31     {
 32         for(j=0; j<N; j++) matrix1[1][j] = matrix2[1][j] = N;
 33     }
 34     if (3==myid) { // 按行划分 最下面一分块矩阵 下边界
 35         for(j=0; j<N; j++) matrix1[SIZE][j] = matrix2[SIZE][j] = N;
 36     }
 37     for(i=1; i<SIZE+1; i++) // 每一个矩阵的两侧边界
 38     {
 39         matrix1[i][0] = matrix1[i][N-1] = matrix2[i][0] = matrix2[i][N-1] = N;
 40     }
 41     // 引入虚拟进程 并计算每一个进程上下相邻进程
 42     int up_proc_id = myid==0 ? MPI_PROC_NULL : myid-1;
 43     int down_proc_id = myid==3 ? MPI_PROC_NULL : myid+1;
 44     // jacobi迭代过程
 45     int t,row,col;
 46     for(t=0; t<T; t++)
 47     {
 48         // 1 计算边界数据
 49         if(0==myid) // 最上的矩阵块
 50         {
 51             for (col=1; col<N-1; col++)
 52             {
 53                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
 54             }
 55         }
 56         else if (3==myid) { // 最下的矩阵块
 57             for (col=1; col<N-1; col++)
 58             {
 59                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
 60             }
 61         }
 62         else {
 63             for(col=1; col<N-1; col++) // 中间的矩阵块
 64             {
 65                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
 66                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
 67             }
 68         }
 69         // 2 利用非阻塞函数传递边界数据 为下一次计算作准备
 70         int tag1 = 1, tag2 = 2;
 71         MPI_Isend(&matrix2[1][0], N, MPI_FLOAT, up_proc_id, tag1, MPI_COMM_WORLD, &request[0]);
 72         MPI_Isend(&matrix2[SIZE][0], N, MPI_FLOAT, down_proc_id, tag2, MPI_COMM_WORLD, &request[1]);
 73         MPI_Irecv(&matrix1[SIZE+1][0], N, MPI_FLOAT, down_proc_id, tag1, MPI_COMM_WORLD, &request[2]);
 74         MPI_Irecv(&matrix1[0][0], N, MPI_FLOAT, up_proc_id, tag2, MPI_COMM_WORLD, &request[3]);
 75         // 3 计算中间数据
 76         int begin_row = 0==myid ? 2 : 1;
 77         int end_row = 3==myid ? (SIZE-1) : SIZE; 
 78         for (row=begin_row; row<end_row; row++)
 79         {
 80             for (col=1; col<N-1; col++)
 81             {
 82                 matrix2[row][col] = (matrix1[row][col-1]+matrix1[row][col+1]+matrix1[row+1][col]+matrix1[row-1][col])*0.25;
 83             }
 84         }
 85         // 4 更新矩阵 并等待各个进程间数据传递完毕
 86         for (row=begin_row; row<=end_row; row++)
 87         {
 88             for (col=1; col<N-1; col++)
 89             {
 90                 matrix1[row][col] = matrix2[row][col];
 91             }
 92         }
 93         MPI_Waitall(4, &request[0], &status[0]);
 94     }
 95     MPI_Barrier(MPI_COMM_WORLD);
 96     print_matrix(myid, matrix1);
 97     MPI_Finalize();
 98 }
 99 
100 
101 void print_matrix(int myid, float myRows[][N])
102 {
103     int i,j;
104     int buf[1];
105     MPI_Status status;
106     buf[0] = 1;
107     if ( myid>0 ) {
108         MPI_Recv(buf, 1, MPI_INT, myid-1, 0, MPI_COMM_WORLD, &status);
109     }
110     printf("Result in process %d:\n", myid);
111     for ( i = 0; i<SIZE+2; i++)
112     {
113         for ( j = 0; j<N; j++)
114             printf("%1.3f\t", myRows[i][j]);
115         printf("\n");
116     }
117     if ( myid<3 ) {
118         MPI_Send(buf, 1, MPI_INT, myid+1, 0, MPI_COMM_WORLD);
119     }
120     MPI_Barrier(MPI_COMM_WORLD);
121 }

程序的执行结果以下:

上述程序设计的逻辑以下:

1)各个分块矩阵的边界数据是能够须要通讯交换的

2)先计算边界数据,尽可能把须要通讯交换并且又相对独立的数据先计算出来

3)用非阻塞通讯传递分块矩阵的边界数据;同时每一个节点内计算内部的数据;计算与通讯并行

4)等到每一个计算节点的2个发送、2个接收,总共4个非阻塞调用都完成了,进行下一轮迭代

 

(5)重复非阻塞通讯

上面实现Jacobi迭代的代码中,以进程1和进程2为例:

1)迭代一轮两者之间就须要互相通讯一次

2)每次互相通讯,随着MPI_Wait的执行,request通讯对象释放,两个进程通讯彻底被切断了

3)两个进程之间每次通讯,有一些通讯链接操做都是重复的,最好不用每次通讯都从新执行这些链接操做,以此提升效率

4)所以,比上面实现jacobi迭代更优化一些的作法是:每次不彻底掐断两个进程的非阻塞通讯,保持那些基础的通用的操做,每次迭代只须要更新须要传输的数据,再激活两个进程之间的非阻塞通讯

依照上面的思路,MPI给出了重复非阻塞的通讯调用实现。用重复非阻塞的通讯再实现一次Jacobi迭代,代码以下:

  1 #include "mpi.h"
  2 #include <stdio.h>
  3 #include <stdlib.h>
  4 
  5 #define N 8
  6 #define SIZE N/4
  7 #define T 2
  8 
  9 void print_matrix(int myid, float myRows[][N]);
 10 
 11 int main(int argc, char *argv[])
 12 {
 13     float matrix1[SIZE+2][N], matrix2[SIZE+2][N];
 14     int myid;
 15     MPI_Status status[4];
 16     MPI_Request request[4];
 17 
 18     MPI_Init(&argc, &argv);
 19     MPI_Comm_rank(MPI_COMM_WORLD, &myid);
 20 
 21     // 初始化
 22     int i,j;
 23     for(i=0; i<SIZE+2; i++)
 24     {
 25         for(j=0; j<N; j++)
 26         {
 27             matrix1[i][j] = matrix2[i][j] = 0;
 28         }
 29     }
 30     if(0==myid) // 按行划分 上面第一分块矩阵 上边界
 31     {
 32         for(j=0; j<N; j++) matrix1[1][j] = matrix2[1][j] = N;
 33     }
 34     if (3==myid) { // 按行划分 最下面一分块矩阵 下边界
 35         for(j=0; j<N; j++) matrix1[SIZE][j] = matrix2[SIZE][j] = N;
 36     }
 37     for(i=1; i<SIZE+1; i++) // 每一个矩阵的两侧边界
 38     {
 39         matrix1[i][0] = matrix1[i][N-1] = matrix2[i][0] = matrix2[i][N-1] = N;
 40     }
 41     // 引入虚拟进程 并计算每一个进程上下相邻进程
 42     int up_proc_id = myid==0 ? MPI_PROC_NULL : myid-1;
 43     int down_proc_id = myid==3 ? MPI_PROC_NULL : myid+1;
 44     // 初始化重复非阻塞通讯
 45     int tag1 = 1, tag2 = 2;
 46     MPI_Send_init(&matrix2[1][0], N, MPI_FLOAT, up_proc_id, tag1, MPI_COMM_WORLD, &request[0]);
 47     MPI_Send_init(&matrix2[SIZE][0], N, MPI_FLOAT, down_proc_id, tag2, MPI_COMM_WORLD, &request[1]);
 48     MPI_Recv_init(&matrix1[SIZE+1][0], N, MPI_FLOAT, down_proc_id, tag1, MPI_COMM_WORLD, &request[2]);
 49     MPI_Recv_init(&matrix1[0][0], N, MPI_FLOAT, up_proc_id, tag2, MPI_COMM_WORLD, &request[3]);
 50     // jacobi迭代过程
 51     int t,row,col;
 52     for(t=0; t<T; t++)
 53     {
 54         // 1 计算边界数据
 55         if(0==myid) // 最上的矩阵块
 56         {
 57             for (col=1; col<N-1; col++)
 58             {
 59                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
 60             }
 61         }
 62         else if (3==myid) { // 最下的矩阵块
 63             for (col=1; col<N-1; col++)
 64             {
 65                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
 66             }
 67         }
 68         else {
 69             for(col=1; col<N-1; col++) // 中间的矩阵块
 70             {
 71                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
 72                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
 73             }
 74         }
 75         // 2 启动重复非阻塞通讯
 76         MPI_Startall(4, &request[0]);
 77         // 3 计算中间数据
 78         int begin_row = 0==myid ? 2 : 1;
 79         int end_row = 3==myid ? (SIZE-1) : SIZE; 
 80         for (row=begin_row; row<end_row; row++)
 81         {
 82             for (col=1; col<N-1; col++)
 83             {
 84                 matrix2[row][col] = (matrix1[row][col-1]+matrix1[row][col+1]+matrix1[row+1][col]+matrix1[row-1][col])*0.25;
 85             }
 86         }
 87         // 4 更新矩阵 并等待各个进程间数据传递完毕
 88         for (row=begin_row; row<=end_row; row++)
 89         {
 90             for (col=1; col<N-1; col++)
 91             {
 92                 matrix1[row][col] = matrix2[row][col];
 93             }
 94         }
 95         MPI_Waitall(4, &request[0], &status[0]);
 96     }
 97     int n;
 98     for(n = 0; n < 4; n++) MPI_Request_free(&request[n]); // 释放非阻塞通讯对象
 99     MPI_Barrier(MPI_COMM_WORLD);
100     print_matrix(myid, matrix1);
101     MPI_Finalize();
102 }
103 
104 
105 void print_matrix(int myid, float myRows[][N])
106 {
107     int i,j;
108     int buf[1];
109     MPI_Status status;
110     buf[0] = 1;
111     if ( myid>0 ) {
112         MPI_Recv(buf, 1, MPI_INT, myid-1, 0, MPI_COMM_WORLD, &status);
113     }
114     printf("Result in process %d:\n", myid);
115     for ( i = 0; i<SIZE+2; i++)
116     {
117         for ( j = 0; j<N; j++)
118             printf("%1.3f\t", myRows[i][j]);
119         printf("\n");
120     }
121     if ( myid<3 ) {
122         MPI_Send(buf, 1, MPI_INT, myid+1, 0, MPI_COMM_WORLD);
123     }
124     MPI_Barrier(MPI_COMM_WORLD);
125 }

1)上述的代码不难理解,能够查阅相关函数手册;最核心的思想就是,若是两个进程有屡次迭代通讯,就能够用这种重复非阻塞的通讯函数。

2)另外,对于重复非阻塞通讯的调用,在调用MPI_Wait系列函数时,不会释放与通讯关联的request函数(即上面说的保持一些共性的通讯设定操做,不彻底掐断),所以,须要在line98中,程序员手动释放非则色通讯操做对象

相关文章
相关标签/搜索