00001 #ifndef OPM_THREADHANDLE_HPP
00002 #define OPM_THREADHANDLE_HPP
00003
#include <cassert>
#include <dune/common/exceptions.hh>

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
00010
00011 namespace Opm
00012 {
00013
00014 class ThreadHandle
00015 {
00016 public:
00017 class ObjectInterface
00018 {
00019 protected:
00020 ObjectInterface() {}
00021 public:
00022 virtual ~ObjectInterface() {}
00023 virtual void run() = 0;
00024 virtual bool isEndMarker () const { return false; }
00025 };
00026
00027 template <class Object>
00028 class ObjectWrapper : public ObjectInterface
00029 {
00030 Object obj_;
00031 public:
00032 ObjectWrapper( Object&& obj ) : obj_( std::move( obj ) ) {}
00033 void run() { obj_.run(); }
00034 };
00035
00036 protected:
00037 class EndObject : public ObjectInterface
00038 {
00039 public:
00040 void run () { }
00041 bool isEndMarker () const { return true; }
00042 };
00043
00045
00047 class ThreadHandleQueue
00048 {
00049 protected:
00050 std::queue< std::unique_ptr< ObjectInterface > > objQueue_;
00051 std::mutex mutex_;
00052
00053
00054 ThreadHandleQueue( const ThreadHandleQueue& ) = delete;
00055
00056
00057 void wait() const
00058 {
00059 std::this_thread::sleep_for( std::chrono::milliseconds(10) );
00060 }
00061
00062 public:
00064 ThreadHandleQueue()
00065 : objQueue_(), mutex_()
00066 {
00067 }
00068
00069 ~ThreadHandleQueue()
00070 {
00071
00072 while( ! objQueue_.empty() )
00073 {
00074 wait();
00075 }
00076 }
00077
00079 void push_back( std::unique_ptr< ObjectInterface >&& obj )
00080 {
00081
00082 mutex_.lock();
00083 objQueue_.emplace( std::move(obj) );
00084 mutex_.unlock();
00085 }
00086
00088 void run()
00089 {
00090
00091 while( objQueue_.empty() )
00092 {
00093
00094 wait();
00095 }
00096
00097 {
00098
00099 mutex_.lock();
00100
00101
00102 std::unique_ptr< ObjectInterface > obj( objQueue_.front().release() );
00103
00104 objQueue_.pop();
00105
00106
00107 mutex_.unlock();
00108
00109
00110 if( obj->isEndMarker() ){
00111 if( ! objQueue_.empty() ) {
00112 OPM_THROW(std::logic_error,"ThreadHandleQueue: not all queued objects were executed");
00113 }
00114 return;
00115 }
00116
00117
00118 obj->run();
00119 }
00120
00121
00122 run();
00123 }
00124 };
00125
00127
00129
00130
00131 static void startThread( ThreadHandleQueue* obj )
00132 {
00133 obj->run();
00134 }
00135
00136 ThreadHandleQueue threadObjectQueue_;
00137 std::unique_ptr< std::thread > thread_;
00138
00139 private:
00140
00141 ThreadHandle( const ThreadHandle& ) = delete;
00142
00143 public:
00146 ThreadHandle( const bool createThread )
00147 : threadObjectQueue_(),
00148 thread_()
00149 {
00150 if( createThread )
00151 {
00152 thread_.reset( new std::thread( startThread, &threadObjectQueue_ ) );
00153
00154 thread_->detach();
00155 }
00156 }
00157
00159 template <class Object>
00160 void dispatch( Object&& obj )
00161 {
00162 if( thread_ )
00163 {
00164 typedef ObjectWrapper< Object > ObjectPointer;
00165 ObjectInterface* objPtr = new ObjectPointer( std::move(obj) );
00166
00167
00168 threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) );
00169 }
00170 else
00171 {
00172 OPM_THROW(std::logic_error,"ThreadHandle::dispatch called without thread being initialized (i.e. on non-ioRank)");
00173 }
00174 }
00175
00177 ~ThreadHandle()
00178 {
00179 if( thread_ )
00180 {
00181
00182 threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ;
00183 }
00184 }
00185 };
00186
00187 }
00188 #endif