COOLFluiD Release kernel
COOLFluiD is a Collaborative Simulation Environment (CSE) focused on complex MultiPhysics simulations.
reduce.hpp
// Copyright (C) 2010-2013 von Karman Institute for Fluid Dynamics, Belgium
//
// This software is distributed under the terms of the
// GNU Lesser General Public License version 3 (LGPLv3).
// See doc/lgpl.txt and doc/gpl.txt for the license text.

#ifndef cf3_common_PE_reduce_hpp
#define cf3_common_PE_reduce_hpp

#include <cstring> // memcpy, used for the strided copies below
#include <vector>

#include "common/Foreach.hpp"
#include "common/BasicExceptions.hpp" // NotEnoughMemory, FromHere

#include "common/PE/types.hpp"
#include "common/PE/operations.hpp"
#include "common/PE/datatype.hpp"

// #include "common/PE/debug.hpp" // for debugging mpi

namespace cf3 {
  namespace common {
    namespace PE {

namespace detail {

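  /// Implementation of the reduce communication. When in_map is given, item i of the
  /// send buffer is gathered from in_values[in_map[i]] (each item being stride
  /// consecutive values of T); the packed buffer is combined across all ranks with
  /// MPI_Reduce using the operation Op, and on the root the result is scattered back
  /// through out_map into out_values.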
  template<typename T, typename Op>
  inline void
  reduce_impl(const Communicator& comm, Op, const T* in_values, const int in_n, const int *in_map, T* out_values, const int *out_map, const int root, const int stride)
  {
    // get rank
    int irank;
    MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

    // get data type, op and some checkings
    Datatype type = get_mpi_datatype(*in_values);
    Operation op_ = Op::op();
    cf3_assert( stride>0 );

    // there is in_map
    T *in_buf=(T*)in_values;
    if (in_map!=0){
      if ( (in_buf=new T[stride*in_n+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
      if (stride==1) { for(int i=0; i<in_n; i++) in_buf[i]=in_values[in_map[i]]; }
      else { for(int i=0; i<in_n; i++) memcpy(&in_buf[stride*i],&in_values[stride*in_map[i]],stride*sizeof(T)); }
    }

    // set up out_buf
    T *out_buf=out_values;
    if (irank==root) {
      if ((out_map!=0)||(in_values==out_values)) {
        if ( (out_buf=new T[in_n*stride+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
      }
    }

    // do the communication
    MPI_CHECK_RESULT(MPI_Reduce, ( in_buf, out_buf, in_n*stride, type, op_, root, comm ));

    // re-populate out_values
    if (irank==root) {
      if (out_map!=0) {
        if (stride==1) { for(int i=0; i<in_n; i++) out_values[out_map[i]]=out_buf[i]; }
        else { for(int i=0; i<in_n; i++) memcpy(&out_values[stride*out_map[i]],&out_buf[stride*i],stride*sizeof(T)); }
        delete[] out_buf;
      } else if (in_values==out_values) {
        memcpy(out_values,out_buf,in_n*stride*sizeof(T));
        delete[] out_buf;
      }
    }

    // free internal memory
    if (in_map!=0) delete[] in_buf;
  }

} // end namespace detail

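/// Reduce on raw buffers without maps: combines in_n*stride values contributed by
/// every rank with the operation Op and delivers the result to rank root. On the
/// root a receive buffer is allocated when out_values is null; the (possibly newly
/// allocated) buffer is returned and is only meaningful on the root.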
template<typename T, typename Op>
inline T*
reduce(const Communicator& comm, const Op& op, const T* in_values, const int in_n, T* out_values, const int root, const int stride=1)
{
  // get rank
  int irank;
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // allocate out_buf if incoming pointer is null
  T* out_buf=out_values;
  if (irank==root) {
    if (out_values==0) {
      const int size=stride*in_n>1?stride*in_n:1;
      if ( (out_buf=new T[size]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
    }
  }

  // call impl
  if (irank==root) {
    detail::reduce_impl(comm,op,in_values,in_n,(int*)0,out_buf,(int*)0,root,stride);
  } else {
    detail::reduce_impl(comm,op,in_values,in_n,(int*)0,(T*)0,(int*)0,root,stride);
  }
  return out_buf;
}

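/// Reduce on std::vectors without maps: in_values.size() must be a multiple of
/// stride, and on the root out_values is resized to match in_values before the
/// result is written into it.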
template<typename T, typename Op>
inline void
reduce(const Communicator& comm, const Op& op, const std::vector<T>& in_values, std::vector<T>& out_values, const int root, const int stride=1)
{
  // get rank
  int irank;
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // set out_values's sizes
  cf3_assert( in_values.size() % stride == 0 );
  if (irank==root) {
    out_values.resize(in_values.size());
    out_values.reserve(in_values.size());
  }

  // call impl
  if (irank==root) {
    detail::reduce_impl(comm, op, (T*)(&in_values[0]), in_values.size()/stride, (int*)0, (T*)(&out_values[0]), (int*)0, root, stride);
  } else {
    detail::reduce_impl(comm, op, (T*)(&in_values[0]), in_values.size()/stride, (int*)0, (T*)0, (int*)0, root, stride);
  }
}

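/// Reduce on raw buffers with optional in_map/out_map index tables: item i of the
/// send buffer is read from in_values[in_map[i]] and, on the root, received item i
/// is written to out_values[out_map[i]]. When out_values is null on the root it is
/// allocated to hold stride*(max(out_map)+1) values, or stride*in_n values when no
/// out_map is given; the (possibly newly allocated) buffer is returned.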
template<typename T, typename Op>
inline T*
reduce(const Communicator& comm, const Op& op, const T* in_values, const int in_n, const int *in_map, T* out_values, const int *out_map, const int root, const int stride=1)
{
  // get rank
  int irank;
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // allocate out_buf if incoming pointer is null
  T* out_buf=out_values;
  if (irank==root) {
    if (out_values==0) {
      int out_sum=in_n;
      if (out_map!=0){
        int out_sum_tmp=0;
        for (int i=0; i<out_sum; i++) out_sum_tmp=out_map[i]>out_sum_tmp?out_map[i]:out_sum_tmp;
        out_sum=out_sum_tmp+1;
      }
      if ( (out_buf=new T[stride*out_sum]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
    }
  }

  // call impl
  if (irank==root){
    detail::reduce_impl(comm,op,in_values,in_n,in_map,out_buf,out_map,root,stride);
  } else {
    detail::reduce_impl(comm,op,in_values,in_n,in_map,(T*)0,(int*)0,root,stride);
  }
  return out_buf;
}

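/// Reduce on std::vectors with in_map/out_map index vectors: the number of items
/// sent is in_map.size(). If out_values is empty on the root it is resized to
/// stride times the larger of in_map.size() and the largest index in out_map;
/// callers scattering to indices at or beyond in_map.size() should pre-size
/// out_values themselves.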
template<typename T, typename Op>
inline void
reduce(const Communicator& comm, const Op& op, const std::vector<T>& in_values, const std::vector<int>& in_map, std::vector<T>& out_values, const std::vector<int>& out_map, const int root, const int stride=1)
{
  // get rank
  int irank;
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // set out_values's sizes
  cf3_assert( in_values.size() % stride == 0 );

  // resize out_values if vector size is zero
  if (irank==root){
    if (out_values.size() == 0 ){
      int out_sum=in_map.size();
      if (out_map.size()!=0) {
        boost_foreach( int i, out_map ) out_sum=i>out_sum?i:out_sum;
      }
      out_values.resize(stride*out_sum);
      out_values.reserve(stride*out_sum);
    }
  }

  // call impl
  if (irank==root){
    detail::reduce_impl(comm, op, (T*)(&in_values[0]), in_map.size(), (in_map.empty() ? nullptr : &in_map[0]), (T*)(&out_values[0]), (out_map.empty() ? nullptr : &out_map[0]), root, stride);
  } else {
    detail::reduce_impl(comm, op, (T*)(&in_values[0]), in_map.size(), (in_map.empty() ? nullptr : &in_map[0]), (T*)0, (int*)0, root, stride);
  }
}

    } // end namespace PE
  } // end namespace common
} // end namespace cf3

#endif // cf3_common_PE_reduce_hpp
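
For orientation, a minimal usage sketch follows; it is not part of reduce.hpp. It assumes an already initialised MPI environment; the operation functor name PE::plus<double> is presumed to come from common/PE/operations.hpp, and the function name example_sum is purely illustrative. Only the reduce overloads listed above are taken from this header.

  // Usage sketch (illustrative only).
  #include <vector>
  #include "common/PE/operations.hpp" // assumed home of PE::plus<T>
  #include "common/PE/reduce.hpp"

  void example_sum(const cf3::common::PE::Communicator& comm)
  {
    using namespace cf3::common;

    std::vector<double> local(4, 1.0);   // four values contributed by every rank
    std::vector<double> total;           // filled on the root rank only

    // element-wise sum across all ranks, delivered to rank 0
    PE::reduce(comm, PE::plus<double>(), local, total, 0);

    // mapped variant: send local[idx[i]]; on the root, received item i lands at picked[idx[i]]
    std::vector<int> idx(2); idx[0]=3; idx[1]=1;
    std::vector<double> picked(local.size()); // pre-sized by the caller
    PE::reduce(comm, PE::plus<double>(), local, idx, picked, idx, 0);
  }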