#ifndef MULTI_GATHER_H
#define MULTI_GATHER_H

#include <mpi.h>
#include <vector>
#include <algorithm>       // for std::find
#include <iterator>        // for std::distance
#include "mpi_bindings.h"  // #defines for switching between MPI and PMPI bindings

///
/// Asynchronous, some-to-some gather operation used by parallel clustering
/// algorithms to simultaneously gather objects to many separate roots.
///
template <class T>
class multi_gather {
  /// Bookkeeping for a single send or receive within the gather.
  struct buffer {
    int size;              ///< size of the packed payload, in bytes
    char *buf;             ///< the packed payload; NULL until allocated
    std::vector<T> *dest;  ///< vector received objects are unpacked into; NULL for sends
    /// Construct a receive-side buffer; received objects are unpacked into _dest.
    buffer(std::vector<T>& _dest) : size(0), buf(NULL), dest(&_dest) { }

    /// Construct a pre-sized send-side buffer for packed objects.
    buffer(int _size) : size(_size), buf(new char[_size]), dest(NULL) { }

    ~buffer() { if (buf) delete [] buf; }
    /// Point this buffer's unpack destination at a vector of objects.
    void set_destination(std::vector<T>& _dest) { dest = &_dest; }

    /// Whether the payload buffer has been allocated yet.
    bool is_allocated() { return buf != NULL; }

    /// Allocate the payload buffer once its size is known.
    void allocate() { buf = new char[size]; }

    /// Send buffers have no destination vector; receive buffers do.
    bool is_send() { return !dest; }
  };
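  // Life cycle of one receive-side buffer, as implemented below:
  //   1. start() constructs it with a destination vector; its size is unknown.
  //   2. start() posts a receive for the size alone.
  //   3. finish() sees the size arrive, calls allocate(), and posts the
  //      payload receive.
  //   4. finish() unpacks the payload into *dest once it arrives.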
  MPI_Comm comm;                  ///< communicator on which the gather runs
  int tag;                        ///< tag distinguishing this gather's messages
  std::vector<MPI_Request> reqs;  ///< outstanding send and receive requests
  std::vector<buffer*> buffers;   ///< one entry per request; NULL for size-only sends
  size_t unfinished_reqs;         ///< number of requests not yet completed
public:
  /// Construct a multi_gather on a communicator.
  multi_gather(MPI_Comm _comm, int _tag=0)
    : comm(_comm), tag(_tag), unfinished_reqs(0) { }
  /// Starts initial send and receive requests for this gather.
  template <class ObjIterator, class RankIterator>
  void start(ObjIterator begin_obj, ObjIterator end_obj,
             RankIterator begin_src, RankIterator end_src,
             std::vector<T>& dest, int root)
  {
    int rank;
    CMPI_Comm_rank(comm, &rank);   // CMPI_* names resolve via mpi_bindings.h
    // Ranks that are neither the root nor a sender have nothing to do.
    if (rank != root && std::find(begin_src, end_src, rank) == end_src) return;
    // ---- Sender side: pack local objects and ship them to the root. ---- //
    buffer *send_buffer = NULL;
    if (std::find(begin_src, end_src, rank) != end_src) {
      // Total packed size of all objects, plus room at the head for the
      // object count. NOTE: packing the count as an unsigned long at the
      // head of the buffer is an assumption of this reconstruction.
      int packed_size = 0;
      for (ObjIterator o=begin_obj; o != end_obj; o++) {
        packed_size += o->packed_size(comm);
      }
      int count_size = 0;
      CMPI_Pack_size(1, MPI_UNSIGNED_LONG, comm, &count_size);
      packed_size += count_size;

      send_buffer = new buffer(packed_size);

      if (rank != root) {
        // Ship the payload size first so the root can size its buffer.
        buffers.push_back(NULL);           // size-only send: no payload to track
        reqs.push_back(MPI_REQUEST_NULL);
        CMPI_Isend(&send_buffer->size, 1, MPI_INT, root, tag, comm, &reqs.back());
        unfinished_reqs++;
      }

      // Pack the object count, then each object, into the send buffer.
      int pos = 0;
      size_t num_objects = std::distance(begin_obj, end_obj);
      unsigned long count = num_objects;
      CMPI_Pack(&count, 1, MPI_UNSIGNED_LONG, send_buffer->buf, packed_size, &pos, comm);
      for (ObjIterator o=begin_obj; o != end_obj; o++) {
        o->pack(send_buffer->buf, packed_size, &pos, comm);
      }

      if (rank != root) {
        // Now ship the packed payload itself.
        buffers.push_back(send_buffer);
        reqs.push_back(MPI_REQUEST_NULL);
        CMPI_Isend(send_buffer->buf, packed_size, MPI_PACKED, root, tag, comm, &reqs.back());
        unfinished_reqs++;
      }
    }
    // ---- Root side: post receives from every source rank. ---- //
    if (rank == root) {
      for (RankIterator src=begin_src; src != end_src; src++) {
        if (*src == rank) {
          // The root's own contribution never touches MPI: reuse the
          // packed send buffer and point it at the destination vector.
          send_buffer->set_destination(dest);
          buffers.push_back(send_buffer);
          reqs.push_back(MPI_REQUEST_NULL);   // placeholder; no request needed
        } else {
          // Post the size receive now; the payload receive is posted in
          // finish(), once the size is known and a buffer can be allocated.
          buffers.push_back(new buffer(dest));
          reqs.push_back(MPI_REQUEST_NULL);
          CMPI_Irecv(&buffers.back()->size, 1, MPI_INT, *src, tag, comm, &reqs.back());
          unfinished_reqs++;
        }
      }
    }
  }
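  // Usage sketch (illustrative, not part of the original header). Each rank
  // in `sources` contributes one object of a packable type T, and rank 0
  // collects them; `my_obj`, `results`, and `sources` are hypothetical
  // caller-side names. Several gathers may run concurrently, which is the
  // point of this class: start them all, then finish them all.
  //
  //   std::vector<int> sources;                 // ranks that send to root 0
  //   sources.push_back(1); sources.push_back(2);
  //
  //   std::vector<T> results;                   // filled in by finish()
  //   multi_gather<T> gather(MPI_COMM_WORLD);
  //   gather.start(my_obj, sources.begin(), sources.end(), results, 0);
  //   gather.finish();                          // blocks until all arrive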
  /// Starts a gather with one object instead of a range of objects.
  template <class RankIterator>
  void start(const T& obj, RankIterator begin_src, RankIterator end_src,
             std::vector<T>& dest, int root) {
    start(&obj, (&obj) + 1, begin_src, end_src, dest, root);
  }
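  // For reference, a type T used with this class must supply the pack
  // interface the calls above assume. A minimal sketch (hypothetical type;
  // the signatures are inferred from the calls in start() and finish()):
  //
  //   struct point {
  //     double x, y;
  //     int packed_size(MPI_Comm comm) const;          // bytes when packed
  //     void pack(void *buf, int bufsize, int *pos,    // MPI_Pack members
  //               MPI_Comm comm) const;
  //     static point unpack(void *buf, int bufsize,    // MPI_Unpack members
  //                         int *pos, MPI_Comm comm);
  //   };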
  /// Waits on all outstanding requests, posting payload receives as size
  /// messages arrive, then unpacks everything into its destination vector.
  void finish() {
    while (unfinished_reqs) {
      int outcount = 0;
      std::vector<int> indices(reqs.size());
      std::vector<MPI_Status> status(reqs.size());

      // Block until at least one outstanding request completes.
      CMPI_Waitsome(reqs.size(), &reqs[0], &outcount, &indices[0], &status[0]);

      for (int o=0; o < outcount; o++) {
        const int r = indices[o];

        if (buffers[r] && !buffers[r]->is_send() && !buffers[r]->is_allocated()) {
          // A size message arrived: allocate the payload buffer and post
          // the real receive, reusing the same request slot.
          int src = status[o].MPI_SOURCE;
          buffers[r]->allocate();
          CMPI_Irecv(buffers[r]->buf, buffers[r]->size, MPI_PACKED, src, tag, comm, &reqs[r]);
        } else {
          // A send or a payload receive finished for good.
          unfinished_reqs--;
        }
      }
    }
    // All requests are done: unpack each receive buffer into its destination.
    for (size_t i=0; i < buffers.size(); i++) {
      if (!buffers[i]) continue;   // size-only send slots carry no buffer

      if (!buffers[i]->is_send()) {
        // Mirror the packing order in start(): count first, then objects.
        // NOTE: the count-at-the-head encoding is an assumption, as above.
        int pos = 0;
        unsigned long num_objects = 0;
        CMPI_Unpack(buffers[i]->buf, buffers[i]->size, &pos, &num_objects, 1, MPI_UNSIGNED_LONG, comm);
        for (size_t o=0; o < num_objects; o++) {
          buffers[i]->dest->push_back(T::unpack(buffers[i]->buf, buffers[i]->size, &pos, comm));
        }
      }
      delete buffers[i];
    }
    buffers.clear();
    reqs.clear();
  }
};

#endif // MULTI_GATHER_H