COOLFluiD Release kernel
COOLFluiD is a Collaborative Simulation Environment (CSE) focused on complex MultiPhysics simulations.
broadcast.hpp
// Copyright (C) 2010-2013 von Karman Institute for Fluid Dynamics, Belgium
//
// This software is distributed under the terms of the
// GNU Lesser General Public License version 3 (LGPLv3).
// See doc/lgpl.txt and doc/gpl.txt for the license text.

#ifndef cf3_common_PE_broadcast_hpp
#define cf3_common_PE_broadcast_hpp

#include <cstring> // memcpy
#include <vector>

#include "common/Foreach.hpp"
#include "common/Assertions.hpp"
#include "common/BasicExceptions.hpp"

#include "common/PE/types.hpp"
#include "common/PE/datatype.hpp"

// #include "common/PE/debug.hpp" // for debugging mpi

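// Collective broadcast operations built on MPI_Bcast. Four overloads of
// broadcast() are provided: raw-pointer or std::vector data, each with or
// without index maps that select which entries are packed on the root rank
// and where received entries are scattered on the other ranks.
//
// Minimal usage sketch (variable names below are illustrative only):
//
//   Communicator comm = ...;      // an MPI communicator, wrapped as PE::Communicator
//   std::vector<double> values;   // filled on the root rank
//   std::vector<double> received; // left empty; broadcast() resizes it to fit
//   cf3::common::PE::broadcast(comm, values, received, 0 /* root */);
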
namespace cf3 {
  namespace common {
    namespace PE {

namespace detail {

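// Shared implementation for all broadcast() variants. On the root rank the
// input values are packed into a contiguous buffer (indirectly through in_map
// when one is given), MPI_Bcast distributes the buffer to every rank, and the
// result is written to out_values (indirectly through out_map when one is
// given). stride is the number of consecutive items treated as one entry.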
template<typename T>
inline void
broadcast_impl(const Communicator& comm, const T* in_values, const int in_n, const int *in_map, T* out_values, const int *out_map, const int root, const int stride)
{
  // get rank
  int irank;
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // get data type and perform basic checks
  Datatype type = get_mpi_datatype(*in_values);
  cf3_assert( stride>0 );

  // if an index map is involved, pack the data into a temporary contiguous buffer
  T *inout_buf=(T*)out_values;
  if (((in_map!=0)&&(irank==root))||(out_map!=0)){
    if ( (inout_buf=new T[stride*in_n+1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer."); // +1 to avoid a possible zero-sized allocation
    if (irank==root) {
      if (stride==1) { for(int i=0; i<in_n; i++) inout_buf[i]=in_values[in_map[i]]; }
      else { for(int i=0; i<in_n; i++) memcpy(&inout_buf[stride*i],&in_values[stride*in_map[i]],stride*sizeof(T)); }
    }
  } else if ((irank==root)&&(in_values!=out_values)) {
    memcpy(inout_buf,in_values,stride*in_n*sizeof(T));
  }

  // do the communication
  MPI_CHECK_RESULT(MPI_Bcast, ( inout_buf, in_n*stride, type, root, comm ));

  // re-populate out_values through out_map if one is given
  if (out_map!=0) {
    if (stride==1) { for(int i=0; i<in_n; i++) out_values[out_map[i]]=inout_buf[i]; }
    else { for(int i=0; i<in_n; i++) memcpy(&out_values[stride*out_map[i]],&inout_buf[stride*i],stride*sizeof(T)); }
  }

  // free the temporary buffer if one was allocated
  if (((in_map!=0)&&(irank==root))||(out_map!=0)) delete[] inout_buf;
}

} // end namespace detail

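// Broadcast of a contiguous array without index maps. If out_values is a null
// pointer, the required size is first broadcast from the root and a buffer of
// that size is allocated here; the returned pointer must then be delete[]-ed
// by the caller. Otherwise out_values itself is returned.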
template<typename T>
inline T*
broadcast(const Communicator& comm, const T* in_values, const int in_n, T* out_values, const int root, const int stride=1)
{
  // get rank
  int irank;
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // allocate out_buf if the incoming pointer is null
  T* out_buf=out_values;
  int size=in_n;
  if (out_values==0) {
    detail::broadcast_impl(comm,&size,1,(int*)0,&size,(int*)0,root,1);
    if ( (out_buf=new T[stride*size>1?stride*size:1]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
  }

  // call impl
  if (irank==root) {
    detail::broadcast_impl(comm,in_values,size,(int*)0,out_buf,(int*)0,root,stride);
  } else {
    detail::broadcast_impl(comm,(T*)0,size,(int*)0,out_buf,(int*)0,root,stride);
  }

  return out_buf;
}

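// Broadcast of a std::vector without index maps. If out_values is empty it is
// resized to the size broadcast from the root; otherwise its size is checked
// (in debug builds) against the size expected from the root. in_values only
// needs to hold data on the root rank.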
template<typename T>
inline void
broadcast(const Communicator& comm, const std::vector<T>& in_values, std::vector<T>& out_values, const int root, const int stride=1)
{
  // get rank
  int irank;
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // set out_values's size
  int size=out_values.size();
  if (out_values.size()==0)
  {
    if (irank==root) size=(int)in_values.size();
    detail::broadcast_impl(comm,&size,1,(int*)0,&size,(int*)0,root,1);
  }
  else
  {
    // in debug mode, assert that the given out_values size matches what it is supposed to receive
#ifndef NDEBUG
    if (irank==root) size=(int)in_values.size();
    detail::broadcast_impl(comm,&size,1,(int*)0,&size,(int*)0,root,1);
    cf3_assert_desc("out_values.size() must be set to zero, or be compatible",size == (int)out_values.size());
#endif
  }
  cf3_assert( in_values.size() % stride == 0 );
  out_values.resize(size);
  out_values.reserve(size);

  if (irank==root) {
    detail::broadcast_impl(comm,(T*)(&in_values[0]),size/stride,(int*)0,(T*)(&out_values[0]),(int*)0,root,stride);
  } else {
    detail::broadcast_impl(comm,(T*)0,size/stride,(int*)0,(T*)(&out_values[0]),(int*)0,root,stride);
  }

}

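// Broadcast of a contiguous array through index maps: on the root rank in_map
// selects which entries of in_values are sent, and on every rank out_map
// selects where the received entries are written in out_values. If out_values
// is a null pointer a sufficiently large buffer is allocated and returned; the
// caller must delete[] it.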
template<typename T>
inline T*
broadcast(const Communicator& comm, const T* in_values, const int in_n, const int *in_map, T* out_values, const int *out_map, const int root, const int stride=1)
{
  // get rank
  int irank;
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // allocate out_buf if the incoming pointer is null
  T* out_buf=out_values;
  if (out_values==0) {
    int out_sum=in_n;
    if (out_map!=0){
      for (int i=0; i<in_n; i++) out_sum=out_map[i]>out_sum?out_map[i]:out_sum;
      if (out_sum==0) out_sum=1;
    } else {
      detail::broadcast_impl(comm,&out_sum,1,(int*)0,&out_sum,(int*)0,root,1);
      if (out_sum==0) out_sum=1;
    }
    if ( (out_buf=new T[stride*out_sum]) == (T*)0 ) throw cf3::common::NotEnoughMemory(FromHere(),"Could not allocate temporary buffer.");
  }

  // call impl
  if (irank==root){
    detail::broadcast_impl(comm,in_values,in_n,in_map,out_buf,out_map,root,stride);
  } else {
    detail::broadcast_impl(comm,(T*)0,in_n,(int*)0,out_buf,out_map,root,stride);
  }
  return out_buf;
}

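// Broadcast of a std::vector through index maps, following the same conventions
// as the pointer version above. If out_values is empty it is resized so that the
// largest index in out_map fits (times stride).
//
// Illustrative sketch with hypothetical data (not taken from this header):
//
//   std::vector<int> in   = {10, 20, 30, 40};  // meaningful on the root only
//   std::vector<int> imap = {3, 1};            // root sends in[3], in[1]
//   std::vector<int> omap = {0, 2};            // receivers store them at out[0], out[2]
//   std::vector<int> out(3);
//   cf3::common::PE::broadcast(comm, in, imap, out, omap, 0 /* root */);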
template<typename T>
inline void
broadcast(const Communicator& comm, const std::vector<T>& in_values, const std::vector<int>& in_map, std::vector<T>& out_values, const std::vector<int>& out_map, const int root, const int stride=1)
{
  // get rank
  int irank;
  MPI_CHECK_RESULT(MPI_Comm_rank,(comm,&irank));

  // sanity check on the input size
  cf3_assert( in_values.size() % stride == 0 );

  // resize out_values if its size is zero
  if (out_values.size() == 0 ){
    int out_sum=in_map.size();
    if (out_map.size()!=0) {
      boost_foreach( int i, out_map ) out_sum=i>out_sum?i:out_sum;
    } else {
      detail::broadcast_impl(comm,&out_sum,1,(int*)0,&out_sum,(int*)0,root,1);
      if (out_sum==0) out_sum=1;
    }
    out_values.resize(stride*out_sum);
    out_values.reserve(stride*out_sum);
  }

  // call impl
  if (irank==root){
    detail::broadcast_impl(comm, (T*)(&in_values[0]), in_map.size(), (in_map.empty() ? nullptr : &in_map[0]), (T*)(&out_values[0]), (out_map.empty() ? nullptr : &out_map[0]), root, stride);
  } else {
    detail::broadcast_impl(comm, (T*)0, in_map.size(), (int*)0, (T*)(&out_values[0]), (out_map.empty() ? nullptr : &out_map[0]), root, stride);
  }
}

    } // end namespace PE
  } // end namespace common
} // end namespace cf3

#endif // cf3_common_PE_broadcast_hpp