for(int i = 0; i < N-1; ++i) for(int j = i + 1; j < N; ++j) ...
However, it did not take much searching on the internet to find a "cache-oblivious parallel implementation", which I have written down and reproduce below. I have linked a post here (which uses Intel TBB) that describes this algorithm in detail.

I tried to do the same thing using OpenMP tasks, and it always runs slower than its serial counterpart (simply compiled without -fopenmp). I compile it with g++ -Wall -std=c++11 -O3 test.cpp -o test. The same is observed with and without -O3; the serial version is always faster.

To add some more information: in my real application there are typically anywhere from a few hundred to a few thousand elements (the variable n in the example below) that I need to loop over in this pairwise fashion many times. Millions of times. My attempt below tries to simulate that (although I only tried looping 10-100k times).

I timed this very crudely using time ./test, since the difference is that big. Yes, I know my example is badly written, and that I am including the time required to create the vector in the measurement. But the serial version gives me about 30 s while the parallel one takes over a minute, so I don't think I need to do anything more rigorous yet.

My question is: why does the serial version do better? Have I done something wrong in OpenMP? How do I properly correct my mistake? Have I misused tasks? I have a feeling that the recursive tasking has something to do with it, and I tried setting OMP_THREAD_LIMIT to 4, but it did not make a substantial difference. Is there a better way of implementing this with OpenMP?

Note: my question is specifically asking how to fix this particular implementation so that it works properly in parallel. Although if anyone knows an alternative solution to this problem and its proper implementation in OpenMP, I am open to that as well.

Thanks in advance.
#include <vector>
#include <iostream>

std::vector<std::vector<double>> testme;

void rect(int i0, int i1, int j0, int j1)
{
    int di = i1 - i0;
    int dj = j1 - j0;
    constexpr int threshold = 16;
    if (di > threshold && dj > threshold) {
        int im = i0 + di/2;
        int jm = j0 + dj/2;
        #pragma omp task
        { rect(i0, im, j0, jm); }
        rect(im, i1, jm, j1);
        #pragma omp taskwait

        #pragma omp task
        { rect(i0, im, jm, j1); }
        rect(im, i1, j0, jm);
        #pragma omp taskwait
    } else {
        for (int i = i0; i < i1; ++i)
            for (int j = j0; j < j1; ++j) {
                testme[i][j] = 1;
                testme[j][i] = 1;
            }
    }
}

void triangle(int n0, int n1)
{
    int dn = n1 - n0;
    if (dn > 1) {
        int nm = n0 + dn/2;
        #pragma omp task
        { triangle(n0, nm); }
        triangle(nm, n1);
        #pragma omp taskwait

        rect(n0, nm, nm, n1);
    }
}

void calc_force(int nbodies)
{
    #pragma omp parallel num_threads(4)
    #pragma omp single
    triangle(0, nbodies);
}

int main()
{
    int n = 400;
    for (int i = 0; i < n; ++i) {
        std::vector<double> temp;
        for (int j = 0; j < n; ++j)
            temp.push_back(0);
        testme.push_back(temp);
    }

    // Repeat a bunch of times.
    for (int i = 0; i < 100000; ++i)
        calc_force(n);

    return 0;
}
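For a slightly less crude measurement than time ./test, a minimal timing sketch using omp_get_wtime() could wrap only the repetition loop; time_calc_force is a hypothetical helper name, and it assumes the definitions above:

#include <omp.h>   // for omp_get_wtime(), in addition to the includes above

// Hypothetical helper: times only the repeated calc_force() calls,
// excluding the construction of the testme vectors.
void time_calc_force(int n, int reps)
{
    double tbeg = omp_get_wtime();
    for (int i = 0; i < reps; ++i)
        calc_force(n);
    double tend = omp_get_wtime();
    std::cout << "calc_force x " << reps << ": " << tend - tbeg << " s\n";
}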
Solution
So I decided to give some of the methods I could think of a try. But for the exercise to make sense, it was important that some actual work was done for computing the "symmetrised" contributions; otherwise, simply iterating over all elements without accounting for the symmetry would certainly be the best option anyway.

So I wrote a force() function that computes something loosely related to the gravitational interaction between two bodies, based on their coordinates.

Then I tested 4 different methods of iterating over the particles:

> A naive triangular approach such as the one you proposed. Due to its inherent load-imbalance aspect, this one is parallelised with a schedule(auto) clause to permit the runtime library to take whichever decision it thinks best for performance.
> A "clever" traversal of the triangular domain, consisting of cutting it in half in the j direction to permit using 2 regular loops. Basically, it corresponds to something like this (a small standalone check of this mapping follows the list):
  .
 /|        __ __
/ |   =>  |  // |
/__|      |//___|
> A straightforward rectangular approach that simply ignores the symmetry. Note that this one, like your recursive approach, guarantees non-concurrent accesses to the force array.
> A linearised approach consisting of precomputing the order of the i and j indexes for traversing the triangular domain, and iterating over a vector containing these indexes.
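Before the full code, here is a small standalone check (my addition, not part of the benchmarked code) that the halved-triangle mapping from method #2 really visits every unordered pair (i, j) with i < j exactly once; it uses the same index arithmetic as loop2 below:

#include <cassert>
#include <set>
#include <utility>
#include <iostream>

// Standalone sanity check: the halved-triangle mapping must produce
// each unordered pair (ii, jj) with ii < jj exactly once.
int main()
{
    for (int n = 2; n <= 64; ++n) {
        std::set<std::pair<int, int>> seen;
        int m = n/2 - 1 + n%2;
        for (int i = 0; i < n; i++) {
            for (int j = 0; j < m; j++) {
                int ii, jj;
                if (j < i) { ii = n-1-i; jj = n-1-j; }
                else       { ii = i;     jj = j+1;   }
                assert(ii < jj);
                assert(seen.insert({ii, jj}).second);  // no duplicates
            }
        }
        if (n % 2 == 0)                                // leftover middle column
            for (int i = 0; i < n/2; i++)
                assert(seen.insert({i, n/2}).second);
        assert((int)seen.size() == n*(n-1)/2);         // full coverage
    }
    std::cout << "pair mapping OK\n";
    return 0;
}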
Since a force vector where forces are accumulated with force[i] += fij; force[j] -= fij; would generate race conditions for the updates of the non-parallelised index (for example j in method #1), I created a local, per-thread force array, which is initialised to 0 upon entry into the parallel region. The computations are then done on this "private" array, and upon exit from the parallel region the individual contributions are accumulated onto the global force array with a critical construct. This is a typical reduction pattern for arrays in OpenMP.
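As a side note, on compilers supporting OpenMP 4.5 or later (an assumption on my part; e.g. recent GCC), array-section reductions can express this pattern more concisely by letting the runtime manage the private copies and the final accumulation. A minimal sketch of method #1 written that way, where loop1_reduction is a hypothetical name and the globals are those of the full code below:

// Sketch only: same triangular loop as loop1, but with an OpenMP 4.5
// array-section reduction instead of manual per-thread arrays + critical.
void loop1_reduction(int n)
{
    double *fp = f.data();   // array-section reductions need a pointer
    #pragma omp parallel for schedule(auto) reduction(+ : fp[:n])
    for (int i = 0; i < n-1; i++) {
        for (int j = i+1; j < n; j++) {
            double fij = force(i, j);
            fp[i] += fij;
            fp[j] -= fij;
        }
    }
}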
Here is the full code:
#include <iostream>
#include <vector>
#include <cstdlib>
#include <cmath>
#include <omp.h>

std::vector<std::vector<double>> l_f;
std::vector<double> x, y, f;
std::vector<int> vi, vj;
int numth, tid;
#pragma omp threadprivate(tid)

double force(int i, int j)
{
    double dx = x[i] - x[j];
    double dy = y[i] - y[j];
    double dist2 = dx*dx + dy*dy;
    return dist2 * std::sqrt(dist2);
}

void loop1(int n)
{
    #pragma omp parallel
    {
        for (int i = 0; i < n; i++) {
            l_f[tid][i] = 0;
        }
        #pragma omp for schedule(auto) nowait
        for (int i = 0; i < n-1; i++) {
            for (int j = i+1; j < n; j++) {
                double fij = force(i, j);
                l_f[tid][i] += fij;
                l_f[tid][j] -= fij;
            }
        }
        #pragma omp critical
        for (int i = 0; i < n; i++) {
            f[i] += l_f[tid][i];
        }
    }
}

void loop2(int n)
{
    int m = n/2 - 1 + n%2;
    #pragma omp parallel
    {
        for (int i = 0; i < n; i++) {
            l_f[tid][i] = 0;
        }
        #pragma omp for schedule(static) nowait
        for (int i = 0; i < n; i++) {
            for (int j = 0; j < m; j++) {
                int ii, jj;
                if (j < i) {
                    ii = n-1-i;
                    jj = n-1-j;
                } else {
                    ii = i;
                    jj = j+1;
                }
                double fij = force(ii, jj);
                l_f[tid][ii] += fij;
                l_f[tid][jj] -= fij;
            }
        }
        if (n%2 == 0) {
            #pragma omp for schedule(static) nowait
            for (int i = 0; i < n/2; i++) {
                double fij = force(i, n/2);
                l_f[tid][i] += fij;
                l_f[tid][n/2] -= fij;
            }
        }
        #pragma omp critical
        for (int i = 0; i < n; i++) {
            f[i] += l_f[tid][i];
        }
    }
}

void loop3(int n)
{
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < n; i++) {
        for (int j = 0; j < n; j++) {
            if (i < j) {
                f[i] += force(i, j);
            } else if (i > j) {
                f[i] -= force(i, j);
            }
        }
    }
}

void loop4(int n)
{
    #pragma omp parallel
    {
        for (int i = 0; i < n; i++) {
            l_f[tid][i] = 0;
        }
        #pragma omp for schedule(static) nowait
        for (std::size_t k = 0; k < vi.size(); k++) {
            int i = vi[k];
            int j = vj[k];
            double fij = force(i, j);
            l_f[tid][i] += fij;
            l_f[tid][j] -= fij;
        }
        #pragma omp critical
        for (int i = 0; i < n; i++) {
            f[i] += l_f[tid][i];
        }
    }
}

int main(int argc, char *argv[])
{
    if (argc != 2) {
        std::cout << "need the dim as argument\n";
        return 1;
    }
    int n = std::atoi(argv[1]);

    // initialise data
    f.resize(n);
    x.resize(n);
    y.resize(n);
    for (int i = 0; i < n; ++i) {
        x[i] = y[i] = i;
        f[i] = 0;
    }

    // initialising linearised index vectors
    for (int i = 0; i < n-1; i++) {
        for (int j = i+1; j < n; j++) {
            vi.push_back(i);
            vj.push_back(j);
        }
    }

    // initialising the local forces vectors
    #pragma omp parallel
    {
        tid = omp_get_thread_num();
        #pragma omp master
        numth = omp_get_num_threads();
    }
    l_f.resize(numth);
    for (int i = 0; i < numth; i++) {
        l_f[i].resize(n);
    }

    // testing all methods one after the other, with a warm-up before each
    int lmax = 10000;

    loop1(n);
    double tbeg = omp_get_wtime();
    for (int l = 0; l < lmax; l++) {
        loop1(n);
    }
    double tend = omp_get_wtime();
    std::cout << "Time for triangular loop is " << tend - tbeg << "s\n";

    loop2(n);
    tbeg = omp_get_wtime();
    for (int l = 0; l < lmax; l++) {
        loop2(n);
    }
    tend = omp_get_wtime();
    std::cout << "Time for mangled rectangular loop is " << tend - tbeg << "s\n";

    loop3(n);
    tbeg = omp_get_wtime();
    for (int l = 0; l < lmax; l++) {
        loop3(n);
    }
    tend = omp_get_wtime();
    std::cout << "Time for naive rectangular loop is " << tend - tbeg << "s\n";

    loop4(n);
    tbeg = omp_get_wtime();
    for (int l = 0; l < lmax; l++) {
        loop4(n);
    }
    tend = omp_get_wtime();
    std::cout << "Time for linearised loop is " << tend - tbeg << "s\n";

    int ret = f[n-1];
    return ret;
}
Now, evaluating the relative performance and scalability becomes simple.

All methods are timed in a loop, after a first non-timed warm-up iteration.

Compilation: g++ -O3 -mtune=native -march=native -fopenmp tbf.cc -o tbf

Results on an 8-core IvyBridge CPU:
> OMP_NUM_THREADS=1 numactl -N 0 -m 0 ./tbf 500
Time for triangular loop is 9.21198s
Time for mangled rectangular loop is 10.1316s
Time for naive rectangular loop is 15.9408s
Time for linearised loop is 10.6449s
> OMP_NUM_THREADS=2 numactl -N 0 -m 0 ./tbf 500
Time for triangular loop is 6.84671s
Time for mangled rectangular loop is 5.13731s
Time for naive rectangular loop is 8.09542s
Time for linearised loop is 5.4654s
> OMP_NUM_THREADS=4 numactl -N 0 -m 0 ./tbf 500
Time for triangular loop is 4.03016s
Time for mangled rectangular loop is 2.90809s
Time for naive rectangular loop is 4.45373s
Time for linearised loop is 2.7733s
> OMP_NUM_THREADS=8 numactl -N 0 -m 0 ./tbf 500
Time for triangular loop is 2.31051s
Time for mangled rectangular loop is 2.05854s
Time for naive rectangular loop is 3.03463s
Time for linearised loop is 1.7106s
So in this case, method #4 seems to be the best option, with both good performance and very good scalability. Note that the straightforward triangular approach isn't too bad, thanks to the good load-balancing job done by the schedule(auto) clause. But ultimately, I would encourage you to test with your own workload...

For reference, your initial code (modified to compute force() in exactly the same way as the other tests, using the same number of OpenMP threads, but without needing the local force arrays and the final reduction, thanks to the recursive approach) gives this:
> OMP_NUM_THREADS=1 numactl -N 0 -m 0 ./recursive 500
Time for recursive method is 9.32888s
> OMP_NUM_THREADS=2 numactl -N 0 -m 0 ./recursive 500
Time for recursive method is 9.48718s
> OMP_NUM_THREADS=4 numactl -N 0 -m 0 ./recursive 500
Time for recursive method is 10.962s
> OMP_NUM_THREADS=8 numactl -N 0 -m 0 ./recursive 500
Time for recursive method is 13.2786s