COOLFluiD Release kernel
COOLFluiD is a Collaborative Simulation Environment (CSE) focused on complex MultiPhysics simulations.
gather.hpp
// Copyright (C) 2010-2013 von Karman Institute for Fluid Dynamics, Belgium
//
// This software is distributed under the terms of the
// GNU Lesser General Public License version 3 (LGPLv3).
// See doc/lgpl.txt and doc/gpl.txt for the license text.

#ifndef cf3_common_PE_gather_hpp
#define cf3_common_PE_gather_hpp

#include "common/Foreach.hpp"

#include "common/PE/types.hpp"
#include "common/PE/datatype.hpp"

// #include "common/PE/debug.hpp" // for debugging mpi

namespace cf3 {
  namespace common {
    namespace PE {

namespace detail {

  template<typename T>
  inline void
  gatherc_impl(const Communicator& comm, const T* in_values, const int in_n, T* out_values, const int root, const int stride )
  {
    // get data type and number of processors
    Datatype type = PE::get_mpi_datatype(*in_values);
    int nproc,irank;
    MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
    MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

    // stride must be strictly positive
    cf3_assert( stride>0 );

    // set up out_buf (a temporary is needed if the gather is done in place)
    T* out_buf=out_values;
    if (irank==root) if (in_values==out_values) {
      if ( (out_buf=new T[nproc*in_n*stride+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
    }

    // do the communication
    MPI_CHECK_RESULT(MPI_Gather, (const_cast<T*>(in_values), in_n*stride, type, out_buf, in_n*stride, type, root, comm));

    // deal with out_buf (copy back and free the temporary if one was used)
    if (irank==root) if (in_values==out_values) {
      memcpy(out_values,out_buf,nproc*in_n*stride*sizeof(T));
      delete[] out_buf;
    }
  }

  template<typename T>
  inline void
  gathervm_impl(const Communicator& comm, const T* in_values, const int in_n, const int *in_map, T* out_values, const int *out_n, const int *out_map, const int root, const int stride )
  {
    // get data type and number of processors
    Datatype type = PE::get_mpi_datatype(*in_values);
    int nproc,irank;
    MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
    MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

    // stride smaller than one is not supported
    cf3_assert( stride>0 );

    // compute displacements both on send and receive side
    // also compute stride-multiplied send and receive counts
    int *out_nstride=0;
    int *out_disp=0;
    int out_sum=0;
    if (irank==root) {
      out_nstride=new int[nproc];
      out_disp=new int[nproc];
      out_disp[0]=0;
      for(int i=0; i<nproc-1; i++) {
        out_nstride[i]=stride*out_n[i];
        out_disp[i+1]=out_disp[i]+out_nstride[i];
      }
      out_nstride[nproc-1]=out_n[nproc-1]*stride;
      out_sum=out_disp[nproc-1]+stride*out_n[nproc-1];
    }

    // compute total number of send and receive items
    const int in_sum=stride*in_n;

    // set up in_buf (collect the mapped items into a contiguous send buffer)
    T *in_buf=(T*)in_values;
    if (in_map!=0) {
      if ( (in_buf=new T[in_sum+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
      if (stride==1) { for(int i=0; i<in_sum; i++) in_buf[i]=in_values[in_map[i]]; }
      else { for(int i=0; i<in_sum/stride; i++) memcpy(&in_buf[stride*i],&in_values[stride*in_map[i]],stride*sizeof(T)); }
    }

    // set up out_buf (a temporary is needed if a receive map is used or the gather is in place)
    T *out_buf=out_values;
    if (irank==root) if ((out_map!=0)||(in_values==out_values)) {
      if ( (out_buf=new T[out_sum+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
    }

    // do the communication
    MPI_CHECK_RESULT(MPI_Gatherv, (in_buf, in_sum, type, out_buf, out_nstride, out_disp, type, root, comm));

    // re-populate out_values
    if (irank==root) {
      if (out_map!=0) {
        if (stride==1) { for(int i=0; i<out_sum; i++) out_values[out_map[i]]=out_buf[i]; }
        else { for(int i=0; i<out_sum/stride; i++) memcpy(&out_values[stride*out_map[i]],&out_buf[stride*i],stride*sizeof(T)); }
        delete[] out_buf;
      } else if (in_values==out_values) {
        memcpy(out_values,out_buf,out_sum*sizeof(T));
        delete[] out_buf;
      }
    }

    // free internal memory
    if (in_map!=0) delete[] in_buf;
    delete[] out_disp;
    delete[] out_nstride;
  }
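
  // Illustration of the map/stride semantics (for clarification only, derived
  // from the loops above): with stride==1, in_n==2 and in_map=={3,1}, this
  // rank sends in_values[3] followed by in_values[1]; with stride==2 the same
  // map selects the blocks in_values[6..7] and in_values[2..3].  On the root,
  // out_n[r] items (times stride) are expected from rank r, and out_map
  // scatters the received items into out_values in the same block-wise fashion.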

} // end namespace detail

template<typename T>
inline T*
gather(const Communicator& comm, const T* in_values, const int in_n, T* out_values, const int root, const int stride=1)
{
  // get nproc, irank
  int nproc,irank;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // allocate out_buf if incoming pointer is null
  T* out_buf=out_values;
  if (irank==root) if (out_values==0) {
    const int size=stride*nproc*in_n>1?stride*nproc*in_n:1;
    if ( (out_buf=new T[size]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
  }

  // call c_impl
  if (irank==root) {
    detail::gatherc_impl(comm, in_values, in_n, out_buf, root, stride);
  } else {
    detail::gatherc_impl(comm, in_values, in_n, (T*)0, root, stride);
  }
  return out_buf;
}
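
// Usage sketch for the fixed-size form (illustration only; variable names are
// hypothetical).  Every rank contributes the same number of items and the root
// receives them ordered by rank.  Passing a null output pointer makes gather
// allocate the receive buffer on the root; the returned pointer is only
// meaningful there:
//
//   Communicator comm = ...;   // communicator of the parallel environment
//   double local[2] = { /* two values per rank */ };
//   double* gathered = gather(comm, local, 2, (double*)0, 0 /*root*/);
//   // on the root, gathered[2*r] and gathered[2*r+1] hold rank r's values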

template<typename T>
inline void
gather(const Communicator& comm, const std::vector<T>& in_values, std::vector<T>& out_values, const int root, const int stride=1)
{
  // get nproc, irank
  int nproc,irank;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // set the size of out_values
  cf3_assert( in_values.size() % (stride) == 0 );
  int in_n=(int)in_values.size();
  if (irank==root) {
    out_values.resize(in_n*nproc);
    out_values.reserve(in_n*nproc);
  }

  // call c_impl
  if (irank==root) {
    detail::gatherc_impl(comm, (T*)(&in_values[0]), in_n/(stride), (T*)(&out_values[0]), root, stride);
  } else {
    detail::gatherc_impl(comm, (T*)(&in_values[0]), in_n/(stride), (T*)0, root, stride);
  }
}

// forward declaration, needed by the non-mapped overload below
template<typename T>
inline T*
gather(const Communicator& comm, const T* in_values, const int in_n, const int *in_map, T* out_values, int *out_n, const int *out_map, const int root, const int stride=1);

template<typename T>
inline T*
gather(const Communicator& comm, const T* in_values, const int in_n, T* out_values, int *out_n, const int root, const int stride=1)
{
  // call mapped variable gather
  return gather(comm,in_values,in_n,0,out_values,out_n,0,root,stride);
}

// forward declaration, needed by the non-mapped overload below
template<typename T>
inline void
gather(const Communicator& comm, const std::vector<T>& in_values, const int in_n, const std::vector<int>& in_map, std::vector<T>& out_values, std::vector<int>& out_n, const std::vector<int>& out_map, const int root, const int stride=1);

template<typename T>
inline void
gather(const Communicator& comm, const std::vector<T>& in_values, const int in_n, std::vector<T>& out_values, std::vector<int>& out_n, const int root, const int stride=1)
{
  // call mapped variable gather
  std::vector<int> in_map(0);
  std::vector<int> out_map(0);

  int irank;
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  if ((irank==root)&&(&in_values[0]==&out_values[0]))
  {
    std::vector<T> out_tmp(0);
    gather(comm,in_values,in_n,in_map,out_tmp,out_n,out_map,root,stride);
    out_values.assign(out_tmp.begin(),out_tmp.end());
  }
  else
  {
    gather(comm,in_values,in_n,in_map,out_values,out_n,out_map,root,stride);
  }
}

template<typename T>
inline T*
gather(const Communicator& comm, const T* in_values, const int in_n, const int *in_map, T* out_values, int *out_n, const int *out_map, const int root, const int stride)
{
  // get nproc, irank
  int nproc,irank;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // if out_n consists of -1s, then communicate for the number of receives
  int out_sum=0;
  for (int i=0; i<nproc; i++) out_sum+=out_n[i];
  if (out_sum==-nproc) {
    if (out_map!=0) throw cf3::common::ParallelError(FromHere(),"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
    detail::gatherc_impl(comm,&in_n,1,out_n,root,1);
    out_sum=0;
    if (irank==root) for (int i=0; i<nproc; i++) out_sum+=out_n[i];
  }

  // allocate out_buf if incoming pointer is null
  T* out_buf=out_values;
  if (irank==root) {
    if (out_values==0) {
      if (out_map!=0){
        // with a receive map, the needed size is one past the largest mapped index
        int out_sum_tmp=0;
        for (int i=0; i<out_sum; i++) out_sum_tmp=out_map[i]>out_sum_tmp?out_map[i]:out_sum_tmp;
        out_sum=out_sum_tmp+1;
      }
      if ( (out_buf=new T[stride*out_sum]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
    }
  }

  // call vm_impl
  if (irank==root) {
    detail::gathervm_impl(comm, in_values, in_n, in_map, out_buf, out_n, out_map, root, stride);
  } else {
    detail::gathervm_impl(comm, in_values, in_n, in_map, (T*)0, (int*)0, (int*)0, root, stride);
  }
  return out_buf;
}

template<typename T>
inline void
gather(const Communicator& comm, const std::vector<T>& in_values, const int in_n, const std::vector<int>& in_map, std::vector<T>& out_values, std::vector<int>& out_n, const std::vector<int>& out_map, const int root, const int stride)
{
  // number of processes, and check out_n (this deliberately throws an exception because the vector can arrive from arbitrary previous usage)
  int nproc,irank;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));
  if ((int)out_n.size()!=nproc) throw cf3::common::BadValue(FromHere(),"Size of vector for number of items to be received does not match to number of processes.");

  // compute number of send and receive
  int out_sum=0;
  boost_foreach( int i, out_n ) out_sum+=i;

  // if necessary, do communication for out_n
  if (out_sum == -nproc){
    if (out_map.size()!=0) throw cf3::common::ParallelError(FromHere(),"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
    detail::gatherc_impl(comm,&in_n,1,&out_n[0],root,1);
    out_sum=0;
    if (irank==root) boost_foreach( int & i, out_n ) out_sum+=i;
  }

  // resize out_values if vector size is zero
  if (irank==root) {
    if (out_values.size() == 0 ){
      if (out_map.size()!=0) {
        out_sum=0;
        boost_foreach( int i, out_map ) out_sum=i>out_sum?i:out_sum;
        if (out_sum!=0) out_sum++;
      }
      out_values.resize(stride*out_sum);
      out_values.reserve(stride*out_sum);
    }
  }

  // call vm_impl
  if (irank==root) {
    detail::gathervm_impl(comm, (T*)(&in_values[0]), in_n, (in_map.empty() ? nullptr : &in_map[0]), (T*)(&out_values[0]), &out_n[0], (out_map.empty() ? nullptr : &out_map[0]), root, stride);
  } else {
    detail::gathervm_impl(comm, (T*)(&in_values[0]), in_n, (in_map.empty() ? nullptr : &in_map[0]), (T*)0, (int*)0, (int*)0, root, stride);
  }
}
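
// Usage sketch for the mapped form (illustration only; variable names are
// hypothetical).  in_map lists, in send order, the local indices to gather;
// leaving out_map empty and filling out_n with -1 on every rank lets the call
// exchange the receive counts and store the data contiguously on the root:
//
//   std::vector<int> in_map = /* local indices to send */;
//   std::vector<int> out_map;                    // empty: no receive mapping
//   std::vector<int> counts(nproc, -1);          // -1: counts not yet known
//   std::vector<double> gathered;                // resized on the root
//   gather(comm, values, (int)in_map.size(), in_map, gathered, counts, out_map, 0 /*root*/);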

} // namespace PE
} // namespace common
} // namespace cf3

#endif // cf3_common_PE_gather_hpp
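
In a COOLFluiD application these functions are reached through the parallel environment in cf3::common::PE. The following rough, self-contained sketch is not part of the header; it assumes that driving the calls with a raw MPI communicator is acceptable (Communicator is a typedef of MPI_Comm in common/PE/types.hpp) and that a plain MPI_Init suffices where a real application would set up the PE environment. It exercises the variable-size std::vector overload:

  // gather_example.cpp -- hypothetical illustration, not part of COOLFluiD
  #include <iostream>
  #include <vector>
  #include <mpi.h>

  #include "common/PE/gather.hpp"

  int main(int argc, char** argv)
  {
    MPI_Init(&argc, &argv); // a real application would initialise cf3's parallel environment instead

    cf3::common::PE::Communicator comm = MPI_COMM_WORLD;
    int rank = 0, nproc = 1;
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &nproc);

    // every rank contributes a different number of values
    std::vector<double> local(rank + 1, double(rank));
    std::vector<double> gathered;            // filled on the root only
    std::vector<int>    counts(nproc, -1);   // -1: let gather exchange the counts

    cf3::common::PE::gather(comm, local, (int)local.size(), gathered, counts, 0 /*root*/);

    if (rank == 0)
      std::cout << "root received " << gathered.size() << " values" << std::endl;

    MPI_Finalize();
    return 0;
  }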