Fawkes API Fawkes Development Version
interruptible_barrier.cpp
1
2/***************************************************************************
3 * interruptible_barrier.cpp - Interruptible Barrier
4 *
5 * Created: Sat Jan 31 12:30:32 2009
6 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7 *
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version. A runtime exception applies to
14 * this software (see LICENSE.GPL_WRE file mentioned below for details).
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU Library General Public License for more details.
20 *
21 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22 */
23
24#include <core/exceptions/system.h>
25#include <core/macros.h>
26#include <core/threading/interruptible_barrier.h>
27#include <core/threading/mutex.h>
28#include <core/threading/thread_list.h>
29#include <core/threading/wait_condition.h>
30
31namespace fawkes {
32
33/// @cond INTERNALS
34class InterruptibleBarrierData
35{
36public:
37 unsigned int threads_left;
38 Mutex * mutex;
39 WaitCondition *waitcond;
40 bool own_mutex;
41
42 InterruptibleBarrierData(Mutex *mutex)
43 {
44 if (mutex) {
45 this->mutex = mutex;
46 own_mutex = false;
47 } else {
48 this->mutex = new Mutex();
49 own_mutex = true;
50 }
51 waitcond = new WaitCondition(this->mutex);
52 }
53
54 ~InterruptibleBarrierData()
55 {
56 if (own_mutex)
57 delete mutex;
58 delete waitcond;
59 }
60};
61/// @endcond
62
63/** @class InterruptibleBarrier <core/threading/interruptible_barrier.h>
64 * A barrier is a synchronization tool which blocks until a given number of
65 * threads have reached the barrier. This particular implementation allows for
66 * giving a timeout after which the waiting is aborted.
67 *
68 * For general information when a barrier is useful see the Barrier class.
69 *
70 * Additionally to the general barrier features the InterruptibleBarrier::wait()
71 * can be given a timeout after which the waiting is aborted.
72 * Since the POSIX standard does not provide a timed wait for barriers this
73 * implementation uses a Mutex and WaitCondition internally to achieve the
74 * desired result.
75 *
76 * @see Barrier
77 * @ingroup Threading
78 * @author Tim Niemueller
79 */
80
81/** Constructor.
82 * @param count the number of threads to wait for
83 */
85{
86 _count = count;
87 if (_count == 0) {
88 throw Exception("Barrier count must be at least 1");
89 }
90 data_ = new InterruptibleBarrierData(NULL);
91 data_->threads_left = 0;
92 passed_threads_ = RefPtr<ThreadList>(new ThreadList());
93
94 interrupted_ = false;
95 timeout_ = false;
96 num_threads_in_wait_function_ = 0;
97}
98
99/** Constructor with custom mutex.
100 * Use this constructor only if you really know what you are doing. This constructor
101 * allows to pass a mutex that is used internally for the barrier. Note that in
102 * this case it is your duty to lock the mutex before the wait() and unlock
103 * afterwards! It combines features of a barrier and a wait condition.
104 * @param mutex Mutex to use
105 * @param count the number of threads to wait for
106 */
107InterruptibleBarrier::InterruptibleBarrier(Mutex *mutex, unsigned int count) : Barrier(count)
108{
109 _count = count;
110 if (_count == 0) {
111 throw Exception("Barrier count must be at least 1");
112 }
113 data_ = new InterruptibleBarrierData(mutex);
114 data_->threads_left = 0;
115 passed_threads_ = RefPtr<ThreadList>(new ThreadList());
116
117 interrupted_ = false;
118 timeout_ = false;
119 num_threads_in_wait_function_ = 0;
120}
121
122/** Invalid constructor.
123 * This will throw an exception if called as it is illegal to copy
124 * a barrier.
125 * @param barrier to copy
126 */
128{
129 throw Exception("Barriers cannot be copied");
130}
131
132/** Invalid constructor.
133 * Always throws an Exception, because it is illegal to copy
134 * a barrier; the argument is never looked at.
135 * @param b barrier to copy (ignored)
136 */
137InterruptibleBarrier::InterruptibleBarrier(const InterruptibleBarrier *b) : Barrier()
138{
139 throw Exception("Barriers cannot be copied");
140}
141
142/** Invalid assignment operator.
143 * Always throws an Exception, because it is illegal to assign
144 * a barrier; the function never returns normally.
145 * @param b barrier to assign from (ignored)
146 */
147InterruptibleBarrier &
148InterruptibleBarrier::operator=(const InterruptibleBarrier &b)
149{
150 throw Exception("Barriers cannot be assigned");
151}
152
153/** Invalid assignment operator.
154 * Always throws an Exception, because it is illegal to assign
155 * a barrier; the function never returns normally.
156 * @param b pointer to the barrier to assign from (ignored)
157 */
158InterruptibleBarrier &
159InterruptibleBarrier::operator=(const InterruptibleBarrier *b)
160{
161 throw Exception("Barriers cannot be assigned");
162}
163
164/** Destructor */
166{
167 delete data_;
168}
169
170/** Get a list of threads that passed the barrier.
171 * The list contains the threads that passed the barrier. With some book keeping
172 * outside of the barrier you can determine which threads you expected at the
173 * barrier but did not pass it.
174 * @return refptr to list of threads that passed the barrier.
175 */
178{
179 return passed_threads_;
180}
181
182/** Interrupt the barrier.
183 * This will cause all threads currently waiting on the barrier to
184 * throw an exception and no further thread will wait.
185 * You have to call reset() before you use this barrier
186 * the next time.
187 */
188void
190{
191 if (likely(data_->own_mutex))
192 data_->mutex->lock();
193 interrupted_ = true;
194 data_->waitcond->wake_all();
195 if (likely(data_->own_mutex))
196 data_->mutex->unlock();
197}
198
199/** Clears the barrier.
200 * Call this method when you want to use the barrier the next time after
201 * an interrupt or timeout occured. Make sure all threads that should have
202 * passed the barrier the last time did pass it.
203 */
204void
206{
207 if (likely(data_->own_mutex))
208 data_->mutex->lock();
209 interrupted_ = false;
210 timeout_ = false;
211 data_->threads_left = _count;
212 passed_threads_.clear();
213 if (likely(data_->own_mutex))
214 data_->mutex->unlock();
215}
216
217/** Wait for other threads.
218 * This method will block until as many threads have called wait as you have
219 * given count to the constructor. Note that if the barrier is interrupted or
220 * times out you need to call reset() to get the barrier into a re-usable state.
221 * It is your duty to make sure that all threads using the barrier are in a
222 * cohesive state.
223 * @param timeout_sec relative timeout in seconds, added to timeout_nanosec
224 * @param timeout_nanosec timeout in nanoseconds
225 * @return true, if the barrier was properly reached, false if the barrier timeout
226 * was reached and the wait did not finish properly.
227 * @exception InterruptedException thrown if the barrier was forcefully interrupted
228 * by calling interrupt().
229 */
230bool
231InterruptibleBarrier::wait(unsigned int timeout_sec, unsigned int timeout_nanosec)
232{
233 if (likely(data_->own_mutex))
234 data_->mutex->lock();
235 num_threads_in_wait_function_++;
236
237 if (data_->threads_left == 0) {
238 // first to come
239 timeout_ = interrupted_ = wait_at_barrier_ = false;
240 data_->threads_left = _count;
241 passed_threads_->clear();
242 } else {
243 if (interrupted_ || timeout_) {
244 // interrupted or timed out threads need to be reset if they should be reused
245 num_threads_in_wait_function_--;
246 if (likely(data_->own_mutex))
247 data_->mutex->unlock();
248 return true;
249 }
250 }
251
252 --data_->threads_left;
253 try {
254 passed_threads_->push_back_locked(Thread::current_thread());
255 } catch (Exception &e) {
256 // Cannot do anything more useful :-/
257 // to stay fully compatible with Barrier we do *not* re-throw
258 e.print_trace();
259 }
260
261 bool local_timeout = false;
262
263 //Am I the last thread the interruptable barrier is waiting for? Then I can wake the others up.
264 bool waker = (data_->threads_left == 0);
265
266 while (data_->threads_left && !interrupted_ && !timeout_ && !local_timeout) {
267 //Here, the threads are waiting for the barrier
268 //pthread_cond_timedwait releases data_->mutex if it is not external
269 local_timeout = !data_->waitcond->reltimed_wait(timeout_sec, timeout_nanosec);
270 //before continuing, pthread_cond_timedwait locks data_->mutex again if it is not external
271 }
272
273 if (local_timeout) {
274 //set timeout flag of the interruptable barrier so the other threads can continue
275 timeout_ = true;
276 }
277
278 if (interrupted_) {
279 if (likely(data_->own_mutex))
280 data_->mutex->unlock();
281 throw InterruptedException("InterruptibleBarrier forcefully interrupted, only "
282 "%u of %u threads reached the barrier",
283 _count - data_->threads_left,
284 _count);
285 }
286
287 if (waker) {
288 //all threads of this barrier have to synchronize at the standard Barrier
289 wait_at_barrier_ = true;
290 }
291
292 if (waker || local_timeout) {
293 //the other threads can stop waiting in th while-loop
294 data_->waitcond->wake_all();
295 }
296
297 if (likely(data_->own_mutex))
298 data_->mutex->unlock();
299
300 if (wait_at_barrier_) {
301 //hard synchronization
303 }
304
305 if (likely(data_->own_mutex))
306 data_->mutex->lock();
307 //increment is not threadsafe
308 num_threads_in_wait_function_--;
309 if (likely(data_->own_mutex))
310 data_->mutex->unlock();
311
312 return !timeout_;
313}
314
315/** Checks if there are no more threads in the wait() function.
316 * This method is used to prevent the destruction of the barrier
317 * while there are threads in wait().
318 * @return true, if no thread currently is in wait()
319 */
320bool
322{
323 if (likely(data_->own_mutex))
324 data_->mutex->lock();
325 bool res = num_threads_in_wait_function_ == 0;
326 if (likely(data_->own_mutex))
327 data_->mutex->unlock();
328
329 return res;
330}
331
332} // end namespace fawkes
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32
unsigned int _count
Number of threads that are expected to wait for the barrier.
Definition: barrier.h:47
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:153
unsigned int count()
Get number of threads this barrier will wait for.
Definition: barrier.cpp:176
Base class for exceptions in Fawkes.
Definition: exception.h:36
void print_trace() noexcept
Prints trace to stderr.
Definition: exception.cpp:601
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
void interrupt() noexcept
Interrupt the barrier.
virtual void wait()
Wait for other threads.
InterruptibleBarrier(unsigned int count)
Constructor.
void reset() noexcept
Clears the barrier.
virtual ~InterruptibleBarrier()
Destructor.
bool no_threads_in_wait()
Checks if there are no more threads in the wait() function.
RefPtr< ThreadList > passed_threads()
Get a list of threads that passed the barrier.
Mutex mutual exclusion lock.
Definition: mutex.h:33
RefPtr<> is a reference-counting shared smartpointer.
Definition: refptr.h:50
List of threads.
Definition: thread_list.h:56
static Thread * current_thread()
Get the Thread instance of the currently running thread.
Definition: thread.cpp:1366
Fawkes library namespace.