矩阵乘法问题描述以下:ios
给定矩阵A和B,其中A是m*p大小矩阵,B是p*n大小的矩阵。求C = A*B。算法
求解这个问题最简单的算法是遍历A的行和B的列,求得C的相应元素,时间复杂度O(mnp),空间复杂度O(1)。编程
// 矩阵乘法的C++实现 for(int i=0; i<m; i++){ for(int j=0; j<n; j++){ float temp = 0.0; for(int k=0; k<p; k++){ temp += A[i*p + k] * B[k*n + j]; } C[i*n + j] = temp; } }
要改进上述算法为并行算法,须要了解到C++ MPI编程的特色:服务器
a. 各个进程之间不能有依赖。这是由于各个进程能够以任意的时间顺序执行。分布式
b. 数据是分布式存储的。也就是说,每一个进程有本身独立的数据备份。spa
有了这两点认识后,一种最简单的并行方案就出来了:(假设开启np个进程)code
(1). 首先将矩阵A和C按行分为np块;blog
(2). 进程号为 id 的进程读取A的第 id 个分块和B;three
(3). 进程号为 id 的进程求解相应的C的第 id 个分块。进程
代码以下:
/* filename: matMultiplyWithMPI.cpp
* parallel matrix multiplication with MPI * C(m,n) = A(m,p) * B(p,n) * input: three parameters - m, p, n
* @copyright: fengfu-chris */ #include<iostream> #include<mpi.h> #include<math.h> #include<stdlib.h>
void initMatrixWithRV(float *A, int rows, int cols);
void matMultiplyWithSingleThread(float *A, float *B, float *matResult, int m, int p, int n); int main(int argc, char** argv) { int m = atoi(argv[1]); int p = atoi(argv[2]); int n = atoi(argv[3]); float *A, *B, *C; float *bA, *bC; int myrank, numprocs; MPI_Status status; MPI_Init(&argc, &argv); // 并行开始 MPI_Comm_size(MPI_COMM_WORLD, &numprocs); MPI_Comm_rank(MPI_COMM_WORLD, &myrank); int bm = m / numprocs; bA = new float[bm * p]; B = new float[p * n]; bC = new float[bm * n]; if(myrank == 0){ A = new float[m * p]; C = new float[m * n]; initMatrixWithRV(A, m, p); initMatrixWithRV(B, p, n); } MPI_Barrier(MPI_COMM_WORLD);
/* step 1: 数据分配 */ MPI_Scatter(A, bm * p, MPI_FLOAT, bA, bm *p, MPI_FLOAT, 0, MPI_COMM_WORLD); MPI_Bcast(B, p * n, MPI_FLOAT, 0, MPI_COMM_WORLD);
/* step 2: 并行计算C的各个分块 */ matMultiplyWithSingleThread(bA, B, bC, bm, p, n); MPI_Barrier(MPI_COMM_WORLD);
/* step 3: 汇总结果 */ MPI_Gather(bC, bm * n, MPI_FLOAT, C, bm * n, MPI_FLOAT, 0, MPI_COMM_WORLD);
/* step 3-1: 解决历史遗留问题(多余的分块) */ int remainRowsStartId = bm * numprocs; if(myrank == 0 && remainRowsStartId < m){ int remainRows = m - remainRowsStartId; matMultiplyWithSingleThread(A + remainRowsStartId * p, B, C + remainRowsStartId * n, remainRows, p, n); } delete[] bA; delete[] B; delete[] bC; if(myrank == 0){ delete[] A; delete[] C; } MPI_Finalize(); // 并行结束 return 0; } void initMatrixWithRV(float *A, int rows, int cols) { srand((unsigned)time(NULL)); for(int i = 0; i < rows*cols; i++){ A[i] = (float)rand() / RAND_MAX; } } void matMultiplyWithSingleThread(float *A, float *B, float *matResult, int m, int p, int n) { for(int i=0; i<m; i++){ for(int j=0; j<n; j++){ float temp = 0; for(int k=0; k<p; k++){ temp += A[i*p+k] * B[k*n + j]; } matResult[i*n+j] = temp; } } }
编译:
$mpigxx matMultiplyWithMPI.cpp -o matMultiplyWithMPI
运行:
$mpirun -np 8 matMultiplyWithMPI 3000 2000 4000
这里假设m = 3000, p = 2000, n = 4000。另外,开启的进程数为8个。 np的个数能够大于CPU的个数。
通常来说,只有当矩阵大小大于5000的量级时,开启几十上百个进程的威力才能凸显出来。尤为是当矩阵量级达到万维以上时,串行或是少数几个进程并行的矩阵乘法将变得特别耗时。
上面的并行方案有个很大的缺陷,那就是 B 的备份数和开启的进程数一致。这对于内存不是很充裕或矩阵很大的时候,会致使灾难!例如,假设 B 是10000*10000维的,用double类型存储大概占700M左右的内存。当开启的进程数达到128个时,单是 B 的备份占据的内存开销将达到 128 * 700 M = 90G。 这将耗掉巨大的内存!
有什么改进的方案呢?
必须了解MPI的第三个特色:
c. 进程之间能够很方便地通讯,而且支持多种通讯方案。
这样,就能够把 B 也同时分布式的存储到各个进程对应的内存中,而后利用进程之间的通讯来轮换各个 B 的分块,从而达到减少内存开销的效果。固然,几乎和全部的程序同样,离不开时间与空间的trade-off。因此,这种方法虽然节省了内存,却要消耗大量的时间在进程之间的通讯上。
下面给出改进的并行方案:
(1). 将A和C按行分为np块,将B按列分为np块(B能够按列存储);
(2). 进程号为 id 的进程读取 A 和 B 的第id个分块;
(3). 循环np次:
<1>. 各个进程用各自的A、B分块求解C的分块;
<2>. 轮换B的分块(例如:id 号进程发送本身当前的B的分块到 id+1号进程)
代码以下:
/* filename: matMultiplyWithMPI_updated.cpp * parallel matrix multiplication with MPI: updated * C(m,n) = A(m,p) * B(p,n) * input: three parameters - m, p, n * @copyright: fengfu-chris */ #include<iostream> #include<mpi.h> #include<math.h> #include<stdlib.h> void initMatrixWithRV(float *A, int rows, int cols); void copyMatrix(float *A, float *A_copy, int rows, int cols); // A: m*p, B: p*n !!! note that B is stored by column first void matMultiplyWithTransposedB(float *A, float *B, float *matResult, int m, int n, int p); int main(int argc, char** argv) { int m = atoi(argv[1]); int n = atoi(argv[2]); int p = atoi(argv[3]); float *A, *B, *C; float *bA, *bB_send, *bB_recv, *bC, *bC_send; int myrank, numprocs; MPI_Status status; MPI_Init(&argc, &argv); // 并行开始 MPI_Comm_size(MPI_COMM_WORLD, &numprocs); MPI_Comm_rank(MPI_COMM_WORLD, &myrank); int bm = m / numprocs; int bn = n / numprocs; bA = new float[bm * p]; bB_send = new float[bn * p]; bB_recv = new float[bn * p]; bC = new float[bm * bn]; bC_send = new float[bm * n]; if(myrank == 0){ A = new float[m * p]; B = new float[n * p]; C = new float[m * n]; initMatrixWithRV(A, m, p); initMatrixWithRV(B, n, p); } MPI_Barrier(MPI_COMM_WORLD); MPI_Scatter(A, bm * p, MPI_FLOAT, bA, bm * p, MPI_FLOAT, 0, MPI_COMM_WORLD); MPI_Scatter(B, bn * p, MPI_FLOAT, bB_recv, bn * p, MPI_FLOAT, 0, MPI_COMM_WORLD); int sendTo = (myrank + 1) % numprocs; int recvFrom = (myrank - 1 + numprocs) % numprocs; int circle = 0; do{ matMultiplyWithTransposedB(bA, bB_recv, bC, bm, bn, p); int blocks_col = (myrank - circle + numprocs) % numprocs; for(int i=0; i<bm; i++){ for(int j=0; j<bn; j++){ bC_send[i*n + blocks_col*bn + j] = bC[i*bn + j]; } } if(myrank % 2 == 0){ copyMatrix(bB_recv, bB_send, bn, p); MPI_Ssend(bB_send, bn*p, MPI_FLOAT, sendTo, circle, MPI_COMM_WORLD); MPI_Recv(bB_recv, bn*p, MPI_FLOAT, recvFrom, circle, MPI_COMM_WORLD, &status); }else{ MPI_Recv(bB_recv, bn*p, MPI_FLOAT, recvFrom, circle, MPI_COMM_WORLD, &status); MPI_Ssend(bB_send, bn*p, MPI_FLOAT, sendTo, circle, MPI_COMM_WORLD); copyMatrix(bB_recv, bB_send, bn, p); } circle++; }while(circle < numprocs); MPI_Barrier(MPI_COMM_WORLD); MPI_Gather(bC_send, bm * n, MPI_FLOAT, C, bm * n, MPI_FLOAT, 0, MPI_COMM_WORLD); if(myrank == 0){ int remainAStartId = bm * numprocs; int remainBStartId = bn * numprocs; for(int i=remainAStartId; i<m; i++){ for(int j=0; j<n; j++){ float temp=0; for(int k=0; k<p; k++){ temp += A[i*p + k] * B[j*p +k]; } C[i*p + j] = temp; } } for(int i=0; i<remainAStartId; i++){ for(int j=remainBStartId; j<n; j++){ float temp = 0; for(int k=0; k<p; k++){ temp += A[i*p + k] * B[j*p +k]; } C[i*p + j] = temp; } } } delete[] bA; delete[] bB_send; delete[] bB_recv; delete[] bC; delete[] bC_send; if(myrank == 0){ delete[] A; delete[] B; delete[] C; } MPI_Finalize(); // 并行结束 return 0; } void initMatrixWithRV(float *A, int rows, int cols) { srand((unsigned)time(NULL)); for(int i = 0; i < rows*cols; i++){ A[i] = (float)rand() / RAND_MAX; } } void copyMatrix(float *A, float *A_copy, int rows, int cols) { for(int i=0; i<rows*cols; i++){ A_copy[i] = A[i]; } } void matMultiplyWithTransposedB(float *A, float *B, float *matResult, int m, int p, int n) { for(int i=0; i<m; i++){ for(int j=0; j<n; j++){ float temp = 0; for(int k=0; k<p; k++){ temp += A[i*p+k] * B[j*p+k]; } matResult[i*n+j] = temp; } } }
这里最须要注意的地方就是B的轮换。 有两点须要注意:
(1) 防阻塞机制。这里采用奇偶原则:偶数号进程先发送,再接收;奇数号进程则相反。这样能够避免全部进程同时发送形成死锁的状况;
(2) 数据备份。发送和接收的信息存储在不一样的矩阵中,这样保证原来的信息不会被覆盖。
这种方法的优势是显而易见的。对于足够牛的服务器/计算机集群,开启成百上千个进程来并行彻底不是问题。
并行不易,且行且珍惜!