Muster
 All Classes Namespaces Files Functions Variables Typedefs Macros
multi_gather.h
Go to the documentation of this file.
1 //////////////////////////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2010, Lawrence Livermore National Security, LLC.
3 // Produced at the Lawrence Livermore National Laboratory
4 // LLNL-CODE-433662
5 // All rights reserved.
6 //
7 // This file is part of Muster. For details, see http://github.com/tgamblin/muster.
8 // Please also read the LICENSE file for further information.
9 //
10 // Redistribution and use in source and binary forms, with or without modification, are
11 // permitted provided that the following conditions are met:
12 //
13 // * Redistributions of source code must retain the above copyright notice, this list of
14 // conditions and the disclaimer below.
15 // * Redistributions in binary form must reproduce the above copyright notice, this list of
16 // conditions and the disclaimer (as noted below) in the documentation and/or other materials
17 // provided with the distribution.
18 // * Neither the name of the LLNS/LLNL nor the names of its contributors may be used to endorse
19 // or promote products derived from this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS
22 // OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
23 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
24 // LAWRENCE LIVERMORE NATIONAL SECURITY, LLC, THE U.S. DEPARTMENT OF ENERGY OR CONTRIBUTORS BE
25 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 // ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 //////////////////////////////////////////////////////////////////////////////////////////////////
31 
32 ///
33 /// @file multi_gather.h
34 /// @author Todd Gamblin tgamblin@llnl.gov
35 /// @brief Asynchronous, some-to-some gather operation used by parallel clustering algorithms
36 /// to simultaneously send members of sample sets to a set of distributed worker processes.
37 ///
38 #ifndef MULTI_GATHER_H
39 #define MULTI_GATHER_H
40 
41 #include <mpi.h>
42 #include <vector>
43 #include <iostream>
44 #include "mpi_bindings.h"
45 #include <algorithm>
46 
47 namespace cluster {
48 
49  ///
50  /// Asynchronous, some-to-some gather operation used by parallel clustering algorithms
51  /// to simultaneously send members of sample sets to a set of distributed worker processes.
52  ///
53  /// @tparam T Type of objects to be transferred by this multi_gather.
54  /// Must support the following operations:
55  /// - <code>int packed_size(MPI_Comm comm) const</code>
56  /// - <code>void pack(void *buf, int bufsize, int *position, MPI_Comm comm) const</code>
57  /// - <code>static T unpack(void *buf, int bufsize, int *position, MPI_Comm comm)</code>
58  ///
59  /// @see par_kmedoids::run_pam_trials(), which uses this class.
60  ///
61  template <class T>
62  class multi_gather {
63  /// internal struct for buffering sends and recvs.
64  struct buffer {
65  int size; ///< buffer for size of Isend or Irecv
66  char *buf; ///< buffer for data to be sent/recv'd
67  std::vector<T> *dest; ///< vector to push unpacked data onto
68 
69  /// constructor for receive buffers
70  buffer(std::vector<T>& _dest)
71  : size(0), buf(NULL), dest(&_dest) { }
72 
73  /// constructor for send buffers
74  buffer(int _size)
75  : size(_size), buf(new char[_size]), dest(NULL) { }
76 
77  /// Destructor
78  ~buffer() {
79  if (buf) delete [] buf;
80  }
81 
82  /// Turn a send buffer into a receive buffer (for local "sends")
83  void set_destination(std::vector<T>& _dest) { dest = &_dest; }
84 
85  /// Whether buffer has been allocated.
86  bool is_allocated() { return buf; }
87 
88  /// Allocates a buffer of <size> chars, to be called after size is received.
89  void allocate() { buf = new char[size]; }
90 
91  /// This is a buffer for a send if true. It's a buffer for a recv if false.
92  bool is_send() { return !dest; }
93  };
94 
95  MPI_Comm comm; ///< Communicator on which gather takes place
96  int tag; ///< tag for communication in multi_gathers.
97 
98  std::vector<MPI_Request> reqs; ///< Oustanding requests to be completed.
99  std::vector<buffer*> buffers; ///< Send and receive buffers for packed data in gathers.
100  size_t unfinished_reqs; ///< Number of still outstanding requests
101 
102  public:
103  ///
104  /// Construct a mult_gather on a communicator. MPI communication will use
105  /// the specified tag.
106  ///
107  multi_gather(MPI_Comm _comm, int _tag=0)
108  : comm(_comm), tag(_tag), unfinished_reqs(0) { }
109 
110  ///
111  /// Starts initial send and receive requests for this gather. Must be followed up with a call to finish().
112  ///
113  template <class ObjIterator, class RankIterator>
114  void start(ObjIterator begin_obj, ObjIterator end_obj,
115  RankIterator begin_src, RankIterator end_src,
116  std::vector<T>& dest, int root)
117  {
118  int size, rank;
119  CMPI_Comm_size(comm, &size);
120  CMPI_Comm_rank(comm, &rank);
121 
122  // stop if this rank isn't a member of the gather.
123  if (rank != root && find(begin_src, end_src, rank) == end_src) return;
124 
125  // determine size of local data.
126  int packed_size = cmpi_packed_size(1, MPI_SIZE_T, comm); // num objects
127  for (ObjIterator o=begin_obj; o != end_obj; o++) { // size of each object
128  packed_size += o->packed_size(comm);
129  }
130 
131  buffer *send_buffer = new buffer(packed_size);
132  if (rank != root) {
133  buffers.push_back(NULL); // no separate buffer for the size.
134  reqs.push_back(MPI_REQUEST_NULL);
135  CMPI_Isend(&send_buffer->size, 1, MPI_INT, root, tag, comm, &reqs.back());
136  unfinished_reqs++;
137  }
138 
139  // pack up local data into the buffer
140  int pos = 0;
141  size_t num_objects = distance(begin_obj, end_obj);
142  CMPI_Pack(&num_objects, 1, MPI_SIZE_T, send_buffer->buf, send_buffer->size, &pos, comm);
143  for (ObjIterator o=begin_obj; o != end_obj; o++) {
144  o->pack(send_buffer->buf, packed_size, &pos, comm);
145  }
146 
147  if (rank != root) {
148  // send packed data along to destination.
149  buffers.push_back(send_buffer); // buffer data during send
150  reqs.push_back(MPI_REQUEST_NULL);
151  CMPI_Isend(send_buffer->buf, packed_size, MPI_PACKED, root, tag, comm, &reqs.back());
152  unfinished_reqs++;
153 
154  } else { // rank is root; do receives instead
155  // initiate all the receives for sizes
156  for (RankIterator src=begin_src; src != end_src; src++) {
157  if (*src == root) {
158  // for the root, just insert the local packed buffer onto the array of buffers.
159  // don't increment unfinished reqs here b/c local buffer is already "done."
160  send_buffer->set_destination(dest);
161  buffers.push_back(send_buffer);
162  reqs.push_back(MPI_REQUEST_NULL);
163 
164  } else {
165  // make some buffer space for the receive, record its eventual destination
166  buffers.push_back(new buffer(dest));
167  reqs.push_back(MPI_REQUEST_NULL);
168  CMPI_Irecv(&buffers.back()->size, 1, MPI_INT, *src, tag, comm, &reqs.back());
169  unfinished_reqs++;
170  }
171  }
172  }
173  }
174 
175  ///
176  /// Starts a gather with one object instead of a range of objects.
177  ///
178  template <class RankIterator>
179  void start(const T& obj, RankIterator begin_src, RankIterator end_src, std::vector<T>& dest, int root) {
180  start(&obj, (&obj) + 1, begin_src, end_src, dest, root);
181  }
182 
183  void finish() {
184  int rank;
185  CMPI_Comm_rank(MPI_COMM_WORLD, &rank);
186 
187  while (unfinished_reqs) {
188  int outcount;
189  std::vector<int> indices(reqs.size());
190  std::vector<MPI_Status> status(reqs.size());
191 
192  CMPI_Waitsome(reqs.size(), &reqs[0], &outcount, &indices[0], &status[0]);
193  for (int o=0; o < outcount; o++) {
194  const int r = indices[o]; // index of received object.
195 
196  if (buffers[r] && !buffers[r]->is_send() && !buffers[r]->is_allocated()) {
197  // buffers[r] is a recv and we just received packed size. Allocate space and recv data.
198  int src = status[o].MPI_SOURCE;
199  buffers[r]->allocate();
200  CMPI_Irecv(buffers[r]->buf, buffers[r]->size, MPI_PACKED, src, tag, comm, &reqs[r]);
201 
202  } else {
203  // buffers[r] is a send, or it's a receive and we just received full packed data.
204  // in either case, the buffer is done, so decrement the number of unfinished reqs.
205  unfinished_reqs--;
206  }
207  }
208  }
209 
210  // Unpack all the received buffers into their destination vectors. This preserves order
211  // as unpacked data are only pushed onto the backs of destination vectors *after* everything
212  // is received. Buffers are still received in any order above, though.
213  for (size_t i=0; i < buffers.size(); i++) {
214  if (!buffers[i]) continue;
215 
216  if (!buffers[i]->is_send()) {
217  int pos = 0;
218  size_t num_objects;
219 
220  CMPI_Unpack(buffers[i]->buf, buffers[i]->size, &pos, &num_objects, 1, MPI_SIZE_T, comm);
221  for (size_t o=0; o < num_objects; o++) {
222  buffers[i]->dest->push_back(T::unpack(buffers[i]->buf, buffers[i]->size, &pos, comm));
223  }
224  }
225  delete buffers[i];
226  }
227 
228  // clear these out before the next call to start()
229  buffers.clear();
230  reqs.clear();
231  }
232 
233  }; // class multi_gather
234 
235 } // namespace cluster
236 
237 
238 #endif // MULTI_GATHER_H
239 
void start(ObjIterator begin_obj, ObjIterator end_obj, RankIterator begin_src, RankIterator end_src, std::vector< T > &dest, int root)
Starts initial send and receive requests for this gather.
Definition: multi_gather.h:114
#define MPI_SIZE_T
Definition: mpi_utils.h:39
#define CMPI_Isend
Definition: mpi_bindings.h:88
#define CMPI_Pack
Definition: mpi_bindings.h:89
Asynchronous, some-to-some gather operation used by parallel clustering algorithms to simultaneously ...
Definition: multi_gather.h:62
#define cmpi_packed_size
Definition: mpi_bindings.h:100
#define CMPI_Comm_rank
Definition: mpi_bindings.h:81
multi_gather(MPI_Comm _comm, int _tag=0)
Construct a multi_gather on a communicator.
Definition: multi_gather.h:107
#define CMPI_Waitsome
Definition: mpi_bindings.h:93
#define CMPI_Irecv
Definition: mpi_bindings.h:87
#define CMPI_Unpack
Definition: mpi_bindings.h:92
void start(const T &obj, RankIterator begin_src, RankIterator end_src, std::vector< T > &dest, int root)
Starts a gather with one object instead of a range of objects.
Definition: multi_gather.h:179
#defines for switching between MPI and PMPI bindings.
#define CMPI_Comm_size
Definition: mpi_bindings.h:82
Muster. Copyright © 2010, Lawrence Livermore National Laboratory, LLNL-CODE-433662.
Distribution of Muster and its documentation is subject to terms of the Muster LICENSE.
Generated on Thu Sep 1 2016 using Doxygen 1.8.5