ThreadHandle.hpp
1 #ifndef OPM_THREADHANDLE_HPP
2 #define OPM_THREADHANDLE_HPP
3 
4 #include <cassert>
5 #include <dune/common/exceptions.hh>
6 
7 #include <thread>
8 #include <mutex>
9 #include <queue>
10 
11 namespace Opm
12 {
13 
15  {
16  public:
18  {
19  protected:
20  ObjectInterface() {}
21  public:
22  virtual ~ObjectInterface() {}
23  virtual void run() = 0;
24  virtual bool isEndMarker () const { return false; }
25  };
26 
27  template <class Object>
29  {
30  Object obj_;
31  public:
32  ObjectWrapper( Object&& obj ) : obj_( std::move( obj ) ) {}
33  void run() { obj_.run(); }
34  };
35 
36  protected:
37  class EndObject : public ObjectInterface
38  {
39  public:
40  void run () { }
41  bool isEndMarker () const { return true; }
42  };
43 
45  // class ThreadHandleQueue
48  {
49  protected:
50  std::queue< std::unique_ptr< ObjectInterface > > objQueue_;
51  std::mutex mutex_;
52 
53  // no copying
54  ThreadHandleQueue( const ThreadHandleQueue& ) = delete;
55 
56  // wait duration of 10 milli seconds
57  void wait() const
58  {
59  std::this_thread::sleep_for( std::chrono::milliseconds(10) );
60  }
61 
62  public:
65  : objQueue_(), mutex_()
66  {
67  }
68 
70  {
71  // wait until all objects have been written.
72  while( ! objQueue_.empty() )
73  {
74  wait();
75  }
76  }
77 
79  void push_back( std::unique_ptr< ObjectInterface >&& obj )
80  {
81  // lock mutex to make sure objPtr is not used
82  mutex_.lock();
83  objQueue_.emplace( std::move(obj) );
84  mutex_.unlock();
85  }
86 
88  void run()
89  {
90  // wait until objects have been pushed to the queue
91  while( objQueue_.empty() )
92  {
93  // sleep one second
94  wait();
95  }
96 
97  {
98  // lock mutex for access to objQueue_
99  mutex_.lock();
100 
101  // get next object from queue
102  std::unique_ptr< ObjectInterface > obj( objQueue_.front().release() );
103  // remove object from queue
104  objQueue_.pop();
105 
106  // unlock mutex for access to objQueue_
107  mutex_.unlock();
108 
109  // if object is end marker terminate thread
110  if( obj->isEndMarker() ){
111  if( ! objQueue_.empty() ) {
112  OPM_THROW(std::logic_error,"ThreadHandleQueue: not all queued objects were executed");
113  }
114  return;
115  }
116 
117  // execute object action
118  obj->run();
119  }
120 
121  // keep thread running
122  run();
123  }
124  }; // end ThreadHandleQueue
125 
127  // end ThreadHandleQueue
129 
130  // start the thread by calling method run
131  static void startThread( ThreadHandleQueue* obj )
132  {
133  obj->run();
134  }
135 
136  ThreadHandleQueue threadObjectQueue_;
137  std::unique_ptr< std::thread > thread_;
138 
139  private:
140  // prohibit copying
141  ThreadHandle( const ThreadHandle& ) = delete;
142 
143  public:
146  ThreadHandle( const bool createThread )
147  : threadObjectQueue_(),
148  thread_()
149  {
150  if( createThread )
151  {
152  thread_.reset( new std::thread( startThread, &threadObjectQueue_ ) );
153  // detach thread into nirvana
154  thread_->detach();
155  }
156  } // end constructor
157 
159  template <class Object>
160  void dispatch( Object&& obj )
161  {
162  if( thread_ )
163  {
164  typedef ObjectWrapper< Object > ObjectPointer;
165  ObjectInterface* objPtr = new ObjectPointer( std::move(obj) );
166 
167  // add object to queue of objects
168  threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) );
169  }
170  else
171  {
172  OPM_THROW(std::logic_error,"ThreadHandle::dispatch called without thread being initialized (i.e. on non-ioRank)");
173  }
174  }
175 
178  {
179  if( thread_ )
180  {
181  // dispatch end object which will terminate the thread
182  threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ;
183  }
184  }
185  };
186 
187 } // end namespace Opm
188 #endif
Definition: ThreadHandle.hpp:37
~ThreadHandle()
destructor terminating the thread
Definition: ThreadHandle.hpp:177
void run()
do the work until the queue received an end object
Definition: ThreadHandle.hpp:88
Definition: ThreadHandle.hpp:14
ThreadHandle(const bool createThread)
constructor creating ThreadHandle
Definition: ThreadHandle.hpp:146
This file contains a set of helper functions used by VFPProd / VFPInj.
Definition: AdditionalObjectDeleter.hpp:22
void push_back(std::unique_ptr< ObjectInterface > &&obj)
insert object into threads queue
Definition: ThreadHandle.hpp:79
Definition: ThreadHandle.hpp:28
Definition: ThreadHandle.hpp:47
Definition: ThreadHandle.hpp:17
void dispatch(Object &&obj)
dispatch object to queue of separate thread
Definition: ThreadHandle.hpp:160
ThreadHandleQueue()
constructor creating object that is executed by thread
Definition: ThreadHandle.hpp:64