00001
00002
00003 #ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
00004 #define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
00005
00006 #if HAVE_MPI
00007
#include <mpi.h>

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <utility>
#include <vector>

#include <dune/common/unused.hh>
#include "interface.hh"
#include "mpitraits.hh"
00015
00028 namespace Dune
00029 {
00030
00031 namespace
00032 {
00037 template<class T, class Allocator=std::allocator<T> >
00038 class MessageBuffer
00039 {
00040 public:
00045 explicit MessageBuffer(int size)
00046 : buffer_(new T[size]), size_(size), position_(0)
00047 {}
00052 explicit MessageBuffer(const MessageBuffer& o)
00053 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
00054 {
00055 }
00057 ~MessageBuffer()
00058 {
00059 delete[] buffer_;
00060 }
00065 void write(const T& data)
00066 {
00067 buffer_[position_++]=data;
00068 }
00069
00074 void read(T& data)
00075 {
00076 data=buffer_[position_++];
00077 }
00078
00084 void reset()
00085 {
00086 position_=0;
00087 }
00088
00093 bool finished()
00094 {
00095 return position_==size_;
00096 }
00097
00103 bool hasSpaceForItems(int noItems)
00104 {
00105 return position_+noItems<=size_;
00106 }
00111 std::size_t size() const
00112 {
00113 return size_;
00114 }
00119 operator T*()
00120 {
00121 return buffer_;
00122 }
00123
00124 private:
00128 T* buffer_;
00132 std::size_t size_;
00136 std::size_t position_;
00137 };
00138
00142 class InterfaceTracker
00143 {
00144 public:
00150 InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
00151 bool allocateSizes=false)
00152 : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_(),
00153 sizesAllocated_(allocateSizes)
00154 {
00155 if(allocateSizes)
00156 {
00157 sizes_.resize(info.size());
00158 }
00159 }
00160
00164 void moveToNextIndex()
00165 {
00166 index_++;
00167 assert(index_<=interface_.size());
00168 skipZeroIndices();
00169 }
00174 void increment(std::size_t i)
00175 {
00176 index_+=i;
00177 assert(index_<=interface_.size());
00178 }
00183 bool finished() const
00184 {
00185 return index_==interface_.size();
00186 }
00187
00188 void skipZeroIndices()
00189 {
00190
00191 while(sizes_.size() && index_!=interface_.size() &&!size())
00192 ++index_;
00193 }
00194
00199 std::size_t index() const
00200 {
00201 return interface_[index_];
00202 }
00206 std::size_t size() const
00207 {
00208 assert(sizes_.size());
00209 return sizes_[index_];
00210 }
00214 std::size_t* getSizesPointer()
00215 {
00216 return &sizes_[0];
00217 }
00222 bool empty() const
00223 {
00224 return !interface_.size();
00225 }
00226
00231 std::size_t indicesLeft() const
00232 {
00233 return interface_.size()-index_;
00234 }
00238 std::size_t fixedSize;
00242 int rank() const
00243 {
00244 return rank_;
00245 }
00249 std::size_t offset() const
00250 {
00251 return index_;
00252 }
00253 private:
00255 int rank_;
00257 std::size_t index_;
00259 InterfaceInformation interface_;
00260 std::vector<std::size_t> sizes_;
00261 bool sizesAllocated_;
00262 };
00263
00264
00265 }
00266
00278 template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
00279 class VariableSizeCommunicator
00280 {
00281 public:
00286 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
00287 std::less<int>,
00288 typename Allocator::template rebind<std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > >::other> InterfaceMap;
00289
00290 #ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
00291
00297 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
00298 : maxBufferSize_(32768), interface_(&inf)
00299 {
00300 MPI_Comm_dup(comm, &communicator_);
00301 }
00306 VariableSizeCommunicator(const Interface& inf)
00307 : maxBufferSize_(32768), interface_(&inf.interfaces())
00308 {
00309 MPI_Comm_dup(inf.communicator(), &communicator_);
00310 }
00311 #else
00312
00318 VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
00319 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
00320 interface_(&inf)
00321 {
00322 MPI_Comm_dup(comm, &communicator_);
00323 }
00328 VariableSizeCommunicator(const Interface& inf)
00329 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
00330 interface_(&inf.interfaces())
00331 {
00332 MPI_Comm_dup(inf.communicator(), &communicator_);
00333 }
00334 #endif
00335
00341 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
00342 : maxBufferSize_(max_buffer_size), interface_(&inf)
00343 {
00344 MPI_Comm_dup(comm, &communicator_);
00345 }
00346
00352 VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
00353 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
00354 {
00355 MPI_Comm_dup(inf.communicator(), &communicator_);
00356 }
00357
00358 ~VariableSizeCommunicator()
00359 {
00360 MPI_Comm_free(&communicator_);
00361 }
00362
00363
00383 template<class DataHandle>
00384 void forward(DataHandle& handle)
00385 {
00386 communicate<true>(handle);
00387 }
00388
00408 template<class DataHandle>
00409 void backward(DataHandle& handle)
00410 {
00411 communicate<false>(handle);
00412 }
00413
00414 private:
00415 template<bool FORWARD, class DataHandle>
00416 void communicateSizes(DataHandle& handle,
00417 std::vector<InterfaceTracker>& recv_trackers);
00418
00425 template<bool forward,class DataHandle>
00426 void communicate(DataHandle& handle);
00436 template<bool FORWARD, class DataHandle>
00437 void setupInterfaceTrackers(DataHandle& handle,
00438 std::vector<InterfaceTracker>& send_trackers,
00439 std::vector<InterfaceTracker>& recv_trackers);
00447 template<bool FORWARD, class DataHandle>
00448 void communicateFixedSize(DataHandle& handle);
00456 template<bool FORWARD, class DataHandle>
00457 void communicateVariableSize(DataHandle& handle);
00464 std::size_t maxBufferSize_;
00472 const InterfaceMap* interface_;
00478 MPI_Comm communicator_;
00479 };
00480
00482 namespace
00483 {
00487 template<class DataHandle>
00488 class SizeDataHandle
00489 {
00490 public:
00491 typedef std::size_t DataType;
00492
00493 SizeDataHandle(DataHandle& data,
00494 std::vector<InterfaceTracker>& trackers)
00495 : data_(data), trackers_(trackers), index_()
00496 {}
00497 bool fixedsize()
00498 {
00499 return true;
00500 }
00501 std::size_t size(std::size_t i)
00502 {
00503 DUNE_UNUSED_PARAMETER(i);
00504 return 1;
00505 }
00506 template<class B>
00507 void gather(B& buf, int i)
00508 {
00509 buf.write(data_.size(i));
00510 }
00511 void setReceivingIndex(std::size_t i)
00512 {
00513 index_=i;
00514 }
00515 std::size_t* getSizesPointer()
00516 {
00517 return trackers_[index_].getSizesPointer();
00518 }
00519
00520 private:
00521 DataHandle& data_;
00522 std::vector<InterfaceTracker>& trackers_;
00523 int index_;
00524 };
00525
00526 template<class T>
00527 void setReceivingIndex(T&, int)
00528 {}
00529
00530 template<class T>
00531 void setReceivingIndex(SizeDataHandle<T>& t, int i)
00532 {
00533 t.setReceivingIndex(i);
00534 }
00535
00536
00542 template<bool FORWARD>
00543 struct InterfaceInformationChooser
00544 {
00548 static const InterfaceInformation&
00549 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
00550 {
00551 return info.first;
00552 }
00553
00557 static const InterfaceInformation&
00558 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
00559 {
00560 return info.second;
00561 }
00562 };
00563
00564 template<>
00565 struct InterfaceInformationChooser<false>
00566 {
00567 static const InterfaceInformation&
00568 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
00569 {
00570 return info.second;
00571 }
00572
00573 static const InterfaceInformation&
00574 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
00575 {
00576 return info.first;
00577 }
00578 };
00579
00585 template<class DataHandle>
00586 struct PackEntries
00587 {
00588
00589 int operator()(DataHandle& handle, InterfaceTracker& tracker,
00590 MessageBuffer<typename DataHandle::DataType>& buffer,
00591 int i) const
00592 {
00593 return operator()(handle,tracker,buffer);
00594 }
00595
00603 int operator()(DataHandle& handle, InterfaceTracker& tracker,
00604 MessageBuffer<typename DataHandle::DataType>& buffer) const
00605 {
00606 if(tracker.fixedSize)
00607 {
00608
00609 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
00610 for(std::size_t i=0; i< noIndices; ++i)
00611 {
00612 handle.gather(buffer, tracker.index());
00613 tracker.moveToNextIndex();
00614 }
00615 return noIndices*tracker.fixedSize;
00616 }
00617 else
00618 {
00619 int packed=0;
00620 tracker.skipZeroIndices();
00621 while(!tracker.finished())
00622 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
00623 {
00624 handle.gather(buffer, tracker.index());
00625 packed+=handle.size(tracker.index());
00626 tracker.moveToNextIndex();
00627 }
00628 else
00629 break;
00630 assert(packed);
00631 return packed;
00632 }
00633 }
00634 };
00635
00641 template<class DataHandle>
00642 struct UnpackEntries{
00643
00651 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
00652 MessageBuffer<typename DataHandle::DataType>& buffer,
00653 int count=0)
00654 {
00655 if(tracker.fixedSize)
00656 {
00657 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
00658
00659 for(std::size_t i=0; i< noIndices; ++i)
00660 {
00661 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
00662 tracker.moveToNextIndex();
00663 }
00664 return tracker.finished();
00665 }
00666 else
00667 {
00668 assert(count);
00669 for(int unpacked=0;unpacked<count;)
00670 {
00671 assert(!tracker.finished());
00672 assert(buffer.hasSpaceForItems(tracker.size()));
00673 handle.scatter(buffer, tracker.index(), tracker.size());
00674 unpacked+=tracker.size();
00675 tracker.moveToNextIndex();
00676 }
00677 return tracker.finished();
00678 }
00679 }
00680 };
00681
00682
00686 template<class DataHandle>
00687 struct UnpackSizeEntries{
00688
00696 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
00697 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
00698 {
00699 std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
00700 std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
00701 handle.getSizesPointer()+tracker.offset());
00702 tracker.increment(noIndices);
00703 return noIndices;
00704 }
00705 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
00706 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
00707 {
00708 return operator()(handle,tracker,buffer);
00709 }
00710 };
00711
00719 void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
00720 std::vector<MPI_Request>& send_requests,
00721 std::vector<InterfaceTracker>& recv_trackers,
00722 std::vector<MPI_Request>& recv_requests,
00723 MPI_Comm communicator)
00724 {
00725 typedef std::vector<InterfaceTracker>::iterator TIter;
00726 std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
00727
00728 for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
00729 ++iter, ++mIter)
00730 {
00731 MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
00732 iter->rank(), 933881, communicator, &(*mIter));
00733 }
00734
00735
00736 std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
00737 for(TIter iter=send_trackers.begin(), end=send_trackers.end();
00738 iter!=end;
00739 ++iter, ++mIter1)
00740 {
00741 MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
00742 iter->rank(), 933881, communicator, &(*mIter1));
00743 }
00744 }
00745
00746
00751 template<class DataHandle>
00752 struct SetupSendRequest{
00753 void operator()(DataHandle& handle,
00754 InterfaceTracker& tracker,
00755 MessageBuffer<typename DataHandle::DataType>& buffer,
00756 MPI_Request& request,
00757 MPI_Comm comm) const
00758 {
00759 buffer.reset();
00760 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
00761
00762 while(!tracker.finished() && !handle.size(tracker.index()))
00763 tracker.moveToNextIndex();
00764 if(size)
00765 MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
00766 tracker.rank(), 933399, comm, &request);
00767 }
00768 };
00769
00770
00775 template<class DataHandle>
00776 struct SetupRecvRequest{
00777 void operator()(DataHandle& ,
00778 InterfaceTracker& tracker,
00779 MessageBuffer<typename DataHandle::DataType>& buffer,
00780 MPI_Request& request,
00781 MPI_Comm comm) const
00782 {
00783 buffer.reset();
00784 if(tracker.indicesLeft())
00785 MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
00786 tracker.rank(), 933399, comm, &request);
00787 }
00788 };
00789
00793 template<class DataHandle>
00794 struct NullPackUnpackFunctor
00795 {
00796 int operator()(DataHandle&, InterfaceTracker&,
00797 MessageBuffer<typename DataHandle::DataType>&, int)
00798 {
00799 return 0;
00800 }
00801 int operator()(DataHandle&, InterfaceTracker&,
00802 MessageBuffer<typename DataHandle::DataType>&)
00803 {
00804 return 0;
00805 }
00806 };
00807
00822 template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
00823 std::size_t checkAndContinue(DataHandle& handle,
00824 std::vector<InterfaceTracker>& trackers,
00825 std::vector<MPI_Request>& requests,
00826 std::vector<MPI_Request>& requests2,
00827 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
00828 MPI_Comm comm,
00829 BufferFunctor buffer_func,
00830 CommunicationFunctor comm_func,
00831 bool valid=true,
00832 bool getCount=false)
00833 {
00834 std::size_t size=requests.size();
00835 std::vector<MPI_Status> statuses(size);
00836 int no_completed;
00837 std::vector<int> indices(size, -1);
00838
00839 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
00840 indices.resize(no_completed);
00841 for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
00842 index!=end; ++index)
00843 {
00844 InterfaceTracker& tracker=trackers[*index];
00845 setReceivingIndex(handle, *index);
00846 if(getCount)
00847 {
00848
00849 int count;
00850 MPI_Get_count(&(statuses[index-indices.begin()]),
00851 MPITraits<typename DataHandle::DataType>::getType(),
00852 &count);
00853
00854 buffer_func(handle, tracker, buffers[*index], count);
00855 }else
00856 buffer_func(handle, tracker, buffers[*index]);
00857 tracker.skipZeroIndices();
00858 if(!tracker.finished()){
00859
00860 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
00861 tracker.skipZeroIndices();
00862 if(valid)
00863 no_completed-=!tracker.finished();
00864 }
00865 }
00866 return no_completed;
00867
00868 }
00869
00879 template<class DataHandle>
00880 std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
00881 std::vector<InterfaceTracker>& trackers,
00882 std::vector<MPI_Request>& size_requests,
00883 std::vector<MPI_Request>& data_requests,
00884 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
00885 MPI_Comm comm)
00886 {
00887 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
00888 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
00889 }
00890
00899 template<class DataHandle>
00900 std::size_t checkSendAndContinueSending(DataHandle& handle,
00901 std::vector<InterfaceTracker>& trackers,
00902 std::vector<MPI_Request>& requests,
00903 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
00904 MPI_Comm comm)
00905 {
00906 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
00907 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
00908 }
00909
00918 template<class DataHandle>
00919 std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
00920 std::vector<InterfaceTracker>& trackers,
00921 std::vector<MPI_Request>& requests,
00922 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
00923 MPI_Comm comm)
00924 {
00925 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
00926 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
00927 true, !handle.fixedsize());
00928 }
00929
00930
00931 bool validRecvRequests(const std::vector<MPI_Request> reqs)
00932 {
00933 for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
00934 i!=end; ++i)
00935 if(*i!=MPI_REQUEST_NULL)
00936 return true;
00937 return false;
00938 }
00939
00950 template<class DataHandle, class Functor>
00951 std::size_t setupRequests(DataHandle& handle,
00952 std::vector<InterfaceTracker>& trackers,
00953 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
00954 std::vector<MPI_Request>& requests,
00955 const Functor& setupFunctor,
00956 MPI_Comm communicator)
00957 {
00958 typedef typename std::vector<InterfaceTracker>::iterator TIter;
00959 typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
00960 biter=buffers.begin();
00961 typename std::vector<MPI_Request>::iterator riter=requests.begin();
00962 std::size_t complete=0;
00963 for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
00964 {
00965 setupFunctor(handle, *titer, *biter, *riter, communicator);
00966 complete+=titer->finished();
00967 }
00968 return complete;
00969 }
00970 }
00971
00972 template<class Allocator>
00973 template<bool FORWARD, class DataHandle>
00974 void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
00975 std::vector<InterfaceTracker>& send_trackers,
00976 std::vector<InterfaceTracker>& recv_trackers)
00977 {
00978 if(interface_->size()==0)
00979 return;
00980 send_trackers.reserve(interface_->size());
00981 recv_trackers.reserve(interface_->size());
00982
00983 int fixedsize=0;
00984 if(handle.fixedsize())
00985 ++fixedsize;
00986
00987
00988 typedef typename InterfaceMap::const_iterator IIter;
00989 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
00990 {
00991
00992 if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
00993 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
00994 assert(!handle.fixedsize()||fixedsize>0);
00995 send_trackers.push_back(InterfaceTracker(inf->first,
00996 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
00997 recv_trackers.push_back(InterfaceTracker(inf->first,
00998 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
00999 }
01000 }
01001
01002 template<class Allocator>
01003 template<bool FORWARD, class DataHandle>
01004 void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
01005 {
01006 std::vector<MPI_Request> size_send_req(interface_->size());
01007 std::vector<MPI_Request> size_recv_req(interface_->size());
01008
01009 std::vector<InterfaceTracker> send_trackers;
01010 std::vector<InterfaceTracker> recv_trackers;
01011 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
01012 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
01013
01014 std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
01015 std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
01016 typedef typename DataHandle::DataType DataType;
01017 std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
01018 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
01019
01020
01021 setupRequests(handle, send_trackers, send_buffers, data_send_req,
01022 SetupSendRequest<DataHandle>(), communicator_);
01023
01024 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
01025 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
01026
01027
01028 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
01029 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
01030 if(i->empty())
01031 --no_to_recv;
01032 for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
01033 if(i->empty())
01034 --no_to_send;
01035
01036 while(no_size_to_recv+no_to_send+no_to_recv)
01037 {
01038
01039 if(no_size_to_recv)
01040 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
01041 data_recv_req, recv_buffers,
01042 communicator_);
01043
01044
01045 if(no_to_send)
01046 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
01047 send_buffers, communicator_);
01048 if(validRecvRequests(data_recv_req))
01049
01050 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
01051 recv_buffers, communicator_);
01052 }
01053
01054
01055
01056 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
01057
01058 }
01059
01060 template<class Allocator>
01061 template<bool FORWARD, class DataHandle>
01062 void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
01063 std::vector<InterfaceTracker>& data_recv_trackers)
01064 {
01065 std::vector<InterfaceTracker> send_trackers;
01066 std::vector<InterfaceTracker> recv_trackers;
01067 std::size_t size = interface_->size();
01068 std::vector<MPI_Request> send_requests(size);
01069 std::vector<MPI_Request> recv_requests(size);
01070 std::vector<MessageBuffer<std::size_t> >
01071 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
01072 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
01073 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
01074 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
01075 std::size_t size_to_send=size, size_to_recv=size;
01076
01077
01078 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
01079 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
01080 if(i->empty())
01081 --size_to_recv;
01082
01083 size_to_send -= setupRequests(size_handle, send_trackers, send_buffers, send_requests,
01084 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
01085 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
01086 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
01087
01088
01089 while(size_to_send+size_to_recv)
01090 {
01091 if(size_to_send)
01092 size_to_send -=
01093 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
01094 send_buffers, communicator_);
01095 if(size_to_recv)
01096
01097
01098
01099 size_to_recv -=
01100 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
01101 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
01102 SetupRecvRequest<SizeDataHandle<DataHandle> >());
01103 }
01104 }
01105
01106 template<class Allocator>
01107 template<bool FORWARD, class DataHandle>
01108 void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
01109 {
01110
01111 std::vector<InterfaceTracker> send_trackers;
01112 std::vector<InterfaceTracker> recv_trackers;
01113 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
01114
01115 std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
01116 std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
01117 typedef typename DataHandle::DataType DataType;
01118 std::vector<MessageBuffer<DataType> >
01119 send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
01120 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
01121
01122 communicateSizes<FORWARD>(handle, recv_trackers);
01123 std::size_t no_to_send, no_to_recv;
01124 no_to_send = no_to_recv = interface_->size();
01125
01126 no_to_send -= setupRequests(handle, send_trackers, send_buffers, send_requests,
01127 SetupSendRequest<DataHandle>(), communicator_);
01128 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
01129 SetupRecvRequest<DataHandle>(), communicator_);
01130
01131 while(no_to_send+no_to_recv)
01132 {
01133
01134 if(no_to_send)
01135 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
01136 send_buffers, communicator_);
01137 if(no_to_recv)
01138
01139 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
01140 recv_buffers, communicator_);
01141 }
01142 }
01143
01144 template<class Allocator>
01145 template<bool FORWARD, class DataHandle>
01146 void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
01147 {
01148 if( interface_->size() == 0)
01149
01150
01151 return;
01152
01153 if(handle.fixedsize())
01154 communicateFixedSize<FORWARD>(handle);
01155 else
01156 communicateVariableSize<FORWARD>(handle);
01157 }
01158 }
01159 #endif
01160 #endif