00001
00002
00003 #ifndef DUNE_COMMUNICATOR
00004 #define DUNE_COMMUNICATOR
00005
00006 #include "remoteindices.hh"
00007 #include "interface.hh"
00008 #include <dune/common/exceptions.hh>
00009 #include <dune/common/typetraits.hh>
00010 #include <dune/common/stdstreams.hh>
00011
00012 #if HAVE_MPI
00013
00014 #include <mpi.h>
00015
00016 namespace Dune
00017 {
/**
 * @brief Tag type used as CommPolicy::IndexedTypeFlag.
 *
 * The generic CommPolicy uses this flag; its getSize() always reports
 * one entry per index.
 */
struct SizeOne {};
00103
/**
 * @brief Tag type used as CommPolicy::IndexedTypeFlag.
 *
 * Selected by the CommPolicy specialisation for VariableBlockVector, whose
 * getSize() asks the indexed block for its size.
 */
struct VariableSize {};
00111
00112
00118 template<class V>
00119 struct CommPolicy
00120 {
00132 typedef V Type;
00133
00139 typedef typename V::value_type IndexedType;
00140
00145 typedef SizeOne IndexedTypeFlag;
00146
00155 static const void* getAddress(const V& v, int index);
00156
00162 static int getSize(const V&, int index);
00163 };
00164
00165 template<class K, int n> class FieldVector;
00166
00167 template<class B, class A> class VariableBlockVector;
00168
00169 template<class K, class A, int n>
00170 struct CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >
00171 {
00172 typedef VariableBlockVector<FieldVector<K, n>, A> Type;
00173
00174 typedef typename Type::B IndexedType;
00175
00176 typedef VariableSize IndexedTypeFlag;
00177
00178 static const void* getAddress(const Type& v, int i);
00179
00180 static int getSize(const Type& v, int i);
00181 };
00182
00186 class CommunicationError : public IOError
00187 {};
00188
00192 template<class T>
00193 struct CopyGatherScatter
00194 {
00195 typedef typename CommPolicy<T>::IndexedType IndexedType;
00196
00197 static const IndexedType& gather(const T& vec, std::size_t i);
00198
00199 static void scatter(T& vec, const IndexedType& v, std::size_t i);
00200
00201 };
00202
00214 template<typename T>
00215 class DatatypeCommunicator : public InterfaceBuilder
00216 {
00217 public:
00218
00222 typedef T ParallelIndexSet;
00223
00227 typedef Dune::RemoteIndices<ParallelIndexSet> RemoteIndices;
00228
00232 typedef typename RemoteIndices::GlobalIndex GlobalIndex;
00233
00237 typedef typename RemoteIndices::Attribute Attribute;
00238
00242 typedef typename RemoteIndices::LocalIndex LocalIndex;
00243
00247 DatatypeCommunicator();
00248
00252 ~DatatypeCommunicator();
00253
00280 template<class T1, class T2, class V>
00281 void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
00282
00286 void forward();
00287
00291 void backward();
00292
00296 void free();
00297 private:
00298 enum {
00302 commTag_ = 234
00303 };
00304
00308 const RemoteIndices* remoteIndices_;
00309
00310 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
00311 MessageTypeMap;
00312
00316 MessageTypeMap messageTypes;
00317
00321 void* data_;
00322
00323 MPI_Request* requests_[2];
00324
00328 bool created_;
00329
00333 template<class V, bool FORWARD>
00334 void createRequests(V& sendData, V& receiveData);
00335
00339 template<class T1, class T2, class V, bool send>
00340 void createDataTypes(const T1& source, const T2& destination, V& data);
00341
00345 void sendRecv(MPI_Request* req);
00346
00350 struct IndexedTypeInformation
00351 {
00357 void build(int i)
00358 {
00359 length = new int[i];
00360 displ = new MPI_Aint[i];
00361 size = i;
00362 }
00363
00367 void free()
00368 {
00369 delete[] length;
00370 delete[] displ;
00371 }
00373 int* length;
00375 MPI_Aint* displ;
00381 int elements;
00385 int size;
00386 };
00387
00393 template<class V>
00394 struct MPIDatatypeInformation
00395 {
00400 MPIDatatypeInformation(const V& data) : data_(data)
00401 {}
00402
00408 void reserve(int proc, int size)
00409 {
00410 information_[proc].build(size);
00411 }
00418 void add(int proc, int local)
00419 {
00420 IndexedTypeInformation& info=information_[proc];
00421 assert((info.elements)<info.size);
00422 MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
00423 info.displ+info.elements);
00424 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
00425 info.elements++;
00426 }
00427
00432 std::map<int,IndexedTypeInformation> information_;
00436 const V& data_;
00437
00438 };
00439
00440 };
00441
00451 class BufferedCommunicator
00452 {
00453
00454 public:
00458 BufferedCommunicator();
00459
00466 template<class Data, class Interface>
00467 typename std::enable_if<std::is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
00468 build(const Interface& interface);
00469
00477 template<class Data, class Interface>
00478 void build(const Data& source, const Data& target, const Interface& interface);
00479
00508 template<class GatherScatter, class Data>
00509 void forward(const Data& source, Data& dest);
00510
00539 template<class GatherScatter, class Data>
00540 void backward(Data& source, const Data& dest);
00541
00567 template<class GatherScatter, class Data>
00568 void forward(Data& data);
00569
00595 template<class GatherScatter, class Data>
00596 void backward(Data& data);
00597
00601 void free();
00602
00606 ~BufferedCommunicator();
00607
00608 private:
00609
00613 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
00614 InterfaceMap;
00615
00616
00620 template<class Data, typename IndexedTypeFlag>
00621 struct MessageSizeCalculator
00622 {};
00623
00628 template<class Data>
00629 struct MessageSizeCalculator<Data,SizeOne>
00630 {
00637 inline int operator()(const InterfaceInformation& info) const;
00646 inline int operator()(const Data& data, const InterfaceInformation& info) const;
00647 };
00648
00653 template<class Data>
00654 struct MessageSizeCalculator<Data,VariableSize>
00655 {
00664 inline int operator()(const Data& data, const InterfaceInformation& info) const;
00665 };
00666
00670 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
00671 struct MessageGatherer
00672 {};
00673
00678 template<class Data, class GatherScatter, bool send>
00679 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
00680 {
00682 typedef typename CommPolicy<Data>::IndexedType Type;
00683
00688 typedef GatherScatter Gatherer;
00689
00690 enum {
00696 forward=send
00697 };
00698
00706 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
00707 };
00708
00713 template<class Data, class GatherScatter, bool send>
00714 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
00715 {
00717 typedef typename CommPolicy<Data>::IndexedType Type;
00718
00723 typedef GatherScatter Gatherer;
00724
00725 enum {
00731 forward=send
00732 };
00733
00741 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
00742 };
00743
00747 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
00748 struct MessageScatterer
00749 {};
00750
00755 template<class Data, class GatherScatter, bool send>
00756 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
00757 {
00759 typedef typename CommPolicy<Data>::IndexedType Type;
00760
00765 typedef GatherScatter Scatterer;
00766
00767 enum {
00773 forward=send
00774 };
00775
00783 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
00784 };
00789 template<class Data, class GatherScatter, bool send>
00790 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
00791 {
00793 typedef typename CommPolicy<Data>::IndexedType Type;
00794
00799 typedef GatherScatter Scatterer;
00800
00801 enum {
00807 forward=send
00808 };
00809
00817 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
00818 };
00819
00823 struct MessageInformation
00824 {
00826 MessageInformation()
00827 : start_(0), size_(0)
00828 {}
00829
00837 MessageInformation(size_t start, size_t size)
00838 : start_(start), size_(size)
00839 {}
00843 size_t start_;
00847 size_t size_;
00848 };
00849
00856 typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
00857 InformationMap;
00861 InformationMap messageInformation_;
00865 char* buffers_[2];
00869 size_t bufferSize_[2];
00870
00871 enum {
00875 commTag_
00876 };
00877
00881 std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
00882
00883 MPI_Comm communicator_;
00884
00888 template<class GatherScatter, bool FORWARD, class Data>
00889 void sendRecv(const Data& source, Data& target);
00890
00891 };
00892
00893 #ifndef DOXYGEN
00894
00895 template<class V>
00896 inline const void* CommPolicy<V>::getAddress(const V& v, int index)
00897 {
00898 return &(v[index]);
00899 }
00900
00901 template<class V>
00902 inline int CommPolicy<V>::getSize(const V& v, int index)
00903 {
00904 DUNE_UNUSED_PARAMETER(v);
00905 DUNE_UNUSED_PARAMETER(index);
00906 return 1;
00907 }
00908
00909 template<class K, class A, int n>
00910 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
00911 {
00912 return &(v[index][0]);
00913 }
00914
00915 template<class K, class A, int n>
00916 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
00917 {
00918 return v[index].getsize();
00919 }
00920
00921 template<class T>
00922 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
00923 {
00924 return vec[i];
00925 }
00926
00927 template<class T>
00928 inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
00929 {
00930 vec[i]=v;
00931 }
00932
00933 template<typename T>
00934 DatatypeCommunicator<T>::DatatypeCommunicator()
00935 : remoteIndices_(0), created_(false)
00936 {
00937 requests_[0]=0;
00938 requests_[1]=0;
00939 }
00940
00941
00942
00943 template<typename T>
00944 DatatypeCommunicator<T>::~DatatypeCommunicator()
00945 {
00946 free();
00947 }
00948
00949 template<typename T>
00950 template<class T1, class T2, class V>
00951 inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
00952 const T1& source, V& sendData,
00953 const T2& destination, V& receiveData)
00954 {
00955 remoteIndices_ = &remoteIndices;
00956 free();
00957 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
00958 createDataTypes<T1,T2,V,true>(source,destination, sendData);
00959 createRequests<V,true>(sendData, receiveData);
00960 createRequests<V,false>(receiveData, sendData);
00961 created_=true;
00962 }
00963
00964 template<typename T>
00965 void DatatypeCommunicator<T>::free()
00966 {
00967 if(created_) {
00968 delete[] requests_[0];
00969 delete[] requests_[1];
00970 typedef MessageTypeMap::iterator iterator;
00971 typedef MessageTypeMap::const_iterator const_iterator;
00972
00973 const const_iterator end=messageTypes.end();
00974
00975 for(iterator process = messageTypes.begin(); process != end; ++process) {
00976 MPI_Datatype *type = &(process->second.first);
00977 int finalized=0;
00978 MPI_Finalized(&finalized);
00979 if(*type!=MPI_DATATYPE_NULL && !finalized)
00980 MPI_Type_free(type);
00981 type = &(process->second.second);
00982 if(*type!=MPI_DATATYPE_NULL && !finalized)
00983 MPI_Type_free(type);
00984 }
00985 messageTypes.clear();
00986 created_=false;
00987 }
00988
00989 }
00990
00991 template<typename T>
00992 template<class T1, class T2, class V, bool send>
00993 void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
00994 {
00995
00996 MPIDatatypeInformation<V> dataInfo(data);
00997 this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
00998
00999 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
01000 const const_iterator end=this->remoteIndices_->end();
01001
01002
01003 for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
01004 IndexedTypeInformation& info=dataInfo.information_[process->first];
01005
01006 MPI_Aint base;
01007 MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
01008
01009 for(int i=0; i< info.elements; i++) {
01010 info.displ[i]-=base;
01011 }
01012
01013
01014 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
01015 MPI_Type_create_hindexed(info.elements, info.length, info.displ,
01016 MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
01017 MPI_Type_commit(type);
01018
01019 info.free();
01020 }
01021 }
01022
01023 template<typename T>
01024 template<class V, bool createForward>
01025 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
01026 {
01027 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
01028 int rank;
01029 static int index = createForward ? 1 : 0;
01030 int noMessages = messageTypes.size();
01031
01032 requests_[index] = new MPI_Request[2*noMessages];
01033 const MapIterator end = messageTypes.end();
01034 int request=0;
01035 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
01036
01037
01038 for(MapIterator process = messageTypes.begin(); process != end;
01039 ++process, ++request) {
01040 MPI_Datatype type = createForward ? process->second.second : process->second.first;
01041 void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
01042 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
01043 }
01044
01045
01046
01047 for(MapIterator process = messageTypes.begin(); process != end;
01048 ++process, ++request) {
01049 MPI_Datatype type = createForward ? process->second.first : process->second.second;
01050 void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
01051 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
01052 }
01053 }
01054
01055 template<typename T>
01056 void DatatypeCommunicator<T>::forward()
01057 {
01058 sendRecv(requests_[1]);
01059 }
01060
01061 template<typename T>
01062 void DatatypeCommunicator<T>::backward()
01063 {
01064 sendRecv(requests_[0]);
01065 }
01066
01067 template<typename T>
01068 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
01069 {
01070 int noMessages = messageTypes.size();
01071
01072 MPI_Startall(noMessages, requests);
01073
01074 MPI_Startall(noMessages, requests+noMessages);
01075
01076
01077 MPI_Status* status=new MPI_Status[2*noMessages];
01078 for(int i=0; i<2*noMessages; i++)
01079 status[i].MPI_ERROR=MPI_SUCCESS;
01080
01081 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
01082 int receive = MPI_Waitall(noMessages, requests, status);
01083
01084
01085 int success=1, globalSuccess=0;
01086 if(send==MPI_ERR_IN_STATUS) {
01087 int rank;
01088 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
01089 std::cerr<<rank<<": Error in sending :"<<std::endl;
01090
01091 for(int i=noMessages; i< 2*noMessages; i++)
01092 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
01093 char message[300];
01094 int messageLength;
01095 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
01096 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
01097 for(int j = 0; j < messageLength; j++)
01098 std::cout << message[j];
01099 }
01100 std::cerr<<std::endl;
01101 success=0;
01102 }
01103
01104 if(receive==MPI_ERR_IN_STATUS) {
01105 int rank;
01106 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
01107 std::cerr<<rank<<": Error in receiving!"<<std::endl;
01108
01109 for(int i=0; i< noMessages; i++)
01110 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
01111 char message[300];
01112 int messageLength;
01113 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
01114 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
01115 for(int j = 0; j < messageLength; j++)
01116 std::cerr << message[j];
01117 }
01118 std::cerr<<std::endl;
01119 success=0;
01120 }
01121
01122 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
01123
01124 delete[] status;
01125
01126 if(!globalSuccess)
01127 DUNE_THROW(CommunicationError, "A communication error occurred!");
01128
01129 }
01130
01131 inline BufferedCommunicator::BufferedCommunicator()
01132 {
01133 buffers_[0]=0;
01134 buffers_[1]=0;
01135 bufferSize_[0]=0;
01136 bufferSize_[1]=0;
01137 }
01138
01139 template<class Data, class Interface>
01140 typename std::enable_if<std::is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
01141 BufferedCommunicator::build(const Interface& interface)
01142 {
01143 interfaces_=interface.interfaces();
01144 communicator_=interface.communicator();
01145 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
01146 ::const_iterator const_iterator;
01147 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
01148 const const_iterator end = interfaces_.end();
01149 int lrank;
01150 MPI_Comm_rank(communicator_, &lrank);
01151
01152 bufferSize_[0]=0;
01153 bufferSize_[1]=0;
01154
01155 for(const_iterator interfacePair = interfaces_.begin();
01156 interfacePair != end; ++interfacePair) {
01157 int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
01158 int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
01159 if (noSend + noRecv > 0)
01160 messageInformation_.insert(std::make_pair(interfacePair->first,
01161 std::make_pair(MessageInformation(bufferSize_[0],
01162 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
01163 MessageInformation(bufferSize_[1],
01164 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
01165 bufferSize_[0] += noSend;
01166 bufferSize_[1] += noRecv;
01167 }
01168
01169
01170 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
01171 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
01172
01173 buffers_[0] = new char[bufferSize_[0]];
01174 buffers_[1] = new char[bufferSize_[1]];
01175 }
01176
01177 template<class Data, class Interface>
01178 void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
01179 {
01180
01181 interfaces_=interface.interfaces();
01182 communicator_=interface.communicator();
01183 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
01184 ::const_iterator const_iterator;
01185 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
01186 const const_iterator end = interfaces_.end();
01187
01188 bufferSize_[0]=0;
01189 bufferSize_[1]=0;
01190
01191 for(const_iterator interfacePair = interfaces_.begin();
01192 interfacePair != end; ++interfacePair) {
01193 int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
01194 int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
01195 if (noSend + noRecv > 0)
01196 messageInformation_.insert(std::make_pair(interfacePair->first,
01197 std::make_pair(MessageInformation(bufferSize_[0],
01198 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
01199 MessageInformation(bufferSize_[1],
01200 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
01201 bufferSize_[0] += noSend;
01202 bufferSize_[1] += noRecv;
01203 }
01204
01205 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
01206 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
01207
01208 buffers_[0] = new char[bufferSize_[0]];
01209 buffers_[1] = new char[bufferSize_[1]];
01210 }
01211
01212 inline void BufferedCommunicator::free()
01213 {
01214 messageInformation_.clear();
01215 if(buffers_[0])
01216 delete[] buffers_[0];
01217
01218 if(buffers_[1])
01219 delete[] buffers_[1];
01220 buffers_[0]=buffers_[1]=0;
01221 }
01222
01223 inline BufferedCommunicator::~BufferedCommunicator()
01224 {
01225 free();
01226 }
01227
01228 template<class Data>
01229 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
01230 (const InterfaceInformation& info) const
01231 {
01232 return info.size();
01233 }
01234
01235
01236 template<class Data>
01237 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
01238 (const Data&, const InterfaceInformation& info) const
01239 {
01240 return operator()(info);
01241 }
01242
01243
01244 template<class Data>
01245 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
01246 (const Data& data, const InterfaceInformation& info) const
01247 {
01248 int entries=0;
01249
01250 for(size_t i=0; i < info.size(); i++)
01251 entries += CommPolicy<Data>::getSize(data,info[i]);
01252
01253 return entries;
01254 }
01255
01256
01257 template<class Data, class GatherScatter, bool FORWARD>
01258 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, size_t bufferSize) const
01259 {
01260 DUNE_UNUSED_PARAMETER(bufferSize);
01261 typedef typename InterfaceMap::const_iterator
01262 const_iterator;
01263
01264 int rank;
01265 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
01266 const const_iterator end = interfaces.end();
01267 size_t index=0;
01268
01269 for(const_iterator interfacePair = interfaces.begin();
01270 interfacePair != end; ++interfacePair) {
01271 int size = forward ? interfacePair->second.first.size() :
01272 interfacePair->second.second.size();
01273
01274 for(int i=0; i < size; i++) {
01275 int local = forward ? interfacePair->second.first[i] :
01276 interfacePair->second.second[i];
01277 for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
01278
01279 #ifdef DUNE_ISTL_WITH_CHECKING
01280 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
01281 #endif
01282 buffer[index]=GatherScatter::gather(data, local, j);
01283 }
01284
01285 }
01286 }
01287
01288 }
01289
01290
01291 template<class Data, class GatherScatter, bool FORWARD>
01292 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, const Data& data, Type* buffer, size_t bufferSize) const
01293 {
01294 DUNE_UNUSED_PARAMETER(bufferSize);
01295 typedef typename InterfaceMap::const_iterator
01296 const_iterator;
01297 const const_iterator end = interfaces.end();
01298 size_t index = 0;
01299
01300 int rank;
01301 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
01302
01303 for(const_iterator interfacePair = interfaces.begin();
01304 interfacePair != end; ++interfacePair) {
01305 size_t size = FORWARD ? interfacePair->second.first.size() :
01306 interfacePair->second.second.size();
01307
01308 for(size_t i=0; i < size; i++) {
01309
01310 #ifdef DUNE_ISTL_WITH_CHECKING
01311 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
01312 #endif
01313
01314 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
01315 interfacePair->second.second[i]);
01316 }
01317 }
01318
01319 }
01320
01321
01322 template<class Data, class GatherScatter, bool FORWARD>
01323 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
01324 {
01325 typedef typename InterfaceMap::value_type::second_type::first_type Information;
01326 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
01327
01328 assert(infoPair!=interfaces.end());
01329
01330 const Information& info = FORWARD ? infoPair->second.second :
01331 infoPair->second.first;
01332
01333 for(size_t i=0, index=0; i < info.size(); i++) {
01334 for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
01335 GatherScatter::scatter(data, buffer[index++], info[i], j);
01336 }
01337 }
01338
01339
01340 template<class Data, class GatherScatter, bool FORWARD>
01341 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
01342 {
01343 typedef typename InterfaceMap::value_type::second_type::first_type Information;
01344 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
01345
01346 assert(infoPair!=interfaces.end());
01347
01348 const Information& info = FORWARD ? infoPair->second.second :
01349 infoPair->second.first;
01350
01351 for(size_t i=0; i < info.size(); i++) {
01352 GatherScatter::scatter(data, buffer[i], info[i]);
01353 }
01354 }
01355
01356
01357 template<class GatherScatter,class Data>
01358 void BufferedCommunicator::forward(Data& data)
01359 {
01360 this->template sendRecv<GatherScatter,true>(data, data);
01361 }
01362
01363
01364 template<class GatherScatter, class Data>
01365 void BufferedCommunicator::backward(Data& data)
01366 {
01367 this->template sendRecv<GatherScatter,false>(data, data);
01368 }
01369
01370
01371 template<class GatherScatter, class Data>
01372 void BufferedCommunicator::forward(const Data& source, Data& dest)
01373 {
01374 this->template sendRecv<GatherScatter,true>(source, dest);
01375 }
01376
01377
01378 template<class GatherScatter, class Data>
01379 void BufferedCommunicator::backward(Data& source, const Data& dest)
01380 {
01381 this->template sendRecv<GatherScatter,false>(dest, source);
01382 }
01383
01384
01385 template<class GatherScatter, bool FORWARD, class Data>
01386 void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
01387 {
01388 int rank, lrank;
01389
01390 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
01391 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
01392
01393 typedef typename CommPolicy<Data>::IndexedType Type;
01394 Type *sendBuffer, *recvBuffer;
01395 size_t sendBufferSize;
01396 #ifndef NDEBUG
01397 size_t recvBufferSize;
01398 #endif
01399
01400 if(FORWARD) {
01401 sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
01402 sendBufferSize = bufferSize_[0];
01403 recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
01404 #ifndef NDEBUG
01405 recvBufferSize = bufferSize_[1];
01406 #endif
01407 }else{
01408 sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
01409 sendBufferSize = bufferSize_[1];
01410 recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
01411 #ifndef NDEBUG
01412 recvBufferSize = bufferSize_[0];
01413 #endif
01414 }
01415 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
01416
01417 MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
01418
01419 MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
01420 MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
01421
01422 size_t numberOfRealRecvRequests = 0;
01423
01424
01425 typedef typename InformationMap::const_iterator const_iterator;
01426
01427 const const_iterator end = messageInformation_.end();
01428 size_t i=0;
01429 int* processMap = new int[messageInformation_.size()];
01430
01431 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
01432 processMap[i]=info->first;
01433 if(FORWARD) {
01434 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
01435 Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
01436 if(info->second.second.size_) {
01437 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
01438 MPI_BYTE, info->first, commTag_, communicator_,
01439 recvRequests+i);
01440 numberOfRealRecvRequests += 1;
01441 } else {
01442
01443 recvRequests[i]=MPI_REQUEST_NULL;
01444 }
01445 }else{
01446 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
01447 Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
01448 if(info->second.first.size_) {
01449 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
01450 MPI_BYTE, info->first, commTag_, communicator_,
01451 recvRequests+i);
01452 numberOfRealRecvRequests += 1;
01453 } else {
01454
01455 recvRequests[i]=MPI_REQUEST_NULL;
01456 }
01457 }
01458 }
01459
01460
01461 i=0;
01462 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
01463 if(FORWARD) {
01464 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
01465 Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
01466 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
01467 if(info->second.first.size_)
01468 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
01469 MPI_BYTE, info->first, commTag_, communicator_,
01470 sendRequests+i);
01471 else
01472
01473 sendRequests[i]=MPI_REQUEST_NULL;
01474 }else{
01475 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
01476 Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
01477 if(info->second.second.size_)
01478 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
01479 MPI_BYTE, info->first, commTag_, communicator_,
01480 sendRequests+i);
01481 else
01482
01483 sendRequests[i]=MPI_REQUEST_NULL;
01484 }
01485
01486
01487 i=0;
01488
01489 int finished = MPI_UNDEFINED;
01490 MPI_Status status;
01491
01492
01493 for(i=0; i< numberOfRealRecvRequests; i++) {
01494 status.MPI_ERROR=MPI_SUCCESS;
01495 MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
01496 assert(finished != MPI_UNDEFINED);
01497
01498 if(status.MPI_ERROR==MPI_SUCCESS) {
01499 int& proc = processMap[finished];
01500 typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
01501 assert(infoIter != messageInformation_.end());
01502
01503 MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
01504 assert(info.start_+info.size_ <= recvBufferSize);
01505
01506 MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
01507 }else{
01508 std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
01509
01510 }
01511 }
01512
01513 MPI_Status recvStatus;
01514
01515
01516 for(i=0; i< messageInformation_.size(); i++)
01517 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
01518 std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
01519
01520 }
01521
01522
01523
01524
01525
01526
01527
01528 delete[] processMap;
01529 delete[] sendRequests;
01530 delete[] recvRequests;
01531
01532 }
01533
01534 #endif // DOXYGEN
01535
01537 }
01538
01539 #endif
01540
01541 #endif