COOLFluiD Release kernel
COOLFluiD is a Collaborative Simulation Environment (CSE) focused on complex MultiPhysics simulations.
all_gather.hpp
// Copyright (C) 2010-2013 von Karman Institute for Fluid Dynamics, Belgium
//
// This software is distributed under the terms of the
// GNU Lesser General Public License version 3 (LGPLv3).
// See doc/lgpl.txt and doc/gpl.txt for the license text.

#ifndef cf3_common_PE_all_gather_hpp
#define cf3_common_PE_all_gather_hpp

#include <cstring> // memcpy

#include "common/Foreach.hpp"
#include "common/Assertions.hpp"       // cf3_assert
#include "common/BasicExceptions.hpp"  // NotEnoughMemory, BadValue, ParallelError
#include "common/CodeLocation.hpp"
#include "common/PE/types.hpp"
#include "common/PE/datatype.hpp"

// #include "common/PE/debug.hpp" // for debugging mpi

namespace cf3 {
namespace common {
namespace PE {

namespace detail {

template<typename T>
inline void
all_gatherc_impl(const Communicator& comm, const T* in_values, const int in_n, T* out_values, const int stride )
{
  // get data type and number of processors
  Datatype type = PE::get_mpi_datatype(*in_values);
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));

  // stride must be at least one
  cf3_assert( stride>0 );

  // set up out_buf: when gathering in place, stage the result in a temporary buffer
  T* out_buf=out_values;
  if (in_values==out_values) {
    if ( (out_buf=new T[nproc*in_n*stride+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 to avoid a possible zero-sized allocation
  }

  // do the communication: every rank contributes in_n*stride items
  int *displs=new int[nproc];
  for (int i=0; i<nproc; i++) displs[i]=i*in_n*stride;
  int *counts=new int[nproc];
  for (int i=0; i<nproc; i++) counts[i]=in_n*stride;
  MPI_CHECK_RESULT(MPI_Allgatherv, (const_cast<T*>(in_values), in_n*stride, type, out_buf, counts, displs, type, comm ));
  delete[] displs;
  delete[] counts;

  // if gathering in place, copy the staged result back and free the temporary
  if (in_values==out_values) {
    memcpy(out_values,out_buf,nproc*in_n*stride*sizeof(T));
    delete[] out_buf;
  }
}
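
// Worked sketch (not in the original source): with nproc=3, in_n=2, stride=1,
// every rank sends counts[i]=2 items and the receive displacements come out as
// displs={0,2,4}, so rank i's data lands at out_buf[2*i]. Since all counts are
// equal, the call is equivalent to a plain MPI_Allgather; MPI_Allgatherv is
// used here, presumably to keep this path symmetric with the variable-size
// implementation below.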

template<typename T>
inline void
all_gathervm_impl(const Communicator& comm, const T* in_values, const int in_n, const int *in_map, T* out_values, const int *out_n, const int *out_map, const int stride )
{
  // get data type and number of processors
  Datatype type = PE::get_mpi_datatype(*in_values);
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));

  // stride smaller than one is not supported
  cf3_assert( stride>0 );

  // compute displacements on the receive side,
  // along with stride-multiplied receive counts
  int *out_nstride=new int[nproc];
  int *out_disp=new int[nproc];
  out_disp[0]=0;
  for(int i=0; i<nproc-1; i++) {
    out_nstride[i]=stride*out_n[i];
    out_disp[i+1]=out_disp[i]+out_nstride[i];
  }
  out_nstride[nproc-1]=out_n[nproc-1]*stride;

  // compute total number of send and receive items
  const int in_sum=stride*in_n;
  const int out_sum=out_disp[nproc-1]+stride*out_n[nproc-1];

  // set up in_buf: if a send map is given, pack the mapped items contiguously
  T *in_buf=(T*)in_values;
  if (in_map!=0) {
    if ( (in_buf=new T[in_sum+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 to avoid a possible zero-sized allocation
    if (stride==1) { for(int i=0; i<in_sum; i++) in_buf[i]=in_values[in_map[i]]; }
    else { for(int i=0; i<in_sum/stride; i++) memcpy(&in_buf[stride*i],&in_values[stride*in_map[i]],stride*sizeof(T)); }
  }

  // set up out_buf: stage into a temporary if a receive map is given or gathering in place
  T *out_buf=out_values;
  if ((out_map!=0)||(in_values==out_values)) {
    if ( (out_buf=new T[out_sum+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 to avoid a possible zero-sized allocation
  }

  // do the communication
  MPI_CHECK_RESULT(MPI_Allgatherv, (in_buf, in_sum, type, out_buf, out_nstride, out_disp, type, comm));

  // re-populate out_values through the receive map, or copy back when gathering in place
  if (out_map!=0) {
    if (stride==1) { for(int i=0; i<out_sum; i++) out_values[out_map[i]]=out_buf[i]; }
    else { for(int i=0; i<out_sum/stride; i++) memcpy(&out_values[stride*out_map[i]],&out_buf[stride*i],stride*sizeof(T)); }
    delete[] out_buf;
  } else if (in_values==out_values) {
    memcpy(out_values,out_buf,out_sum*sizeof(T));
    delete[] out_buf;
  }

  // free internal memory
  if (in_map!=0) delete[] in_buf;
  delete[] out_disp;
  delete[] out_nstride;
}
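
// Worked sketch (not in the original source): with stride=1, in_n=2 and
// in_map={4,1}, the packing loop stages in_buf={in_values[4],in_values[1]}
// before the gather; on the receive side, item i of the wire buffer is
// scattered to out_values[out_map[i]]. With stride=2 the same indices address
// contiguous blocks of two T's instead of single items.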

} // end namespace detail

template<typename T>
inline T*
all_gather(const Communicator& comm, const T* in_values, const int in_n, T* out_values, const int stride=1)
{
  // get the number of processes
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));

  // allocate out_buf if incoming pointer is null
  T* out_buf=out_values;
  if (out_values==0) {
    const int size=stride*nproc*in_n>1?stride*nproc*in_n:1;
    if ( (out_buf=new T[size]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
  }

  // call the constant-size implementation
  detail::all_gatherc_impl(comm, in_values, in_n, out_buf, stride);
  return out_buf;
}
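
// Usage sketch (hypothetical caller, not part of this header; comm is assumed
// to be a valid Communicator): gather three doubles from every rank. Passing a
// null out_values makes the call allocate the receive buffer itself, which the
// caller must delete[].
//
//   double local[3] = {0.,1.,2.};
//   double* global = all_gather(comm, local, 3, (double*)0);
//   // global now holds 3*nproc doubles, rank i's block starting at global[3*i]
//   delete[] global;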

template<typename T>
inline void
all_gather(const Communicator& comm, const std::vector<T>& in_values, std::vector<T>& out_values, const int stride=1)
{
  // get the number of processes
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));

  // size out_values to hold every rank's contribution
  cf3_assert( in_values.size() % (stride) == 0 );
  int in_n=(int)in_values.size();
  out_values.resize(in_n*nproc);

  // call the constant-size implementation
  detail::all_gatherc_impl(comm, (T*)(&in_values[0]), in_n/(stride), (T*)(&out_values[0]), stride);
}
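
// Usage sketch (hypothetical caller, not part of this header): the vector
// overload sizes out_values by itself; in_values.size() must be a multiple of
// stride.
//
//   std::vector<double> mine(3,1.0), all;
//   all_gather(comm, mine, all);   // all.size()==3*nproc afterwards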

template<typename T>
inline void
all_gather(const Communicator& comm, const T& in_value, std::vector<T>& out_values)
{
  // get the number of processes
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));

  // size out_values to one item per rank
  out_values.resize(nproc);

  // call the constant-size implementation
  detail::all_gatherc_impl(comm, (T*)(&in_value), 1, (T*)(&out_values[0]), 1);
}
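
// Usage sketch (hypothetical caller, not part of this header): gathering one
// value per rank is the usual "collect everyone's count" idiom.
//
//   int my_count = (int)local_data.size();   // local_data: any local container
//   std::vector<int> counts;
//   all_gather(comm, my_count, counts);      // counts[i] == my_count of rank i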

// forward declaration: the overload below delegates to the mapped variant defined further down
template<typename T>
inline T*
all_gather(const Communicator& comm, const T* in_values, const int in_n, const int *in_map, T* out_values, int *out_n, const int *out_map, const int stride=1);

template<typename T>
inline T*
all_gather(const Communicator& comm, const T* in_values, const int in_n, T* out_values, int *out_n, const int stride=1)
{
  // call the mapped-variable all_gather with empty maps
  return all_gather(comm,in_values,in_n,0,out_values,out_n,0,stride);
}
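
// Usage sketch (hypothetical caller, not part of this header; local and
// local_n stand for the caller's data): filling out_n with -1s asks the call
// to exchange the per-rank counts first (see the out_sum==-nproc test in the
// mapped variant below); a null out_values is then allocated to fit and must
// be delete[]d by the caller.
//
//   std::vector<int> out_n(nproc,-1);        // nproc from MPI_Comm_size
//   double* global = all_gather(comm, local, local_n, (double*)0, &out_n[0]);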

// forward declaration: the overload below delegates to the mapped variant defined further down
template<typename T>
inline void
all_gather(const Communicator& comm, const std::vector<T>& in_values, const int in_n, const std::vector<int>& in_map, std::vector<T>& out_values, std::vector<int>& out_n, const std::vector<int>& out_map, const int stride=1);

template<typename T>
inline void
all_gather(const Communicator& comm, const std::vector<T>& in_values, const int in_n, std::vector<T>& out_values, std::vector<int>& out_n, const int stride=1)
{
  // call the mapped-variable all_gather with empty maps,
  // staging through a temporary when gathering in place
  std::vector<int> in_map(0);
  std::vector<int> out_map(0);
  if (&in_values[0]==&out_values[0])
  {
    std::vector<T> out_tmp(0);
    all_gather(comm,in_values,in_n,in_map,out_tmp,out_n,out_map,stride);
    out_values.assign(out_tmp.begin(),out_tmp.end());
  }
  else
  {
    all_gather(comm,in_values,in_n,in_map,out_values,out_n,out_map,stride);
  }
}
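
// Usage sketch (hypothetical caller, not part of this header): the vector
// flavour of the variable-count gather; out_n again starts as -1s and
// out_values is grown by the call when it arrives empty.
//
//   std::vector<double> mine(local_n,0.), all;
//   std::vector<int> out_n(nproc,-1);
//   all_gather(comm, mine, (int)mine.size(), all, out_n);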

template<typename T>
inline T*
all_gather(const Communicator& comm, const T* in_values, const int in_n, const int *in_map, T* out_values, int *out_n, const int *out_map, const int stride)
{
  // get the number of processes
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));

  // if out_n consists of -1s, communicate to learn the receive counts
  int out_sum=0;
  for (int i=0; i<nproc; i++) out_sum+=out_n[i];
  if (out_sum==-nproc) {
    if (out_map!=0) throw cf3::common::ParallelError(FromHere(),"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
    detail::all_gatherc_impl(comm,&in_n,1,out_n,1);
    out_sum=0;
    for (int i=0; i<nproc; i++) out_sum+=out_n[i];
  }

  // allocate out_buf if incoming pointer is null; with a receive map,
  // the buffer must span up to the largest mapped index
  T* out_buf=out_values;
  if (out_values==0) {
    if (out_map!=0){
      int out_sum_tmp=0;
      for (int i=0; i<out_sum; i++) out_sum_tmp=out_map[i]>out_sum_tmp?out_map[i]:out_sum_tmp;
      out_sum=out_sum_tmp+1;
    }
    if ( (out_buf=new T[stride*out_sum]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
  }

  // call the mapped-variable implementation
  detail::all_gathervm_impl(comm, in_values, in_n, in_map, out_buf, out_n, out_map, stride);
  return out_buf;
}
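
// Usage sketch (hypothetical caller, not part of this header; local stands for
// the caller's array): send elements 5 and 2 of local, in that order, and let
// the call size everything else.
//
//   int in_map[2] = {5,2};
//   std::vector<int> out_n(nproc,-1);
//   double* global = all_gather(comm, local, 2, in_map, (double*)0, &out_n[0], (int*)0);
//   // out_n[i] now holds rank i's item count; caller must delete[] global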

template<typename T>
inline void
all_gather(const Communicator& comm, const std::vector<T>& in_values, const int in_n, const std::vector<int>& in_map, std::vector<T>& out_values, std::vector<int>& out_n, const std::vector<int>& out_map, const int stride)
{
  // get the number of processes and check out_n's size (this deliberately
  // throws rather than asserts, because the vector can arrive from arbitrary previous usage)
  int nproc;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  if ((int)out_n.size()!=nproc) throw cf3::common::BadValue(FromHere(),"Size of vector for number of items to be received does not match the number of processes.");

  // compute the total number of items to receive
  int out_sum=0;
  boost_foreach( int i, out_n ) out_sum+=i;

  // if out_n consists of -1s, communicate to learn the receive counts
  if (out_sum == -nproc){
    if (out_map.size()!=0) throw cf3::common::ParallelError(FromHere(),"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
    detail::all_gatherc_impl(comm,&in_n,1,&out_n[0],1);
    out_sum=0;
    boost_foreach( int & i, out_n ) out_sum+=i;
  }

  // resize out_values if it arrives empty; with a receive map,
  // it must span up to the largest mapped index
  if (out_values.size() == 0 ){
    if (out_map.size()!=0) {
      out_sum=0;
      boost_foreach( int i, out_map ) out_sum=i>out_sum?i:out_sum;
      if (out_sum!=0) out_sum++;
    }
    out_values.resize(stride*out_sum);
  }

  // call the mapped-variable implementation
  detail::all_gathervm_impl(comm, (T*)(&in_values[0]), in_n, (in_map.empty() ? nullptr : &in_map[0]), (T*)(&out_values[0]), &out_n[0], (out_map.empty() ? nullptr : &out_map[0]), stride);
}
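
// Usage sketch (hypothetical caller, not part of this header): empty maps mean
// "use the data as laid out in memory"; out_n must be presized to nproc, since
// its size is checked against the number of processes above.
//
//   std::vector<int> in_map, out_map;        // empty: no reordering
//   std::vector<int> out_n(nproc,-1);
//   std::vector<double> all;
//   all_gather(comm, mine, (int)mine.size(), in_map, all, out_n, out_map);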

template <typename T>
void all_gather(const Communicator& comm, const std::vector<T>& send, std::vector<std::vector<T> >& recv)
{
  // gather each rank's item count, then the data itself with matching displacements
  std::vector<int> strides;
  all_gather(comm,(int)send.size(),strides);
  std::vector<int> displs(strides.size());
  if (strides.size())
  {
    int sum_strides = strides[0];
    displs[0] = 0;
    for (Uint i=1; i<strides.size(); ++i)
    {
      displs[i] = displs[i-1] + strides[i-1];
      sum_strides += strides[i];
    }
    std::vector<T> recv_linear(sum_strides);
    MPI_CHECK_RESULT(MPI_Allgatherv, ((void*)&send[0], (int)send.size(), get_mpi_datatype<T>(), &recv_linear[0], &strides[0], &displs[0], get_mpi_datatype<T>(), comm));

    // unpack the linear receive buffer into one vector per rank
    recv.resize(strides.size());
    for (Uint i=0; i<strides.size(); ++i)
    {
      recv[i].resize(strides[i]);
      for (int j=0; j<strides[i]; ++j)
      {
        recv[i][j]=recv_linear[displs[i]+j];
      }
    }
  }
  else
  {
    recv.resize(0);
  }
}
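
// Usage sketch (hypothetical caller, not part of this header): the most
// convenient form when ranks hold different amounts of data; my_rank is
// assumed to come from MPI_Comm_rank.
//
//   std::vector<double> mine(my_rank+1,1.0);          // rank i holds i+1 values
//   std::vector< std::vector<double> > everyone;
//   all_gather(comm, mine, everyone);                 // everyone[i].size()==i+1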

} // namespace PE
} // namespace common
} // namespace cf3

#endif // cf3_common_PE_all_gather_hpp