// Copyright (C) 2010-2013 von Karman Institute for Fluid Dynamics, Belgium
//
// This software is distributed under the terms of the
// GNU Lesser General Public License version 3 (LGPLv3).
// See doc/lgpl.txt and doc/gpl.txt for the license text.

#ifndef cf3_common_PE_all_to_all_hpp
#define cf3_common_PE_all_to_all_hpp

////////////////////////////////////////////////////////////////////////////////

#include <cstring> // memcpy
#include <vector>

#include "common/Assertions.hpp"
#include "common/Foreach.hpp"
#include "common/BasicExceptions.hpp" // NotEnoughMemory, BadValue, ParallelError

#include "common/PE/types.hpp"
#include "common/PE/datatype.hpp"

// #include "common/PE/debug.hpp"

////////////////////////////////////////////////////////////////////////////////
namespace cf3 {
 namespace common {
  namespace PE {

////////////////////////////////////////////////////////////////////////////////

namespace detail {

////////////////////////////////////////////////////////////////////////////////

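/// Implementation of all-to-all communication with a constant number of
/// elements per process pair. Wraps MPI_Alltoall: every process sends
/// in_n*stride items of type T to each process and receives the same amount
/// from each. In-place communication (in_values==out_values) is routed
/// through a temporary buffer.
/// @param comm       mpi communicator
/// @param in_values  pointer to the data to be sent
/// @param in_n       number of elements sent to each process
/// @param out_values pointer to the buffer receiving the data
/// @param stride     number of items of type T forming one element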
template<typename T>
inline void
all_to_allc_impl(const Communicator& comm, const T* in_values, const int in_n, T* out_values, const int stride )
{
  // get data type and number of processors
  Datatype type = PE::get_mpi_datatype(*in_values);
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));

  // stride must be at least one
  cf3_assert( stride>0 );

  // set up out_buf: in-place communication needs a temporary receive buffer
  T* out_buf=0;
  if (in_values==out_values) {
    if ( (out_buf=new T[nproc*in_n*stride+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
  } else {
    out_buf=out_values;
  }

  // do the communication
  MPI_CHECK_RESULT(MPI_Alltoall, (const_cast<T*>(in_values), in_n*stride, type, out_buf, in_n*stride, type, comm));

  // if a temporary buffer was used, copy the result back and release it
  if (in_values==out_values) {
    memcpy(out_values,out_buf,nproc*in_n*stride*sizeof(T));
    delete[] out_buf;
  }
}

////////////////////////////////////////////////////////////////////////////////

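/// Implementation of all-to-all communication with per-process element counts
/// and optional index maps. Wraps MPI_Alltoallv: displacements are computed
/// from in_n and out_n, non-null maps gather and scatter the elements through
/// temporary buffers, and in-place communication (in_values==out_values) is
/// supported.
/// @param comm       mpi communicator
/// @param in_values  pointer to the data to be sent
/// @param in_n       array of nproc send counts (elements per destination process)
/// @param in_map     index map for gathering the elements to send, or null
/// @param out_values pointer to the buffer receiving the data
/// @param out_n      array of nproc receive counts (elements per source process)
/// @param out_map    index map for scattering the received elements, or null
/// @param stride     number of items of type T forming one element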
template<typename T>
inline void
all_to_allvm_impl(const Communicator& comm, const T* in_values, const int *in_n, const int *in_map, T* out_values, const int *out_n, const int *out_map, const int stride )
{
  // get data type and number of processors
  Datatype type = PE::get_mpi_datatype(*in_values);
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));

  // stride must be at least one
  cf3_assert( stride>0 );

  // compute displacements on both the send and receive side
  // also compute stride-multiplied send and receive counts
  int *in_nstride=new int[nproc];
  int *out_nstride=new int[nproc];
  int *in_disp=new int[nproc];
  int *out_disp=new int[nproc];
  in_disp[0]=0;
  out_disp[0]=0;
  for(int i=0; i<nproc-1; i++) {
    in_nstride[i]=stride*in_n[i];
    out_nstride[i]=stride*out_n[i];
    in_disp[i+1]=in_disp[i]+in_nstride[i];
    out_disp[i+1]=out_disp[i]+out_nstride[i];
  }
  in_nstride[nproc-1]=in_n[nproc-1]*stride;
  out_nstride[nproc-1]=out_n[nproc-1]*stride;

  // compute total number of send and receive items
  const int in_sum=in_disp[nproc-1]+stride*in_n[nproc-1];
  const int out_sum=out_disp[nproc-1]+stride*out_n[nproc-1];

  // set up in_buf: with a send map, gather the mapped elements into a contiguous buffer
  T *in_buf=0;
  if (in_map!=0) {
    if ( (in_buf=new T[in_sum+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
    if (stride==1) { for(int i=0; i<in_sum; i++) in_buf[i]=in_values[in_map[i]]; }
    else { for(int i=0; i<in_sum/stride; i++) memcpy(&in_buf[stride*i],&in_values[stride*in_map[i]],stride*sizeof(T)); }
  } else {
    in_buf=(T*)in_values;
  }

  // set up out_buf: a temporary buffer is needed with a receive map or for in-place communication
  T *out_buf=0;
  if ((out_map!=0)||(in_values==out_values)) {
    if ( (out_buf=new T[out_sum+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
  } else {
    out_buf=out_values;
  }

  // do the communication
  MPI_CHECK_RESULT(MPI_Alltoallv, (in_buf, in_nstride, in_disp, type, out_buf, out_nstride, out_disp, type, comm));

  // re-populate out_values
  if (out_map!=0) {
    if (stride==1) { for(int i=0; i<out_sum; i++) out_values[out_map[i]]=out_buf[i]; }
    else { for(int i=0; i<out_sum/stride; i++) memcpy(&out_values[stride*out_map[i]],&out_buf[stride*i],stride*sizeof(T)); }
    delete[] out_buf;
  } else if (in_values==out_values) {
    memcpy(out_values,out_buf,out_sum*sizeof(T));
    delete[] out_buf;
  }

  // free internal memory
  if (in_map!=0) delete[] in_buf;
  delete[] in_disp;
  delete[] out_disp;
  delete[] in_nstride;
  delete[] out_nstride;
}

////////////////////////////////////////////////////////////////////////////////

} // end namespace detail

////////////////////////////////////////////////////////////////////////////////

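/// All-to-all communication with a constant number of elements per process
/// pair, pointer interface. If out_values is null, a receive buffer of the
/// required size is allocated and returned; the caller is responsible for
/// freeing it with delete[]. In-place communication (out_values==in_values)
/// is supported.
/// @param comm       mpi communicator
/// @param in_values  pointer to the data to be sent (nproc*in_n*stride items)
/// @param in_n       number of elements sent to each process
/// @param out_values receive buffer, or null to have one allocated
/// @param stride     number of items of type T forming one element
/// @return pointer to the receive buffer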
template<typename T>
inline T*
all_to_all(const Communicator& comm, const T* in_values, const int in_n, T* out_values, const int stride=1)
{
  // get nproc
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));

  // allocate out_buf if incoming pointer is null
  T* out_buf=out_values;
  if (out_values==0) {
    const int size=stride*nproc*in_n>1?stride*nproc*in_n:1;
    if ( (out_buf=new T[size]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
  }

  // call c_impl
  detail::all_to_allc_impl(comm, in_values, in_n, out_buf, stride);
  return out_buf;
}
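// Example usage (an illustrative sketch, not part of the original interface;
// it assumes `comm` is a valid Communicator of an initialized parallel
// environment). Each process sends one int to every peer and lets the call
// allocate the receive buffer:
//
//   int nproc;
//   MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
//   std::vector<int> send(nproc);
//   for (int i=0; i<nproc; ++i) send[i]=100*i;     // value destined for rank i
//   int* recv=all_to_all(comm,&send[0],1,(int*)0); // null out_values: buffer gets allocated
//   // recv[i] now holds the value sent by rank i; the caller owns the buffer
//   delete[] recv;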

////////////////////////////////////////////////////////////////////////////////

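/// All-to-all communication with a constant number of elements per process
/// pair, std::vector interface. The size of in_values must be divisible by
/// nproc*stride; out_values is resized to the same size.
/// @param comm       mpi communicator
/// @param in_values  data to be sent, one equal chunk per destination process
/// @param out_values vector receiving one chunk from each process
/// @param stride     number of items of type T forming one element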
template<typename T>
inline void
all_to_all(const Communicator& comm, const std::vector<T>& in_values, std::vector<T>& out_values, const int stride=1)
{
  // get number of processors
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));

  // set out_values's size
  cf3_assert( in_values.size() % (nproc*stride) == 0 );
  out_values.resize(in_values.size());
  out_values.reserve(in_values.size());

  // call c_impl
  detail::all_to_allc_impl(comm, (T*)(&in_values[0]), in_values.size()/(nproc*stride), (T*)(&out_values[0]), stride);
}
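// Example usage (an illustrative sketch; assumes `comm` is a valid
// Communicator). The vector interface sizes the output automatically:
//
//   int nproc;
//   MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
//   std::vector<double> send(nproc,1.), recv;
//   all_to_all(comm,send,recv); // recv[i] holds the value sent by rank i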

////////////////////////////////////////////////////////////////////////////////

// forward declaration, needed by the count-only overload below
template<typename T>
inline T*
all_to_all(const Communicator& comm, const T* in_values, const int *in_n, const int *in_map, T* out_values, int *out_n, const int *out_map, const int stride=1);

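/// All-to-all communication with individual send and receive counts per
/// process, pointer interface without index maps. Delegates to the mapped
/// variant with null maps. If out_n is filled with -1, the receive counts are
/// first determined by an all-to-all exchange of in_n. If out_values is null,
/// a receive buffer is allocated and returned.
/// @param comm       mpi communicator
/// @param in_values  pointer to the data to be sent
/// @param in_n       array of nproc send counts
/// @param out_values receive buffer, or null to have one allocated
/// @param out_n      array of nproc receive counts, or all -1 to have them communicated
/// @param stride     number of items of type T forming one element
/// @return pointer to the receive buffer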
template<typename T>
inline T*
all_to_all(const Communicator& comm, const T* in_values, const int *in_n, T* out_values, int *out_n, const int stride=1)
{
  // delegate to the mapped variant, using null maps
  return all_to_all(comm,in_values,in_n,0,out_values,out_n,0,stride);
}

////////////////////////////////////////////////////////////////////////////////

// forward declaration, needed by the count-only overload below
template<typename T>
inline void
all_to_all(const Communicator& comm, const std::vector<T>& in_values, const std::vector<int>& in_n, const std::vector<int>& in_map, std::vector<T>& out_values, std::vector<int>& out_n, const std::vector<int>& out_map, const int stride=1);

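/// All-to-all communication with individual send and receive counts per
/// process, std::vector interface without index maps. Delegates to the mapped
/// variant with empty maps; when in_values and out_values are the same vector,
/// the exchange goes through a temporary copy. If out_n is filled with -1, the
/// receive counts are first determined by an all-to-all exchange of in_n.
/// @param comm       mpi communicator
/// @param in_values  data to be sent
/// @param in_n       vector of nproc send counts
/// @param out_values vector receiving the data, resized if it arrives empty
/// @param out_n      vector of nproc receive counts, or all -1 to have them communicated
/// @param stride     number of items of type T forming one element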
template<typename T>
inline void
all_to_all(const Communicator& comm, const std::vector<T>& in_values, const std::vector<int>& in_n, std::vector<T>& out_values, std::vector<int>& out_n, const int stride=1)
{
  // delegate to the mapped variant, using empty maps
  std::vector<int> in_map(0);
  std::vector<int> out_map(0);
  if (&in_values[0]==&out_values[0])
  {
    std::vector<T> out_tmp(0);
    all_to_all(comm,in_values,in_n,in_map,out_tmp,out_n,out_map,stride);
    out_values.assign(out_tmp.begin(),out_tmp.end());
  }
  else
  {
    all_to_all(comm,in_values,in_n,in_map,out_values,out_n,out_map,stride);
  }
}
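// Example usage (an illustrative sketch; assumes `comm` is a valid
// Communicator). Each process sends a different number of items to each peer;
// the receive counts are unknown, so out_n is filled with -1 and determined
// by the call:
//
//   int nproc;
//   MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
//   std::vector<int> in_n(nproc), out_n(nproc,-1);
//   std::vector<double> send, recv;
//   // fill in_n[i] with the number of items for rank i and append those
//   // items to send, in rank order ...
//   all_to_all(comm,send,in_n,recv,out_n);
//   // out_n[i] now holds the number of items received from rank i,
//   // stored consecutively in recv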

////////////////////////////////////////////////////////////////////////////////

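/// All-to-all communication with individual send and receive counts and index
/// maps, pointer interface. A -1-filled out_n triggers an exchange of the
/// receive counts, which is not allowed in combination with out_map. If
/// out_values is null, a buffer large enough for the received (mapped)
/// elements is allocated and returned; the caller frees it with delete[].
/// @param comm       mpi communicator
/// @param in_values  pointer to the data to be sent
/// @param in_n       array of nproc send counts
/// @param in_map     index map for gathering the elements to send, or null
/// @param out_values receive buffer, or null to have one allocated
/// @param out_n      array of nproc receive counts, or all -1 to have them communicated
/// @param out_map    index map for scattering the received elements, or null
/// @param stride     number of items of type T forming one element
/// @return pointer to the receive buffer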
template<typename T>
inline T*
all_to_all(const Communicator& comm, const T* in_values, const int *in_n, const int *in_map, T* out_values, int *out_n, const int *out_map, const int stride)
{
  // number of processes
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));

  // if out_n consists of -1s, communicate to obtain the receive counts
  int out_sum=0;
  for (int i=0; i<nproc; i++) out_sum+=out_n[i];
  if (out_sum==-nproc) {
    if (out_map!=0) throw cf3::common::ParallelError(FromHere(),"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
    detail::all_to_allc_impl(comm,in_n,1,out_n,1);
    out_sum=0;
    for (int i=0; i<nproc; i++) out_sum+=out_n[i];
  }

  // allocate out_buf if incoming pointer is null
  T* out_buf=out_values;
  if (out_values==0) {
    if (out_map!=0){
      // with a receive map, the buffer must span up to the largest mapped index
      int out_sum_tmp=0;
      for (int i=0; i<out_sum; i++) out_sum_tmp=out_map[i]>out_sum_tmp?out_map[i]:out_sum_tmp;
      out_sum=out_sum_tmp+1;
    }
    if ( (out_buf=new T[stride*out_sum]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
  }

  // call vm_impl
  detail::all_to_allvm_impl(comm, in_values, in_n, in_map, out_buf, out_n, out_map, stride);
  return out_buf;
}

////////////////////////////////////////////////////////////////////////////////

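/// All-to-all communication with individual send and receive counts and index
/// maps, std::vector interface. A -1-filled out_n triggers an exchange of the
/// receive counts, which is not allowed in combination with a non-empty
/// out_map. If out_values arrives empty, it is resized to hold the received
/// (mapped) elements.
/// @param comm       mpi communicator
/// @param in_values  data to be sent
/// @param in_n       vector of nproc send counts
/// @param in_map     index map for gathering the elements to send (may be empty)
/// @param out_values vector receiving the data, resized if it arrives empty
/// @param out_n      vector of nproc receive counts, or all -1 to have them communicated
/// @param out_map    index map for scattering the received elements (may be empty)
/// @param stride     number of items of type T forming one element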
template<typename T>
inline void
all_to_all(const Communicator& comm, const std::vector<T>& in_values, const std::vector<int>& in_n, const std::vector<int>& in_map, std::vector<T>& out_values, std::vector<int>& out_n, const std::vector<int>& out_map, const int stride)
{
  // get number of processes and check in_n and out_n
  // (out_n deliberately throws an exception rather than asserting, because the vector can arrive from arbitrary previous usage)
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  cf3_assert( (int)in_n.size() == nproc );
  if ((int)out_n.size()!=nproc) throw cf3::common::BadValue(FromHere(),"Size of vector for number of items to be received does not match the number of processes.");

  // compute total number of items to send and receive
  int in_sum=0;
  int out_sum=0;
  boost_foreach( int i, in_n ) in_sum+=i;
  boost_foreach( int i, out_n ) out_sum+=i;

  // if necessary, do communication for out_n
  if (out_sum == -nproc){
    if (out_map.size()!=0) throw cf3::common::ParallelError(FromHere(),"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
    detail::all_to_allc_impl(comm,&in_n[0],1,&out_n[0],1);
    out_sum=0;
    boost_foreach( int & i, out_n ) out_sum+=i;
  }

  // resize out_values if it arrives empty
  if (out_values.size() == 0 ){
    if (out_map.size()!=0) {
      // with a receive map, the buffer must span up to the largest mapped index
      out_sum=0;
      boost_foreach( int i, out_map ) out_sum=i>out_sum?i:out_sum;
      out_sum++;
    }
    out_values.resize(stride*out_sum);
    out_values.reserve(stride*out_sum);
  }

  // call vm_impl
  detail::all_to_allvm_impl(comm, (T*)(&in_values[0]), &in_n[0], (in_map.empty() ? nullptr : &in_map[0]), (T*)(&out_values[0]), &out_n[0], (out_map.empty() ? nullptr : &out_map[0]), stride);
}

////////////////////////////////////////////////////////////////////////////////

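/// All-to-all communication of a vector of vectors: send[i] holds the elements
/// destined for process i, and on return recv[i] holds the elements received
/// from process i. The per-process counts are exchanged internally, so none
/// need to be supplied.
/// @param comm mpi communicator
/// @param send per-destination data to be sent (one vector per process)
/// @param recv per-source received data, resized as needed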
template <typename T>
void all_to_all(const Communicator& comm, const std::vector<std::vector<T> >& send, std::vector<std::vector<T> >& recv)
{
  // per-process send counts and displacements
  std::vector<int> send_strides(send.size());
  std::vector<int> send_displs(send.size());
  for (Uint i=0; i<send.size(); ++i)
    send_strides[i] = send[i].size();

  send_displs[0] = 0;
  for (Uint i=1; i<send.size(); ++i)
    send_displs[i] = send_displs[i-1] + send_strides[i-1];

  // flatten the send data into one contiguous buffer
  std::vector<T> send_linear(send_displs.back()+send_strides.back());
  for (Uint i=0; i<send.size(); ++i)
    for (Uint j=0; j<send[i].size(); ++j)
      send_linear[send_displs[i]+j] = send[i][j];

  // exchange the counts, then compute the receive displacements
  std::vector<int> recv_strides(send.size());
  std::vector<int> recv_displs(send.size());
  all_to_all(comm,send_strides,recv_strides);
  recv_displs[0] = 0;
  for (Uint i=1; i<send.size(); ++i)
    recv_displs[i] = recv_displs[i-1] + recv_strides[i-1];

  // do the communication
  std::vector<T> recv_linear(recv_displs.back()+recv_strides.back());
  MPI_CHECK_RESULT(MPI_Alltoallv, (&send_linear[0], &send_strides[0], &send_displs[0], PE::get_mpi_datatype<T>(), &recv_linear[0], &recv_strides[0], &recv_displs[0], PE::get_mpi_datatype<T>(), comm));

  // unpack the received items per source process
  recv.resize(recv_strides.size());
  for (Uint i=0; i<recv_strides.size(); ++i)
  {
    recv[i].resize(recv_strides[i]);
    for (int j=0; j<recv_strides[i]; ++j)
    {
      recv[i][j]=recv_linear[recv_displs[i]+j];
    }
  }
}
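// Example usage (an illustrative sketch; assumes `comm` is a valid
// Communicator). Arbitrary, per-peer amounts of data, no counts needed:
//
//   int nproc;
//   MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
//   std::vector< std::vector<int> > send(nproc), recv;
//   // fill send[i] with any number of items destined for rank i ...
//   all_to_all(comm,send,recv);
//   // recv[i] now holds the items sent to this process by rank i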

////////////////////////////////////////////////////////////////////////////////

} // namespace PE
} // namespace common
} // namespace cf3

////////////////////////////////////////////////////////////////////////////////

#endif // cf3_common_PE_all_to_all_hpp