#ifndef cf3_common_PE_all_gather_hpp
#define cf3_common_PE_all_gather_hpp
// all_gatherc_impl: fixed-count all-gather, every process contributes in_n*stride items
template<typename T>
inline void all_gatherc_impl(const Communicator& comm, const T* in_values, const int in_n, T* out_values, const int stride)
{
  // the MPI datatype (type) and process count (nproc) are set up earlier and elided here
  // gather into a separate buffer when the operation is requested in place
  T* out_buf=out_values;
  if (in_values==out_values) {
    const int size=stride*nproc*in_n>1?stride*nproc*in_n:1;
    out_buf=new T[size];
  }

  // receive counts and displacements are identical on every process
  int *displs=new int[nproc];
  for (int i=0; i<nproc; i++) displs[i]=i*in_n*stride;
  int *counts=new int[nproc];
  for (int i=0; i<nproc; i++) counts[i]=in_n*stride;

  // do the communication
  MPI_CHECK_RESULT(MPI_Allgatherv, (const_cast<T*>(in_values), in_n*stride, type, out_buf, counts, displs, type, comm ));

  // if the gather was in place, copy the result back and release the temporary buffer
  if (in_values==out_values) {
    memcpy(out_values,out_buf,nproc*in_n*stride*sizeof(T));
    delete[] out_buf;
  }
  delete[] counts;
  delete[] displs;
}
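// Usage sketch (not part of the original header): gathering a fixed number of items per
// process through the constant-count pointer interface. The variable names and the way
// the communicator is obtained are assumptions made for illustration only.
//
//   Communicator comm = /* a valid communicator obtained from the PE environment */;
//   int nproc; MPI_Comm_size(comm, &nproc);
//   double local[2] = {1.0, 2.0};                // every rank contributes two values
//   std::vector<double> gathered(2*nproc);
//   all_gather(comm, local, 2, &gathered[0]);    // rank r's values end up at gathered[2*r]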
// all_gathervm_impl: variable-count all-gather with optional send and receive maps
template<typename T>
inline void all_gathervm_impl(const Communicator& comm, const T* in_values, const int in_n, const int* in_map,
                              T* out_values, const int* out_n, const int* out_map, const int stride)
{
  // the MPI datatype (type) and process count (nproc) are set up earlier and elided here

  // build per-process receive counts (in items) and displacements
  int *out_nstride=new int[nproc];
  int *out_disp=new int[nproc];
  out_disp[0]=0;
  for(int i=0; i<nproc-1; i++) {
    out_nstride[i]=stride*out_n[i];
    out_disp[i+1]=out_disp[i]+out_nstride[i];
  }
  out_nstride[nproc-1]=out_n[nproc-1]*stride;

  // total number of items sent and received by this process
  const int in_sum=stride*in_n;
  const int out_sum=out_disp[nproc-1]+stride*out_n[nproc-1];

  // if a send map is given, pack the mapped entries into a contiguous send buffer
  T *in_buf=(T*)in_values;
  if (in_map!=0) {
    in_buf=new T[in_sum];
    if (stride==1) { for(int i=0; i<in_sum; i++) in_buf[i]=in_values[in_map[i]]; }
    else { for(int i=0; i<in_sum/stride; i++) memcpy(&in_buf[stride*i],&in_values[stride*in_map[i]],stride*sizeof(T)); }
  }

  // receive into a temporary buffer when a receive map is given or the gather is in place
  T *out_buf=out_values;
  if ((out_map!=0)||(in_values==out_values)) {
    out_buf=new T[out_sum];
  }

  // do the communication
  MPI_CHECK_RESULT(MPI_Allgatherv, (in_buf, in_sum, type, out_buf, out_nstride, out_disp, type, comm));

  // scatter the received stream through the receive map, or copy it back if in place
  if (out_map!=0) {
    if (stride==1) { for(int i=0; i<out_sum; i++) out_values[out_map[i]]=out_buf[i]; }
    else { for(int i=0; i<out_sum/stride; i++) memcpy(&out_values[stride*out_map[i]],&out_buf[stride*i],stride*sizeof(T)); }
    delete[] out_buf;
  } else if (in_values==out_values) {
    memcpy(out_values,out_buf,out_sum*sizeof(T));
    delete[] out_buf;
  }

  // free internal memory
  if (in_map!=0) delete[] in_buf;
  delete[] out_nstride;
  delete[] out_disp;
}
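// Worked example (not part of the original header) of how the receive layout above is
// built: with nproc=3, stride=1 and out_n={2,3,1}, the loop yields out_nstride={2,3,1}
// and out_disp={0,2,5}, so out_sum=out_disp[2]+1*out_n[2]=6 items are received, with
// rank 1's contribution starting at offset 2 and rank 2's at offset 5.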
  // constant-count pointer interface: allocate the buffer that is returned to the caller
  T* out_buf=out_values;
  const int size=stride*nproc*in_n>1?stride*nproc*in_n:1;
  if (out_values==0) out_buf=new T[size];
  // whole-vector variant: every process contributes in_values.size() items
  cf3_assert( in_values.size() % stride == 0 );
  int in_n=(int)in_values.size();
  out_values.resize(in_n*nproc);
  out_values.reserve(in_n*nproc);

  // single-item variant: the result holds one entry per process
  out_values.resize(nproc);
  out_values.reserve(nproc);
// most general pointer interface (declaration; the definition follows below)
template<typename T>
inline T* all_gather(const Communicator& comm, const T* in_values, const int in_n, const int* in_map,
                     T* out_values, int* out_n, const int* out_map, const int stride=1);
  // map-less overload: delegate to the mapped variant with null maps
  return all_gather(comm,in_values,in_n,0,out_values,out_n,0,stride);
// most general vector interface (declaration; the definition follows below)
template<typename T>
inline void all_gather(const Communicator& comm, const std::vector<T>& in_values, const int in_n, const std::vector<int>& in_map,
                       std::vector<T>& out_values, std::vector<int>& out_n, const std::vector<int>& out_map, const int stride=1);
// vector interface without maps
template<typename T>
inline void all_gather(const Communicator& comm, const std::vector<T>& in_values, const int in_n,
                       std::vector<T>& out_values, std::vector<int>& out_n, const int stride=1)
{
  std::vector<int> in_map(0);
  std::vector<int> out_map(0);
  if (&in_values[0]==&out_values[0])
  {
    // in-place gather: go through a temporary and copy the result back
    std::vector<T> out_tmp(0);
    all_gather(comm,in_values,in_n,in_map,out_tmp,out_n,out_map,stride);
    out_values.assign(out_tmp.begin(),out_tmp.end());
  }
  else
  {
    all_gather(comm,in_values,in_n,in_map,out_values,out_n,out_map,stride);
  }
}
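// Usage sketch (not part of the original header): the map-less vector interface with the
// per-process counts collected automatically. comm, nproc and the variable names are
// assumptions made for illustration only.
//
//   std::vector<double> my_part = /* different length on every rank */;
//   std::vector<double> all_parts;           // left empty, sized by the call
//   std::vector<int>    counts(nproc, -1);   // -1: let the call gather the counts first
//   all_gather(comm, my_part, (int)my_part.size(), all_parts, counts);
//   // counts[r] should then hold rank r's contribution size and all_parts stores the
//   // contributions back to back in rank order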
// definition of the most general pointer interface
template<typename T>
inline T* all_gather(const Communicator& comm, const T* in_values, const int in_n, const int* in_map,
                     T* out_values, int* out_n, const int* out_map, const int stride)
{
  // nproc is obtained earlier via MPI_Comm_size (elided here); if out_n consists of
  // -1s the receive counts are unknown and have to be gathered first
  int out_sum=0;
  for (int i=0; i<nproc; i++) out_sum+=out_n[i];
  if (out_sum==-nproc) {
    if (out_map!=0) throw cf3::common::ParallelError(FromHere(),"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
    // ... gather the per-process counts into out_n (elided in this excerpt) ...
    out_sum=0;
    for (int i=0; i<nproc; i++) out_sum+=out_n[i];
  }

  // allocate the returned buffer when out_values is null; with a receive map the
  // required size is the largest mapped index plus one
  T* out_buf=out_values;
  if (out_values==0) {
    if (out_map!=0) {
      int out_sum_tmp=0;
      for (int i=0; i<out_sum; i++) out_sum_tmp=out_map[i]>out_sum_tmp?out_map[i]:out_sum_tmp;
      out_sum=out_sum_tmp+1;
    }
    out_buf=new T[stride*out_sum];
  }

  // hand the work to the implementation and return the buffer holding the result
  detail::all_gathervm_impl(comm, in_values, in_n, in_map, out_buf, out_n, out_map, stride);
  return out_buf;
}
// definition of the most general vector interface
template<typename T>
inline void all_gather(const Communicator& comm, const std::vector<T>& in_values, const int in_n, const std::vector<int>& in_map,
                       std::vector<T>& out_values, std::vector<int>& out_n, const std::vector<int>& out_map, const int stride)
{
  // nproc is obtained earlier via MPI_Comm_size (elided here)
  if ((int)out_n.size()!=nproc)
    throw cf3::common::BadValue(FromHere(),"Size of vector for number of items to be received does not match to number of processes.");
  // out_sum, the sum over out_n, is computed just above (elided here); if out_n consists
  // of -1s the receive counts are unknown and a receive map cannot be honoured
  if (out_sum == -nproc){
    if (out_map.size()!=0)
      throw cf3::common::ParallelError(FromHere(),"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
    // ... gather the per-process counts into out_n and recompute out_sum (elided) ...
  }

  // size the receive vector if the caller passed it empty; with a receive map the
  // required size is the largest mapped index plus one
  if (out_values.size() == 0 ){
    if (out_map.size()!=0) {
      // ... out_sum becomes the largest index appearing in out_map (elided) ...
      if (out_sum!=0) out_sum++;
    }
    out_values.resize(stride*out_sum);
    out_values.reserve(stride*out_sum);
  }
  // delegate to the implementation; empty maps are passed as null pointers
  detail::all_gathervm_impl(comm, (T*)(&in_values[0]), in_n, (in_map.empty() ? nullptr : &in_map[0]),
                            (T*)(&out_values[0]), &out_n[0], (out_map.empty() ? nullptr : &out_map[0]), stride);
}
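// Usage sketch (not part of the original header): the mapped vector interface. in_map
// selects which local entries are sent and in which order, out_map scatters the received
// stream into out_values. Names and the surrounding setup are illustrative assumptions.
//
//   std::vector<double> local_data;              // filled elsewhere
//   std::vector<int>    send_map;                // indices into local_data to send
//   std::vector<double> recv_data;               // sized by the call when passed empty
//   std::vector<int>    recv_counts(nproc, -1);  // unknown counts, gathered automatically
//   std::vector<int>    recv_map;                // empty: keep the gathered order
//   all_gather(comm, local_data, (int)send_map.size(), send_map, recv_data, recv_counts, recv_map);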
// convenience overload: gather vectors of differing lengths into one vector per process
template <typename T>
void all_gather(const Communicator& comm, const std::vector<T>& send, std::vector< std::vector<T> >& recv)
{
  // first gather how many items each process sends
  std::vector<int> strides;
  all_gather(comm, (int)send.size(), strides);

  // build displacements and the total receive size
  std::vector<int> displs(strides.size());
  if (strides.size())
  {
    int sum_strides = strides[0];
    displs[0] = 0;
    for (Uint i=1; i<strides.size(); ++i)
    {
      displs[i] = displs[i-1] + strides[i-1];
      sum_strides += strides[i];
    }

    // gather all contributions into one linear buffer
    std::vector<T> recv_linear(sum_strides);
    MPI_CHECK_RESULT(MPI_Allgatherv, ((void*)&send[0], (int)send.size(), get_mpi_datatype<T>(), &recv_linear[0], &strides[0], &displs[0], get_mpi_datatype<T>(), comm));

    // split the linear buffer into one vector per process
    recv.resize(strides.size());
    for (Uint i=0; i<strides.size(); ++i)
    {
      recv[i].resize(strides[i]);
      for (int j=0; j<strides[i]; ++j)
        recv[i][j]=recv_linear[displs[i]+j];
    }
  }
}
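// Usage sketch (not part of the original header): gathering vectors of differing lengths
// into one vector per rank. "rank" and the variable names are illustrative assumptions.
//
//   std::vector<int> mine(rank+1, rank);        // rank r sends r+1 copies of the value r
//   std::vector< std::vector<int> > everyones;
//   all_gather(comm, mine, everyones);
//   // everyones[r] should now equal what rank r passed as "mine"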
#endif // cf3_common_PE_all_gather_hpp