COOLFluiD  Release kernel
COOLFluiD is a Collaborative Simulation Environment (CSE) focused on complex MultiPhysics simulations.
all_reduce.hpp
Go to the documentation of this file.
1 // Copyright (C) 2010-2013 von Karman Institute for Fluid Dynamics, Belgium
2 //
3 // This software is distributed under the terms of the
4 // GNU Lesser General Public License version 3 (LGPLv3).
5 // See doc/lgpl.txt and doc/gpl.txt for the license text.
6 
7 #ifndef cf3_common_PE_all_reduce_hpp
8 #define cf3_common_PE_all_reduce_hpp
9 
11 
#include <algorithm>
#include <memory>
#include <new>

#include "common/Foreach.hpp"
#include "common/Assertions.hpp"

#include "common/PE/types.hpp"
#include "common/PE/operations.hpp"
#include "common/PE/datatype.hpp"
19 
20 // #include "common/PE/debug.hpp" // for debugging mpi
21 
23 
34 
36 namespace cf3 {
37  namespace common {
38  namespace PE {
39 
41 
42 namespace detail {
43 
45 
60  template<typename T, typename Op>
61  inline void
62  all_reduce_impl(const Communicator& comm, Op, const T* in_values, const int in_n, const int *in_map, T* out_values, const int *out_map, const int stride)
63  {
64  // get data type, op and some checkings
65  Datatype type = get_mpi_datatype(*in_values);
67  cf3_assert( stride>0 );
68 
69  // there is in_map
70  T *in_buf=(T*)in_values;
71  if (in_map!=0){
72  if ( (in_buf=new T[stride*in_n+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
73  if (stride==1) { for(int i=0; i<in_n; i++) in_buf[i]=in_values[in_map[i]]; }
74  else { for(int i=0; i<in_n; i++) memcpy(&in_buf[stride*i],&in_values[stride*in_map[i]],stride*sizeof(T)); }
75  }
76 
77  // set up out_buf
78  T *out_buf=out_values;
79  if ((out_map!=0)||(in_values==out_values)) {
80  if ( (out_buf=new T[in_n*stride+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 for avoiding possible zero allocation
81  }
82 
83  // do the communication
84  MPI_CHECK_RESULT(MPI_Allreduce, ( in_buf, out_buf, in_n*stride, type, op_, comm ));
85 
86  // re-populate out_values
87  if (out_map!=0) {
88  if (stride==1) { for(int i=0; i<in_n; i++) out_values[out_map[i]]=out_buf[i]; }
89  else { for(int i=0; i<in_n; i++) memcpy(&out_values[stride*out_map[i]],&out_buf[stride*i],stride*sizeof(T)); }
90  delete[] out_buf;
91  } else if (in_values==out_values) {
92  memcpy(out_values,out_buf,in_n*stride*sizeof(T));
93  delete[] out_buf;
94  }
95 
96  // free internal memory
97  if (in_map!=0) delete[] in_buf;
98  }
99 
101 
102 } // end namespace detail
103 
105 
115 template<typename T, typename Op>
116 inline T*
117 all_reduce(const Communicator& comm, const Op& op, const T* in_values, const int in_n, T* out_values, const int stride=1)
118 {
119  // allocate out_buf if incoming pointer is null
120  T* out_buf=out_values;
121  if (out_values==0) {
122  const int size=stride*in_n>1?stride*in_n:1;
123  if ( (out_buf=new T[size]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
124  }
125 
126  // call impl
127  detail::all_reduce_impl(comm,op,in_values,in_n,0,out_buf,0,stride);
128  return out_buf;
129 }
130 
132 
140 template<typename T, typename Op>
141 inline void
142 all_reduce(const Communicator& comm, const Op& op, const std::vector<T>& in_values, std::vector<T>& out_values, const int stride=1)
143 {
144  // set out_values's sizes
145  cf3_assert( in_values.size() % stride == 0 );
146  out_values.resize(in_values.size());
147  out_values.reserve(in_values.size());
148 
149  // call impl
150  detail::all_reduce_impl(comm, op, (T*)(&in_values[0]), in_values.size()/stride, 0, (T*)(&out_values[0]), 0, stride);
151 }
152 
154 
169 template<typename T, typename Op>
170 inline T*
171 all_reduce(const Communicator& comm, const Op& op, const T* in_values, const int in_n, const int *in_map, T* out_values, const int *out_map, const int stride=1)
172 {
173  // allocate out_buf if incoming pointer is null
174  T* out_buf=out_values;
175  if (out_values==0) {
176  int out_sum=in_n;
177  if (out_map!=0){
178  int out_sum_tmp=0;
179  for (int i=0; i<out_sum; i++) out_sum_tmp=out_map[i]>out_sum_tmp?out_map[i]:out_sum_tmp;
180  out_sum=out_sum_tmp+1;
181  }
182  if ( (out_buf=new T[stride*out_sum]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
183  }
184 
185  // call impl
186  detail::all_reduce_impl(comm,op,in_values,in_n,in_map,out_buf,out_map,stride);
187  return out_buf;
188 }
189 
191 
207 template<typename T, typename Op>
208 inline void
209 all_reduce(const Communicator& comm, const Op& op, const std::vector<T>& in_values, const std::vector<int>& in_map, std::vector<T>& out_values, const std::vector<int>& out_map, const int stride=1)
210 {
211  // set out_values's sizes
212  cf3_assert( in_values.size() % stride == 0 );
213 
214  // resize out_values if vector size is zero
215  if (out_values.size() == 0 ){
216  int out_sum=in_map.size();
217  if (out_map.size()!=0) {
218  boost_foreach( int i, out_map ) out_sum=i>out_sum?i:out_sum;
219  }
220  out_values.resize(stride*out_sum);
221  out_values.reserve(stride*out_sum);
222  }
223 
224  // call impl
225  detail::all_reduce_impl(comm, op, (T*)(&in_values[0]), in_map.size(), (in_map.empty() ? nullptr : &in_map[0]), (T*)(&out_values[0]), (out_map.empty() ? nullptr : &out_map[0]), stride);
226 }
227 
229 
230  } // end namespace PE
231  } // end namespace common
232 } // end namespace cf3
233 
235 
236 #endif // cf3_common_PE_all_reduce_hpp
void all_reduce_impl(const Communicator &comm, Op, const T *in_values, const int in_n, const int *in_map, T *out_values, const int *out_map, const int stride)
Definition: all_reduce.hpp:62
static Operation op()
Definition: operations.hpp:65
#define cf3_assert(a)
Definition: Assertions.hpp:93
MPI_Datatype Datatype
datatype
Definition: types.hpp:47
Datatype get_mpi_datatype(const T &ref_of_type)
ACCESS AND REGISTRATION MECHANISM.
Definition: datatype.hpp:49
#define boost_foreach
lowercase version of BOOST_FOREACH
Definition: Foreach.hpp:16
Top-level namespace for coolfluid.
Definition: Action.cpp:18
MPI_Comm Communicator
communicator
Definition: types.hpp:41
T * all_reduce(const Communicator &comm, const Op &op, const T *in_values, const int in_n, T *out_values, const int stride=1)
Definition: all_reduce.hpp:117
#define MPI_CHECK_RESULT(MPIFunc, Args)
Macro for checking return values of any mpi calls and throws exception on error.
Definition: types.hpp:20
MPI_Op Operation
operation (mostly for reduce and all_reduce)
Definition: types.hpp:44
#define FromHere()
Send comments to:
COOLFluiD Web Admin