7 #ifndef cf3_common_PE_all_to_all_hpp
8 #define cf3_common_PE_all_to_all_hpp
// --------------------------------------------------------------------------
// all_to_allc_impl (fragment): tail of the constant-count all-to-all body.
// (The signature is only visible in this chunk's trailing index:
//  void all_to_allc_impl(const Communicator&, const T*, const int, T*, const int).)
// NOTE(review): extraction artifact — stray original line numbers are
// embedded in the text and surrounding lines are elided. Code is left
// byte-identical; only comments were added.
// In-place call (send buffer aliases receive buffer): MPI_Alltoall cannot
// read and write the same buffer, so out_buf presumably points at a
// temporary allocated in the elided lines — TODO confirm.
72 if (in_values==out_values) {
// Fixed-count exchange: every rank sends and receives in_n*stride T's.
79 MPI_CHECK_RESULT(MPI_Alltoall, (const_cast<T*>(in_values), in_n*stride, type, out_buf, in_n*stride, type, comm));
// For the in-place case, copy the temporary back into the user's buffer.
82 if (in_values==out_values) {
83 memcpy(out_values,out_buf,nproc*in_n*stride*
sizeof(
T));
// --------------------------------------------------------------------------
// all_to_allvm_impl (fragment): variable-count, optionally mapped
// all-to-all exchange.
// NOTE(review): extraction artifact — stray original line numbers are
// embedded and several lines (buffer allocation, in_disp[0]/out_disp[0]
// initialization, closing braces) are elided. Code left byte-identical.
105 all_to_allvm_impl(
const Communicator& comm,
const T* in_values,
const int *in_n,
const int *in_map,
T* out_values,
const int *out_n,
const int *out_map,
const int stride )
// Per-rank counts and displacements in units of T (count * stride).
// NOTE(review): raw new[]/delete[] — if MPI_CHECK_RESULT throws (it is a
// throwing macro per this chunk's index), these leak; std::vector would be
// the RAII fix. Left unchanged here.
117 int *in_nstride=
new int[nproc];
118 int *out_nstride=
new int[nproc];
119 int *in_disp=
new int[nproc];
120 int *out_disp=
new int[nproc];
// Prefix-sum the displacements for ranks 1..nproc-1. The loop reads
// in_disp[0]/out_disp[0]; their zeroing is presumably in the elided
// lines 121-122 — TODO confirm against the full file.
123 for(
int i=0; i<nproc-1; i++) {
124 in_nstride[i]=stride*in_n[i];
125 out_nstride[i]=stride*out_n[i];
126 in_disp[i+1]=in_disp[i]+in_nstride[i];
127 out_disp[i+1]=out_disp[i]+out_nstride[i];
// Last rank's strided counts (not covered by the loop above).
129 in_nstride[nproc-1]=in_n[nproc-1]*stride;
130 out_nstride[nproc-1]=out_n[nproc-1]*stride;
// Total number of T's sent / received across all ranks.
133 const int in_sum=in_disp[nproc-1]+stride*in_n[nproc-1];
134 const int out_sum=out_disp[nproc-1]+stride*out_n[nproc-1];
// Pack phase: with an in_map, gather items into a linear send buffer.
// stride==1 is special-cased as a plain element copy; otherwise each
// logical item of `stride` T's is memcpy'd as a unit.
140 if (stride==1) {
for(
int i=0; i<in_sum; i++) in_buf[i]=in_values[in_map[i]]; }
141 else {
for(
int i=0; i<in_sum/stride; i++) memcpy(&in_buf[stride*i],&in_values[stride*in_map[i]],stride*
sizeof(
T)); }
// No in_map: send straight from in_values. NOTE(review): the C-style
// (T*) cast strips const — const_cast would state the intent; unchanged.
143 in_buf=(
T*)in_values;
// A separate receive buffer is needed when scattering through out_map, or
// when the exchange is in place (in_values aliases out_values).
148 if ((out_map!=0)||(in_values==out_values)) {
// The actual variable-count exchange.
155 MPI_CHECK_RESULT(MPI_Alltoallv, (in_buf, in_nstride, in_disp, type, out_buf, out_nstride, out_disp, type, comm));
// Unpack phase: scatter received items through out_map (mirror of pack).
159 if (stride==1) {
for(
int i=0; i<out_sum; i++) out_values[out_map[i]]=out_buf[i]; }
160 else {
for(
int i=0; i<out_sum/stride; i++) memcpy(&out_values[stride*out_map[i]],&out_buf[stride*i],stride*
sizeof(
T)); }
162 }
// In-place call without an out_map: copy the temporary buffer back.
else if (in_values==out_values) {
163 memcpy(out_values,out_buf,out_sum*
sizeof(
T));
// Cleanup: in_buf is only heap-owned when an in_map forced a pack buffer.
// NOTE(review): matching delete[]s for in_nstride, in_disp, out_disp (and
// out_buf) are presumably in elided lines — TODO confirm.
168 if (in_map!=0)
delete[] in_buf;
172 delete[] out_nstride;
// Fragments from two different overloads (extraction artifact; code kept
// byte-identical, comments only).
// all_to_all (pointer, constant count): receive directly into out_values...
199 T* out_buf=out_values;
// ...sizing the scratch buffer as max(stride*nproc*in_n, 1) so that a
// zero-count exchange still gets a valid 1-element allocation.
201 const int size=stride*nproc*in_n>1?stride*nproc*in_n:1;
// all_to_all (vector, constant count): payload must split evenly per rank.
228 cf3_assert( in_values.size() % (nproc*stride) == 0 );
229 out_values.resize(in_values.size());
// NOTE(review): reserve() immediately after resize() to the same size is a
// no-op — capacity already covers it; harmless but redundant.
230 out_values.reserve(in_values.size());
// Interface fragments (signatures split across lines by extraction; code
// kept byte-identical, comments only).
// all_to_all — raw-pointer overload with explicit send/receive maps.
241 all_to_all(
const Communicator& comm,
const T* in_values,
const int *in_n,
const int *in_map,
T* out_values,
int *out_n,
const int *out_map,
const int stride=1);
// Map-less pointer overload: delegate with null (0) maps.
// NOTE(review): 0 as a null pointer constant — nullptr is the modern
// spelling (the file already uses nullptr further down); unchanged here.
259 return all_to_all(comm,in_values,in_n,0,out_values,out_n,0,stride);
// all_to_all — std::vector overload with explicit send/receive maps.
267 all_to_all(
const Communicator& comm,
const std::vector<T>& in_values,
const std::vector<int>& in_n,
const std::vector<int>& in_map, std::vector<T>& out_values, std::vector<int>& out_n,
const std::vector<int>& out_map,
const int stride=1);
// all_to_all — std::vector overload without maps (fragment; braces partly
// elided by extraction; code kept byte-identical).
283 all_to_all(
const Communicator& comm,
const std::vector<T>& in_values,
const std::vector<int>& in_n, std::vector<T>& out_values, std::vector<int>& out_n,
const int stride=1)
// Empty maps mean "identity" to the mapped implementation invoked below.
286 std::vector<int> in_map(0);
287 std::vector<int> out_map(0);
// In-place detection by comparing the two buffers' addresses.
// NOTE(review): operator[] on an empty vector is undefined behaviour —
// this assumes both vectors are non-empty; .data() would be safe. TODO
// confirm callers guarantee non-empty input here.
288 if (&in_values[0]==&out_values[0])
// Aliased buffers: exchange into a temporary, then copy the result back.
290 std::vector<T> out_tmp(0);
291 all_to_all(comm,in_values,in_n,in_map,out_tmp,out_n,out_map,stride);
292 out_values.assign(out_tmp.begin(),out_tmp.end());
// Distinct buffers: exchange directly.
296 all_to_all(comm,in_values,in_n,in_map,out_values,out_n,out_map,stride);
// all_to_all — raw-pointer overload with maps (fragment; several lines
// elided by extraction; code kept byte-identical).
318 all_to_all(
const Communicator& comm,
const T* in_values,
const int *in_n,
const int *in_map,
T* out_values,
int *out_n,
const int *out_map,
const int stride)
// Sum the receive counts; a sum of exactly -nproc means every entry is -1,
// the sentinel for "receive counts unknown".
326 for (
int i=0; i<nproc; i++) out_sum+=out_n[i];
327 if (out_sum==-nproc) {
// Unknown counts cannot be combined with a receive map: the map indexes a
// buffer whose size cannot yet be computed.
328 if (out_map!=0)
throw cf3::common::ParallelError(
FromHere(),
"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
// Re-sum after the counts have been exchanged; the count exchange itself
// and the out_sum reset are presumably in elided lines — TODO confirm.
331 for (
int i=0; i<nproc; i++) out_sum+=out_n[i];
335 T* out_buf=out_values;
// With an out_map, the required output extent is max(map index) + 1.
339 for (
int i=0; i<out_sum; i++) out_sum_tmp=out_map[i]>out_sum_tmp?out_map[i]:out_sum_tmp;
340 out_sum=out_sum_tmp+1;
// all_to_all — std::vector overload with maps (signature fragment; code
// kept byte-identical, comments only).
369 all_to_all(
const Communicator& comm,
const std::vector<T>& in_values,
const std::vector<int>& in_n,
const std::vector<int>& in_map, std::vector<T>& out_values, std::vector<int>& out_n,
const std::vector<int>& out_map,
const int stride)
375 if ((
int)out_n.size()!=nproc)
cf3::common::BadValue(
FromHere(),
"Size of vector for number of items to be received does not match to number of processes.");
// Body fragment of the vector-with-maps all_to_all (code byte-identical).
// All receive counts at the -1 sentinel: counts are unknown, so a receive
// map is illegal (it would index a buffer whose size cannot be computed).
384 if (out_sum == -nproc){
385 if (out_map.size()!=0)
throw cf3::common::ParallelError(
FromHere(),
"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
// Auto-size the output only when the caller did not pre-size it.
392 if (out_values.size() == 0 ){
393 if (out_map.size()!=0) {
// max-map-index + 1 sizing; skip the +1 when nothing will be received.
396 if (out_sum!=0) out_sum++;
398 out_values.resize(stride*out_sum);
// NOTE(review): reserve() immediately after resize() is a no-op; redundant.
399 out_values.reserve(stride*out_sum);
// Delegate to the raw-pointer implementation; empty maps become nullptr.
// NOTE(review): the C-style (T*) casts strip const from the vector data —
// const_cast (or a const-correct impl parameter) would state the intent.
403 detail::all_to_allvm_impl(comm, (
T*)(&in_values[0]), &in_n[0], (in_map.empty() ?
nullptr : &in_map[0]), (
T*)(&out_values[0]), &out_n[0], (out_map.empty() ?
nullptr : &out_map[0]), stride);
// all_to_all — std::vector<std::vector<T>> overload (fragment; the
// signature line and the receive-count exchange are elided by extraction;
// code kept byte-identical, comments only).
408 template <
typename T>
// Per-destination-rank counts and displacements for the flattened payload.
411 std::vector<int> send_strides(send.size());
412 std::vector<int> send_displs(send.size());
413 for (
Uint i=0; i<send.size(); ++i)
414 send_strides[i] = send[i].size();
// Exclusive prefix sum; send_displs[0] is presumably zeroed in an elided
// line — TODO confirm (value-initialized vector<int> starts at 0 anyway).
417 for (
Uint i=1; i<send.size(); ++i)
418 send_displs[i] = send_displs[i-1] + send_strides[i-1];
// Flatten the jagged send vector into one contiguous buffer.
// NOTE(review): .back() on the helper vectors is undefined behaviour when
// send is empty — assumes send.size() == nproc > 0; TODO confirm.
420 std::vector<T> send_linear(send_displs.back()+send_strides.back());
421 for (
Uint i=0; i<send.size(); ++i)
422 for (
Uint j=0; j<send[i].size(); ++j)
423 send_linear[send_displs[i]+j] = send[i][j];
// Receive counts: presumably filled by an elided MPI_Alltoall of
// send_strides — TODO confirm; then prefix-summed like the send side.
425 std::vector<int> recv_strides(send.size());
426 std::vector<int> recv_displs(send.size());
429 for (
Uint i=1; i<send.size(); ++i)
430 recv_displs[i] = recv_displs[i-1] + recv_strides[i-1];
432 std::vector<T> recv_linear(recv_displs.back()+recv_strides.back());
// The variable-count exchange of the flattened payload.
// NOTE(review): the two datatype arguments are spelled inconsistently
// (PE::get_mpi_datatype<T>() vs unqualified get_mpi_datatype<T>()) —
// harmless inside namespace PE, but worth unifying.
433 MPI_CHECK_RESULT(MPI_Alltoallv, (&send_linear[0], &send_strides[0], &send_displs[0], PE::get_mpi_datatype<T>(), &recv_linear[0], &recv_strides[0], &recv_displs[0], get_mpi_datatype<T>(), comm));
// Un-flatten the received buffer into the jagged result vector.
// NOTE(review): `j<recv_strides[i]` compares Uint against int —
// signed/unsigned mismatch; safe for non-negative counts but warns.
435 recv.resize(recv_strides.size());
436 for (
Uint i=0; i<recv_strides.size(); ++i)
438 recv[i].resize(recv_strides[i]);
439 for (
Uint j=0; j<recv_strides[i]; ++j)
441 recv[i][j]=recv_linear[recv_displs[i]+j];
454 #endif // cf3_common_PE_all_to_all_hpp
void all_to_allc_impl(const Communicator &comm, const T *in_values, const int in_n, T *out_values, const int stride)
MPI_Datatype Datatype
datatype
Datatype get_mpi_datatype(const T &ref_of_type)
ACCESS AND REGISTRATION MECHANISM.
#define boost_foreach
lowercase version of BOOST_FOREACH
T * all_to_all(const Communicator &comm, const T *in_values, const int in_n, T *out_values, const int stride=1)
Top-level namespace for coolfluid.
void all_to_allvm_impl(const Communicator &comm, const T *in_values, const int *in_n, const int *in_map, T *out_values, const int *out_n, const int *out_map, const int stride)
MPI_Comm Communicator
communicator
unsigned int Uint
typedef for unsigned int
#define MPI_CHECK_RESULT(MPIFunc, Args)
Macro for checking return values of any mpi calls and throws exception on error.