// Project headers used by this file, followed (on the same extracted line) by
// the start of the ThreadManagerAspectCollector constructor.
// The constructor stores the given parent manager pointer in parent_manager_.
// parent_manager: manager that the collector's add/remove methods forward to.
24 #include <aspect/blocked_timing.h> 25 #include <baseapp/thread_manager.h> 26 #include <core/exceptions/software.h> 27 #include <core/exceptions/system.h> 28 #include <core/threading/mutex_locker.h> 29 #include <core/threading/thread.h> 30 #include <core/threading/thread_finalizer.h> 31 #include <core/threading/thread_initializer.h> 32 #include <core/threading/wait_condition.h> 56 ThreadManager::ThreadManagerAspectCollector::ThreadManagerAspectCollector(
57 ThreadManager *parent_manager)
59 parent_manager_ = parent_manager;
63 ThreadManager::ThreadManagerAspectCollector::add(ThreadList &tl)
65 BlockedTimingAspect *timed_thread;
67 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
68 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL) {
69 throw IllegalArgumentException(
70 "ThreadProducerAspect may not add threads with BlockedTimingAspect");
74 parent_manager_->add_maybelocked(tl,
false);
78 ThreadManager::ThreadManagerAspectCollector::add(Thread *t)
80 BlockedTimingAspect *timed_thread;
82 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
83 throw IllegalArgumentException(
84 "ThreadProducerAspect may not add threads with BlockedTimingAspect");
87 parent_manager_->add_maybelocked(t,
false);
91 ThreadManager::ThreadManagerAspectCollector::remove(ThreadList &tl)
93 BlockedTimingAspect *timed_thread;
95 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
96 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL) {
97 throw IllegalArgumentException(
98 "ThreadProducerAspect may not remove threads with BlockedTimingAspect");
102 parent_manager_->remove_maybelocked(tl,
false);
106 ThreadManager::ThreadManagerAspectCollector::remove(Thread *t)
108 BlockedTimingAspect *timed_thread;
110 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
111 throw IllegalArgumentException(
112 "ThreadProducerAspect may not remove threads with BlockedTimingAspect");
115 parent_manager_->remove_maybelocked(t,
false);
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably the ThreadList overload of the collector's force_remove —
// TODO confirm). The visible statement throws AccessViolationException.
121 throw AccessViolationException(
"ThreadManagerAspect threads may not force removal of threads");
// Forced removal of a single thread through the aspect collector.
// The visible body consists solely of a throw of AccessViolationException;
// the parameter t is not used by it.
125 ThreadManager::ThreadManagerAspectCollector::force_remove(
fawkes::Thread *t)
127 throw AccessViolationException(
"ThreadManagerAspect threads may not force removal of threads");
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably a ThreadManager constructor — TODO confirm).
// Clears the interrupt flag and creates the aspect collector, handing it a
// pointer to this manager.
140 interrupt_timed_thread_wait_ =
false;
141 aspect_collector_ =
new ThreadManagerAspectCollector(
this);
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably a second ThreadManager constructor — TODO confirm).
// Performs the same two steps as the fragment before it: clears the interrupt
// flag and creates the aspect collector bound to this manager.
156 interrupt_timed_thread_wait_ =
false;
157 aspect_collector_ =
new ThreadManagerAspectCollector(
this);
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably the ThreadManager destructor — TODO confirm).
// Force-stops every thread list stored in threads_ using finalizer_, then
// deletes the wait condition and the aspect collector.
166 for (tit_ = threads_.begin(); tit_ != threads_.end(); ++tit_) {
168 tit_->second.force_stop(finalizer_);
178 delete waitcond_timedthreads_;
179 delete aspect_collector_;
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably set_inifin(initializer, finalizer) — TODO confirm).
// Stores the initializer and finalizer that the add/remove methods require
// to be non-null before they do any work.
190 initializer_ = initializer;
191 finalizer_ = finalizer;
// Remove a thread from the manager's internal bookkeeping.
// Visible behavior: if the thread has the BlockedTimingAspect and threads_
// holds a list for its hook, the thread is removed from that list, and the
// list's entry is erased from threads_ once the list is empty.
// t: thread to remove
// NOTE(review): the declaration of timed_thread, how `hook` is obtained, and
// the handling of threads without the aspect are not visible in this excerpt.
202 ThreadManager::internal_remove_thread(
Thread *t)
206 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
// Only touch threads_[hook] when an entry for the hook already exists.
209 if (threads_.find(hook) != threads_.end()) {
210 threads_[hook].remove_locked(t);
// Drop the per-hook list as soon as it holds no more threads.
211 if (threads_[hook].empty())
212 threads_.erase(hook);
// Add a thread to the manager's internal bookkeeping.
// Visible behavior: if the thread has the BlockedTimingAspect, it is appended
// to the list stored in threads_ for its hook. When no list exists for that
// hook yet, the new list is named and set to maintain a barrier first.
// t: thread to add
// NOTE(review): how `hook` is obtained and the handling of threads without
// the aspect are not visible in this excerpt.
227 ThreadManager::internal_add_thread(Thread *t)
229 BlockedTimingAspect *timed_thread;
230 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
// First thread for this hook: configure the list before using it.
233 if (threads_.find(hook) == threads_.end()) {
234 threads_[hook].set_name(
"ThreadManagerList Hook %i", hook);
235 threads_[hook].set_maintain_barrier(
true);
237 threads_[hook].push_back_locked(t);
// Add all threads of a list to the manager.
// Visible steps: require that both initializer_ and finalizer_ are set,
// initialize the list via tl.init(initializer_, finalizer_), then add every
// thread through internal_add_thread() while a MutexLocker on
// threads_.mutex() is in scope.
// tl:   list of threads to add
// lock: passed as the second argument to MutexLocker — presumably selects
//       whether the mutex is actually acquired (TODO confirm)
// Throws NullPointerException if initializer_ or finalizer_ is not set.
257 ThreadManager::add_maybelocked(ThreadList &tl,
bool lock)
259 if (!(initializer_ && finalizer_)) {
260 throw NullPointerException(
"ThreadManager: initializer/finalizer not set");
// NOTE(review): the condition guarding this throw is not visible in this
// excerpt; per its message it concerns lists that are already sealed.
264 throw Exception(
"Not accepting new threads from list that is not fresh, " 265 "list '%s' already sealed",
273 tl.init(initializer_, finalizer_);
274 }
// NOTE(review): the body of this handler is not visible in this excerpt.
catch (Exception &e) {
283 MutexLocker locker(threads_.mutex(), lock);
284 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
285 internal_add_thread(*i);
// Add a single thread to the manager.
// Visible steps: reject a NULL thread, require that both initializer_ and
// finalizer_ are set, run initializer_->init(thread), then add the thread
// through internal_add_thread() while a MutexLocker on threads_.mutex() is in
// scope. Every visible failure path calls thread->notify_of_failed_init().
// thread: thread to add, must not be NULL
// lock:   passed as the second argument to MutexLocker — presumably selects
//         whether the mutex is actually acquired (TODO confirm)
// Throws NullPointerException if thread is NULL or if initializer_ or
// finalizer_ is not set.
302 ThreadManager::add_maybelocked(Thread *thread,
bool lock)
304 if (thread == NULL) {
// NOTE(review): "FawkesThreadMananger" in the message below looks like a typo
// for "FawkesThreadManager"; fixing it would be a code change.
305 throw NullPointerException(
"FawkesThreadMananger: cannot add NULL as thread");
308 if (!(initializer_ && finalizer_)) {
309 throw NullPointerException(
"ThreadManager: initializer/finalizer not set");
313 initializer_->
init(thread);
314 }
// Initializer refused the thread: tell the thread and add context to the error.
catch (CannotInitializeThreadException &e) {
315 thread->notify_of_failed_init();
316 e.append(
"Adding thread in ThreadManager failed");
325 }
// NOTE(review): the try block these handlers belong to is not visible in this
// excerpt, and neither is how each handler ends (e.g. whether it rethrows).
catch (CannotInitializeThreadException &e) {
326 thread->notify_of_failed_init();
329 }
// Other Fawkes exceptions are copied into a CannotInitializeThreadException.
catch (Exception &e) {
330 thread->notify_of_failed_init();
331 CannotInitializeThreadException cite(e);
332 cite.append(
"Could not initialize thread '%s' (ThreadManager)", thread->name());
335 }
// Standard exceptions: their what() text is recorded in a new
// CannotInitializeThreadException.
catch (std::exception &e) {
336 thread->notify_of_failed_init();
337 CannotInitializeThreadException cite;
338 cite.append(
"Caught std::exception: %s", e.what());
339 cite.append(
"Could not initialize thread '%s' (ThreadManager)", thread->name());
// NOTE(review): presumably inside a catch-all handler, judging by the
// "Unknown exception caught" message — the handler line itself is not visible.
343 thread->notify_of_failed_init();
344 CannotInitializeThreadException cite(
"Could not initialize thread '%s' (ThreadManager)",
346 cite.append(
"Unknown exception caught");
352 MutexLocker locker(threads_.mutex(), lock);
353 internal_add_thread(thread);
// Remove all threads of a list from the manager.
// Visible steps: require that both initializer_ and finalizer_ are set, create
// a MutexLocker on threads_.mutex(), ask the list to prepare finalization
// (cancelling it and throwing if that is refused), finalize the list with
// finalizer_, then remove every thread through internal_remove_thread().
// tl:   list of threads to remove
// lock: passed as the second argument to MutexLocker — presumably selects
//       whether the mutex is actually acquired (TODO confirm)
// Throws NullPointerException if initializer_ or finalizer_ is not set,
// ThreadListNotSealedException for unsealed lists (per the message; the guard
// is not visible), and CannotFinalizeThreadException if preparation fails.
371 ThreadManager::remove_maybelocked(ThreadList &tl,
bool lock)
373 if (!(initializer_ && finalizer_)) {
374 throw NullPointerException(
"ThreadManager: initializer/finalizer not set");
// NOTE(review): the condition guarding this throw is not visible in this excerpt.
378 throw ThreadListNotSealedException(
"(ThreadManager) Cannot remove unsealed thread " 379 "list. Not accepting unsealed list '%s' for removal",
384 MutexLocker locker(threads_.mutex(), lock);
// A refused prepare_finalize is rolled back with cancel_finalize before throwing.
387 if (!tl.prepare_finalize(finalizer_)) {
388 tl.cancel_finalize();
390 throw CannotFinalizeThreadException(
"One or more threads in list '%s' cannot be " 394 }
// NOTE(review): the body of this handler is not visible in this excerpt.
catch (CannotFinalizeThreadException &e) {
397 }
// Other Fawkes exceptions get context appended and are rethrown as
// CannotFinalizeThreadException.
catch (Exception &e) {
399 e.append(
"One or more threads in list '%s' cannot be finalized", tl.name());
400 throw CannotFinalizeThreadException(e);
405 tl.finalize(finalizer_);
406 }
// NOTE(review): the body of this handler is not visible in this excerpt.
catch (Exception &e) {
411 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
412 internal_remove_thread(*i);
// Remove a single thread from the manager.
// Visible steps: require that both initializer_ and finalizer_ are set, create
// a MutexLocker on threads_.mutex(), ask the thread to prepare finalization
// (cancelling it and throwing if that is refused), then remove the thread
// through internal_remove_thread().
// thread: thread to remove
// lock:   passed as the second argument to MutexLocker — presumably selects
//         whether the mutex is actually acquired (TODO confirm)
// Throws NullPointerException if initializer_ or finalizer_ is not set and
// CannotFinalizeThreadException if the thread refuses finalization.
// NOTE(review): the steps between prepare_finalize and internal_remove_thread
// are not visible in this excerpt.
431 ThreadManager::remove_maybelocked(Thread *thread,
bool lock)
436 if (!(initializer_ && finalizer_)) {
437 throw NullPointerException(
"ThreadManager: initializer/finalizer not set");
440 MutexLocker locker(threads_.mutex(), lock);
442 if (!thread->prepare_finalize()) {
443 thread->cancel_finalize();
// NOTE(review): the message below lacks a space between '%s' and "cannot";
// fixing it would be a code change.
444 throw CannotFinalizeThreadException(
"Thread '%s'cannot be finalized", thread->name());
446 }
// Add context to the error and cancel the pending finalization.
catch (CannotFinalizeThreadException &e) {
447 e.append(
"ThreadManager cannot stop thread '%s'", thread->name());
448 thread->cancel_finalize();
457 internal_remove_thread(thread);
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably ThreadManager::force_remove(ThreadList &tl) — TODO confirm).
// Visible steps: call stopby() on the threads_ mutex, prepare an Exception
// describing a failed forced removal, remove every thread of the list through
// internal_remove_thread(), and finally test whether an exception was caught.
// NOTE(review): the operation that can fail, the handler that sets
// caught_exception, and the body of the final `if` are not visible here.
484 threads_.mutex()->stopby();
485 bool caught_exception =
false;
486 Exception exc(
"Forced removal of thread list %s failed", tl.
name());
490 caught_exception =
true;
494 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
495 internal_remove_thread(*i);
500 if (caught_exception) {
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably the single-thread force_remove — TODO confirm).
// Removes the thread from the manager's internal bookkeeping.
537 internal_remove_thread(thread);
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably ThreadManager::wakeup_and_wait(hook, timeout_usec) — TODO confirm).
// Splits the microsecond timeout into whole seconds plus a remainder, then
// wakes the list registered for `hook` (if any) and waits for it.
545 unsigned int timeout_sec = 0;
546 if (timeout_usec >= 1000000) {
547 timeout_sec = timeout_usec / 1000000;
548 timeout_usec -= timeout_sec * 1000000;
552 if (threads_.find(hook) != threads_.end()) {
// The remaining microseconds are multiplied by 1000 — presumably the callee
// expects nanoseconds as its second argument (TODO confirm).
553 threads_[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably ThreadManager::wakeup(hook, barrier) — TODO confirm).
// If a list exists for `hook`, wakes it with or without a barrier, and erases
// the list's entry from threads_ when the list holds no threads.
// NOTE(review): the condition choosing between the two wakeup calls is not
// visible in this excerpt.
562 if (threads_.find(hook) != threads_.end()) {
564 threads_[hook].wakeup(barrier);
566 threads_[hook].wakeup();
568 if (threads_[hook].size() == 0) {
569 threads_.erase(hook);
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably ThreadManager::try_recover(recovered_threads) — TODO confirm).
// Calls try_recover() on every thread list stored in threads_, passing
// recovered_threads along to each.
578 for (tit_ = threads_.begin(); tit_ != threads_.end(); ++tit_) {
579 tit_->second.try_recover(recovered_threads);
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably ThreadManager::timed_threads_exist() — TODO confirm).
// Returns true when threads_ holds at least one entry.
587 return (threads_.size() > 0);
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably ThreadManager::wait_for_timed_threads() — TODO confirm).
// Clears the interrupt flag, blocks on the wait condition, and after waking
// checks whether the flag was set in the meantime; if so it is cleared again.
// NOTE(review): what follows the flag reset inside the `if` is not visible
// in this excerpt.
593 interrupt_timed_thread_wait_ =
false;
594 waitcond_timedthreads_->
wait();
595 if (interrupt_timed_thread_wait_) {
596 interrupt_timed_thread_wait_ =
false;
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably ThreadManager::interrupt_timed_thread_wait() — TODO confirm).
// Sets the interrupt flag that is checked after waitcond_timedthreads_->wait()
// returns. Any wake-up of the wait condition is not visible in this excerpt.
604 interrupt_timed_thread_wait_ =
true;
// Fragment: the enclosing signature is not visible in this excerpt
// (presumably ThreadManager::aspect_collector() const — TODO confirm).
// Returns the stored aspect collector pointer.
614 return aspect_collector_;
bool sealed()
Check if list is sealed.
void set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Set initializer/finalizer.
Wait until a given condition holds.
const char * name()
Name of the thread list.
virtual void init(Thread *thread)=0
This method is called by the ThreadManager for each newly added Thread.
virtual void lock() const
Lock list.
Fawkes library namespace.
void wake_all()
Wake up all waiting threads.
void push_back_locked(Thread *thread)
Add thread to the end with lock protection.
virtual ~ThreadManager()
Destructor.
virtual void finalize(Thread *thread)=0
Finalize a thread.
void force_stop(ThreadFinalizer *finalizer)
Force stop of all threads.
virtual bool timed_threads_exist()
Check if any timed threads exist.
Thread class encapsulation of pthreads.
virtual void force_remove(ThreadList &tl)
Force removal of the given threads.
ThreadManager()
Constructor.
Thread aspect to use blocked timing.
virtual void wait_for_timed_threads()
Wait for timed threads.
WakeupHook
Type to define at which hook the thread is woken up.
Base class for exceptions in Fawkes.
void remove_locked(Thread *thread)
Remove with lock protection.
Thread initializer interface.
virtual void finalize()
Finalize the thread.
bool prepare_finalize()
Prepare finalization.
virtual void unlock() const
Unlock list.
The current system call has been interrupted (for instance by a signal).
void wait()
Wait for the condition forever.
virtual void wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier=0)
Wakeup thread for given hook.
virtual void wakeup_and_wait(BlockedTimingAspect::WakeupHook hook, unsigned int timeout_usec=0)
Wakeup thread for given hook and wait for completion.
void cancel()
Cancel a thread.
Thread list not sealed exception.
virtual void interrupt_timed_thread_wait()
Interrupt any currently running wait_for_timed_threads() and cause it to throw an InterruptedException.
void join()
Join the thread.
virtual void try_recover(std::list< std::string > &recovered_threads)
Try to recover threads.
WakeupHook blockedTimingAspectHook() const
Get the wakeup hook.
ThreadCollector * aspect_collector() const
Get a thread collector to be used for an aspect initializer.
A barrier is a synchronization tool which blocks until a given number of threads have reached the barrier.
Thread finalizer interface.