COOLFluiD Release kernel
COOLFluiD is a Collaborative Simulation Environment (CSE) focused on complex MultiPhysics simulations.
scatter.hpp
// Copyright (C) 2010-2013 von Karman Institute for Fluid Dynamics, Belgium
//
// This software is distributed under the terms of the
// GNU Lesser General Public License version 3 (LGPLv3).
// See doc/lgpl.txt and doc/gpl.txt for the license text.

#ifndef cf3_common_PE_scatter_hpp
#define cf3_common_PE_scatter_hpp

#include <cstring>  // memcpy
#include <vector>

#include "common/Foreach.hpp"
#include "common/Assertions.hpp"      // cf3_assert
#include "common/BasicExceptions.hpp" // NotEnoughMemory, ParallelError

#include "common/PE/types.hpp"
#include "common/PE/datatype.hpp"

// #include "common/PE/debug.hpp" // for debugging mpi

namespace cf3 {
namespace common {
namespace PE {

namespace detail {

/// Implementation of the constant-size scatter: the root sends in_n*stride
/// items to every process via MPI_Scatter.
template<typename T>
inline void
scatterc_impl(const Communicator& comm, const T* in_values, const int in_n, T* out_values, const int root, const int stride )
{
  // get data type and number of processes
  Datatype type = PE::get_mpi_datatype(*in_values);
  int nproc,irank;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // stride must be positive
  cf3_assert( stride>0 );

  // set up out_buf: if the root scatters in place, receive into a temporary buffer
  T* out_buf=out_values;
  if ((irank==root)&&(in_values==out_values)) {
    if ( (out_buf=new T[in_n*stride+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
  }

  // do the communication
  MPI_CHECK_RESULT(MPI_Scatter, (const_cast<T*>(in_values), in_n*stride, type, out_buf, in_n*stride, type, root, comm));

  // if a temporary buffer was used, copy the root's received block back and free it
  if ((irank==root)&&(in_values==out_values)) {
    memcpy(out_values,out_buf,in_n*stride*sizeof(T));
    delete[] out_buf;
  }
}

/// Implementation of the variable-size scatter with optional send/receive maps:
/// the root sends in_n[i]*stride items to process i via MPI_Scatterv.
template<typename T>
inline void
scattervm_impl(const Communicator& comm, const T* in_values, const int* in_n, const int* in_map, T* out_values, int& out_n, const int *out_map, const int root, const int stride )
{
  // get data type and number of processes
  Datatype type = PE::get_mpi_datatype(*in_values);
  int nproc,irank;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // stride must be positive
  cf3_assert( stride>0 );

  int *in_nstride=(int*)0;
  int *in_disp=(int*)0;
  T *in_buf=0;
  int in_sum;
  if (irank==root) {
    // compute per-process counts and displacements on the send side
    in_nstride=new int[nproc];
    in_disp=new int[nproc];
    in_disp[0]=0;
    for (int i=0; i<nproc-1; i++) {
      in_nstride[i]=stride*in_n[i];
      in_disp[i+1]=in_disp[i]+in_nstride[i];
    }
    in_nstride[nproc-1]=in_n[nproc-1]*stride;
    in_sum=in_disp[nproc-1]+stride*in_n[nproc-1];
    // set up in_buf: apply the send map into a temporary buffer if one is given
    if (in_map!=0) {
      if ( (in_buf=new T[in_sum+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
      if (stride==1) { for (int i=0; i<in_sum; i++) in_buf[i]=in_values[in_map[i]]; }
      else { for (int i=0; i<in_sum/stride; i++) memcpy(&in_buf[stride*i],&in_values[stride*in_map[i]],stride*sizeof(T)); }
    } else {
      in_buf=(T*)in_values;
    }
  }

  // compute total number of receive items
  const int out_sum=stride*out_n;
  const int out_nstride=stride*out_n;

  // set up out_buf: receive into a temporary buffer if a receive map is given or the operation is in place
  T *out_buf=0;
  if ((out_map!=0)||(in_values==out_values)) {
    if ( (out_buf=new T[out_sum+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
  } else {
    out_buf=out_values;
  }

  // do the communication
  MPI_CHECK_RESULT(MPI_Scatterv, (in_buf, in_nstride, in_disp, type, out_buf, out_nstride, type, root, comm));

  // re-populate out_values through the receive map, or copy back for the in-place case
  if (out_map!=0) {
    if (stride==1) { for (int i=0; i<out_sum; i++) out_values[out_map[i]]=out_buf[i]; }
    else { for (int i=0; i<out_sum/stride; i++) memcpy(&out_values[stride*out_map[i]],&out_buf[stride*i],stride*sizeof(T)); }
    delete[] out_buf;
  } else if (in_values==out_values) {
    memcpy(out_values,out_buf,out_sum*sizeof(T));
    delete[] out_buf;
  }

  // free internal memory (in_disp and in_nstride are null on non-root processes)
  if (irank==root) if (in_map!=0) delete[] in_buf;
  delete[] in_disp;
  delete[] in_nstride;
}

} // end namespace detail

/// Constant-size scatter, pointer interface. If out_values is null, a buffer is
/// allocated and returned; the caller is responsible for deleting it with delete[].
template<typename T>
inline T*
scatter(const Communicator& comm, const T* in_values, const int in_n, T* out_values, const int root, const int stride=1)
{
  // get number of processes
  int nproc,irank;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // allocate out_buf if incoming pointer is null
  T* out_buf=out_values;
  if (out_values==0) {
    const int size=stride*nproc*in_n>1?stride*nproc*in_n:1;
    if ( (out_buf=new T[size]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
  }

  // call the constant-size implementation; only the root provides send data
  if (irank==root) {
    detail::scatterc_impl(comm, in_values, in_n, out_buf, root, stride);
  } else {
    detail::scatterc_impl(comm, (T*)0, in_n, out_buf, root, stride);
  }
  return out_buf;
}
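
// Usage sketch (assumptions: `comm` is a valid PE::Communicator, every process
// calls with the same in_n, and only the root's send data is significant):
//
//   const int root = 0;
//   int irank, nproc;
//   MPI_Comm_rank(comm, &irank);
//   MPI_Comm_size(comm, &nproc);
//   std::vector<double> send(nproc*4, double(irank));      // filled meaningfully on the root
//   double* recv = PE::scatter(comm, &send[0], 4, (double*)0, root);
//   // recv[0..3] now hold this process's block; the caller owns the buffer:
//   delete[] recv;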

/// Constant-size scatter, std::vector interface. Every process passes an
/// in_values of the same size (a multiple of nproc*stride); out_values is
/// resized to hold the part received by this process.
template<typename T>
inline void
scatter(const Communicator& comm, const std::vector<T>& in_values, std::vector<T>& out_values, const int root, const int stride=1)
{
  // get number of processes
  int nproc,irank;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // set out_values's size, unless it is the same vector as in_values on the root
  cf3_assert( in_values.size() % (nproc*stride) == 0 );
  if (((&in_values!=&out_values)&&(irank!=root))||(out_values.size()==0)) {
    out_values.resize(in_values.size()/nproc);
    out_values.reserve(in_values.size()/nproc);
  }

  // call the constant-size implementation; only the root provides send data
  if (irank==root) {
    detail::scatterc_impl(comm, (T*)(&in_values[0]), in_values.size()/(nproc*stride), (T*)(&out_values[0]), root, stride);
  } else {
    detail::scatterc_impl(comm, (T*)0, in_values.size()/(nproc*stride), (T*)(&out_values[0]), root, stride);
  }

  // on the root (where out_values may alias in_values) shrink to the scattered size
  if (irank==root) {
    out_values.resize(in_values.size()/nproc);
    out_values.reserve(in_values.size()/nproc);
  }
}
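
// Usage sketch for the std::vector interface (same assumptions as above:
// `comm`, `irank`, `nproc`, `root`); in_values must have the same size on
// every process, a multiple of nproc*stride:
//
//   std::vector<int> send(nproc*3, irank);   // contents significant on the root only
//   std::vector<int> recv;
//   PE::scatter(comm, send, recv, root);     // recv now holds this process's 3 values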

// forward declaration of the mapped interface (defined below)
template<typename T>
inline T*
scatter(const Communicator& comm, const T* in_values, const int *in_n, const int *in_map, T* out_values, int& out_n, const int *out_map, const int root, const int stride=1);

/// Variable-size scatter without maps, pointer interface. Pass out_n=-1 to have
/// the receive count communicated from the root.
template<typename T>
inline T*
scatter(const Communicator& comm, const T* in_values, const int* in_n, T* out_values, int& out_n, const int root, const int stride=1)
{
  // call the mapped variable-size scatter with empty maps
  return scatter(comm,in_values,in_n,0,out_values,out_n,0,root,stride);
}

// forward declaration of the mapped interface (defined below)
template<typename T>
inline void
scatter(const Communicator& comm, const std::vector<T>& in_values, const std::vector<int>& in_n, const std::vector<int>& in_map, std::vector<T>& out_values, int& out_n, const std::vector<int>& out_map, const int root, const int stride=1);

/// Variable-size scatter without maps, std::vector interface. Pass out_n=-1 to
/// have the receive count communicated from the root.
template<typename T>
inline void
scatter(const Communicator& comm, const std::vector<T>& in_values, const std::vector<int>& in_n, std::vector<T>& out_values, int& out_n, const int root, const int stride=1)
{
  // call the mapped variable-size scatter with empty maps
  std::vector<int> in_map(0);
  std::vector<int> out_map(0);

  int irank;
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // if the root scatters in place, go through a temporary output vector
  if ((irank==root)&&(!out_values.empty())&&(&in_values[0]==&out_values[0]))
  {
    std::vector<T> out_tmp(0);
    scatter(comm,in_values,in_n,in_map,out_tmp,out_n,out_map,root,stride);
    out_values.assign(out_tmp.begin(),out_tmp.end());
  }
  else
  {
    scatter(comm,in_values,in_n,in_map,out_values,out_n,out_map,root,stride);
  }
}
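
// Usage sketch for the variable-size interface (same assumptions as above).
// in_n holds the per-process counts; its contents matter on the root only, but
// it must have size nproc everywhere. Passing out_n=-1 asks the call to
// communicate the receive count from the root:
//
//   std::vector<int> in_n(nproc);
//   for (int i=0; i<nproc; ++i) in_n[i]=i+1;              // process i receives i+1 values
//   std::vector<double> send, recv;
//   if (irank==root) send.resize(nproc*(nproc+1)/2, 1.);  // in_n[0]+...+in_n[nproc-1] values
//   int out_n=-1;
//   PE::scatter(comm, send, in_n, recv, out_n, root);
//   // out_n == irank+1 and recv holds that many values on this process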

/// Variable-size scatter with optional send/receive maps, pointer interface.
/// If out_values is null, a suitable buffer is allocated and returned; the
/// caller is responsible for deleting it with delete[].
template<typename T>
inline T*
scatter(const Communicator& comm, const T* in_values, const int *in_n, const int *in_map, T* out_values, int& out_n, const int *out_map, const int root, const int stride)
{
  // get number of processes
  int nproc,irank;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // if out_n is -1, communicate the receive counts from the root
  int out_sum=out_n;
  if (out_sum==-1) {
    if (out_map!=0) throw cf3::common::ParallelError(FromHere(),"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
    detail::scatterc_impl(comm,in_n,1,&out_n,root,1);
    out_sum=out_n;
  }

  // allocate out_buf if incoming pointer is null
  T* out_buf=out_values;
  if (out_values==0) {
    if (out_map!=0) {
      int out_sum_tmp=0;
      for (int i=0; i<out_sum; i++) out_sum_tmp=out_map[i]>out_sum_tmp?out_map[i]:out_sum_tmp;
      out_sum=out_sum_tmp+1;
    }
    if ( (out_buf=new T[stride*out_sum]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
  }

  // call the variable-size implementation; only the root provides send data
  if (irank==root) {
    detail::scattervm_impl(comm, in_values, in_n, in_map, out_buf, out_n, out_map, root, stride);
  } else {
    detail::scattervm_impl(comm, (T*)0, in_n, in_map, out_buf, out_n, out_map, root, stride);
  }
  return out_buf;
}

/// Variable-size scatter with optional send/receive maps, std::vector interface.
/// Pass out_n=-1 to have the receive count communicated from the root; an empty
/// out_values is resized to the required size.
template<typename T>
inline void
scatter(const Communicator& comm, const std::vector<T>& in_values, const std::vector<int>& in_n, const std::vector<int>& in_map, std::vector<T>& out_values, int& out_n, const std::vector<int>& out_map, const int root, const int stride)
{
  // get number of processes and check in_n; an inconsistent out_n/out_map combination deliberately throws below, because these can arrive from arbitrary previous usage
  int nproc,irank;
  MPI_CHECK_RESULT(MPI_Comm_size,(comm,&nproc));
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));
  cf3_assert( (int)in_n.size() == nproc );

  // compute total number of send and receive items
  int in_sum=0;
  int out_sum=out_n;
  boost_foreach( int i, in_n ) in_sum+=i;

  // if out_n is -1, communicate the receive counts from the root
  if (out_sum == -1) {
    if (out_map.size()!=0) throw cf3::common::ParallelError(FromHere(),"Trying to perform communication with receive map while receive counts are unknown, this is bad usage of parallel environment.");
    detail::scatterc_impl(comm,&in_n[0],1,&out_n,root,1);
    out_sum=out_n;
  }

  // resize out_values if it arrives empty
  if (out_values.size() == 0) {
    if (out_map.size()!=0) {
      out_sum=0;
      boost_foreach( int i, out_map ) out_sum=i>out_sum?i:out_sum;
      if (out_sum!=0) out_sum++;
    }
    out_values.resize(stride*out_sum);
    out_values.reserve(stride*out_sum);
  }

  // call the variable-size implementation; only the root provides send data
  if (irank==root) {
    detail::scattervm_impl(comm, (T*)(&in_values[0]), &in_n[0], (in_map.empty() ? nullptr : &in_map[0]), (T*)(&out_values[0]), out_n, (out_map.empty() ? nullptr : &out_map[0]), root, stride);
  } else {
    detail::scattervm_impl(comm, (T*)0, &in_n[0], (in_map.empty() ? nullptr : &in_map[0]), (T*)(&out_values[0]), out_n, (out_map.empty() ? nullptr : &out_map[0]), root, stride);
  }
}
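
// Usage sketch for the mapped interface (same assumptions as above). The send
// map selects which entries of in_values are sent and in what order; the
// receive map says where each received item is stored:
//
//   std::vector<int> in_n(nproc, 2);                          // 2 items for every process
//   std::vector<int> in_map;                                  // empty: send in_values as laid out
//   std::vector<int> out_map(2); out_map[0]=1; out_map[1]=0;  // store received items reversed
//   std::vector<double> send, recv(2, 0.);
//   if (irank==root) send.resize(2*nproc, 1.);                // contents significant on the root only
//   int out_n=2;
//   PE::scatter(comm, send, in_n, in_map, recv, out_n, out_map, root);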

} // namespace PE
} // namespace common
} // namespace cf3

#endif // cf3_common_PE_scatter_hpp