Fawkes API Fawkes Development Version
base_thread.cpp
1
2/***************************************************************************
3 * base_thread.cpp - FireVision Base Thread
4 *
5 * Created: Tue May 29 16:41:50 2007
6 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7 *
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library General Public License for more details.
19 *
20 * Read the full text in the LICENSE.GPL file in the doc directory.
21 */
22
23#include "base_thread.h"
24
25#include "acquisition_thread.h"
26#include "aqt_vision_threads.h"
27
28#include <aspect/vision.h>
29#include <core/exceptions/software.h>
30#include <core/threading/barrier.h>
31#include <core/threading/mutex.h>
32#include <core/threading/mutex_locker.h>
33#include <core/threading/thread.h>
34#include <fvcams/cam_exceptions.h>
35#include <fvcams/control/factory.h>
36#include <fvcams/factory.h>
37#include <fvutils/ipc/shm_image.h>
38#include <fvutils/ipc/shm_lut.h>
39#include <fvutils/system/camargp.h>
40#include <logging/logger.h>
41
42#include <algorithm>
43#include <unistd.h>
44
45using namespace fawkes;
46using namespace firevision;
47
48/** @class FvBaseThread "base_thread.h"
49 * FireVision base thread.
50 * This implements the functionality of the FvBasePlugin.
51 * @author Tim Niemueller
52 */
53
54/** Constructor. */
56: Thread("FvBaseThread", Thread::OPMODE_WAITFORWAKEUP),
57 BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_ACQUIRE),
59{
60 // default to 30 seconds
61 aqt_timeout_ = 30;
62 aqt_barrier_ = new Barrier(1);
63}
64
65/** Destructor. */
67{
68 delete aqt_barrier_;
69}
70
71void
73{
74 // wipe all previously existing FireVision shared memory segments
75 // that are orphaned
76 SharedMemoryImageBuffer::cleanup(/* use lister */ false);
77 SharedMemoryLookupTable::cleanup(/* use lister */ false);
78}
79
80void
82{
83 aqts_.lock();
84 for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
85 thread_collector->remove(ait_->second);
86 delete ait_->second;
87 }
88 aqts_.clear();
89 aqts_.unlock();
90 owned_controls_.lock();
92 for (i = owned_controls_.begin(); i != owned_controls_.end(); ++i) {
93 delete *i;
94 }
95 owned_controls_.clear();
96 owned_controls_.unlock();
97}
98
99/** Thread loop. */
100void
102{
103 aqts_.lock();
104
105 try {
106 for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
107 ait_->second->set_vt_prepfin_hold(true);
108 }
109 } catch (Exception &e) {
110 logger->log_warn(name(), "Cannot get prepfin hold status, skipping this loop");
111 for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
112 ait_->second->set_vt_prepfin_hold(false);
113 }
114 aqts_.unlock();
115 return;
116 }
117
118 // Wakeup all cyclic acquisition threads and wait for them
119 for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
120 if (ait_->second->aqtmode() == FvAcquisitionThread::AqtCyclic) {
121 //logger->log_debug(name(), "Waking Thread %s", ait_->second->name());
122 ait_->second->wakeup(aqt_barrier_);
123 }
124 }
125
126 aqt_barrier_->wait();
127
128 // Check for aqt timeouts
129 for (ait_ = aqts_.begin(); ait_ != aqts_.end();) {
130 if (ait_->second->vision_threads->empty()
131 && (ait_->second->vision_threads->empty_time() > aqt_timeout_)) {
132 logger->log_info(name(), "Acquisition thread %s timed out, destroying", ait_->second->name());
133
134 thread_collector->remove(ait_->second);
135 delete ait_->second;
136 aqts_.erase(ait_++);
137 } else {
138 ++ait_;
139 }
140 }
141
142 started_threads_.lock();
144 while (stit != started_threads_.end()) {
146 "Thread %s has been started, %zu",
147 stit->second->name(),
148 started_threads_.size());
149
150 // if the thread is registered in that aqt mark it running
151 stit->second->vision_threads->set_thread_running(stit->first);
152
153 if (stit->second->vision_threads->has_cyclic_thread()) {
154 // Make thread actually capture data
155 stit->second->set_enabled(true);
156
157 if (stit->second->aqtmode() != FvAcquisitionThread::AqtCyclic) {
159 "Switching acquisition thread %s to cyclic mode",
160 stit->second->name());
161
162 stit->second->prepare_finalize();
163 stit->second->cancel();
164 stit->second->join();
165 stit->second->set_aqtmode(FvAcquisitionThread::AqtCyclic);
166 stit->second->start();
167 stit->second->cancel_finalize();
168 }
169 } else if (stit->second->vision_threads->has_cont_thread()) {
170 // Make thread actually capture data
171 stit->second->set_enabled(true);
172
173 if (stit->second->aqtmode() != FvAcquisitionThread::AqtContinuous) {
175 "Switching acquisition thread %s to continuous mode",
176 stit->second->name());
177 stit->second->prepare_finalize();
178 stit->second->cancel();
179 stit->second->join();
180 stit->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
181 stit->second->start();
182 stit->second->cancel_finalize();
183 }
184 } else {
186 "Acquisition thread %s has no threads while we expected some",
187 stit->second->name());
188 // Make thread stop capturing data
189 stit->second->set_enabled(false);
190 }
191
193 ++stit;
194 started_threads_.erase(stittmp);
195 }
196 started_threads_.unlock();
197
198 // Re-create barrier as necessary after _adding_ threads
199 unsigned int num_cyclic_threads = 0;
200 for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
201 if (ait_->second->vision_threads->has_cyclic_thread()) {
202 ++num_cyclic_threads;
203 }
204 }
205 cond_recreate_barrier(num_cyclic_threads);
206
207 for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
208 ait_->second->set_vt_prepfin_hold(false);
209 }
210
211 aqts_.unlock();
212}
213
214/** Get vision master.
215 * @return vision master
216 */
219{
220 return this;
221}
222
223Camera *
224FvBaseThread::register_for_camera(const char *camera_string, Thread *thread, colorspace_t cspace)
225{
226 Camera *c = NULL;
227 aqts_.lock();
228
229 logger->log_info(name(), "Thread '%s' registers for camera '%s'", thread->name(), camera_string);
230
231 VisionAspect *vision_thread = dynamic_cast<VisionAspect *>(thread);
232 if (vision_thread == NULL) {
233 throw TypeMismatchException("Thread is not a vision thread");
234 }
235
236 CameraArgumentParser *cap = new CameraArgumentParser(camera_string);
237 try {
238 std::string id = cap->cam_type() + "." + cap->cam_id();
239 if (aqts_.find(id) != aqts_.end()) {
240 // this camera has already been loaded
241 c = aqts_[id]->camera_instance(cspace,
242 (vision_thread->vision_thread_mode()
243 == VisionAspect::CONTINUOUS));
244
245 aqts_[id]->vision_threads->add_waiting_thread(thread);
246
247 } else {
248 Camera *cam = NULL;
249 try {
250 cam = CameraFactory::instance(cap);
251 cam->open();
252 } catch (Exception &e) {
253 delete cam;
254 e.append("Could not open camera");
255 throw;
256 }
257
258 FvAcquisitionThread *aqt = new FvAcquisitionThread(id.c_str(), cam, logger, clock);
259
260 c = aqt->camera_instance(cspace,
261 (vision_thread->vision_thread_mode() == VisionAspect::CONTINUOUS));
262
264
265 aqts_[id] = aqt;
266 thread_collector->add(aqt);
267
268 // no need to recreate barrier, by default aqts operate in continuous mode
269
271 "Acquisition thread '%s' started for thread '%s' and camera '%s'",
272 aqt->name(),
273 thread->name(),
274 id.c_str());
275 }
276
277 thread->add_notification_listener(this);
278
279 } catch (UnknownCameraTypeException &e) {
280 delete cap;
281 e.append("FvBaseVisionMaster: could not instantiate camera");
282 aqts_.unlock();
283 throw;
284 } catch (Exception &e) {
285 delete cap;
286 e.append("FvBaseVisionMaster: could not open or start camera");
287 aqts_.unlock();
288 throw;
289 }
290
291 delete cap;
292
293 aqts_.unlock();
294 return c;
295}
296
297Camera *
298FvBaseThread::register_for_raw_camera(const char *camera_string, Thread *thread)
299{
300 Camera * camera = register_for_camera(camera_string, thread, CS_UNKNOWN);
301 CameraArgumentParser cap(camera_string);
302 try {
303 std::string id = cap.cam_type() + "." + cap.cam_id();
304 aqts_.lock();
305 if (aqts_.find(id) != aqts_.end()) {
306 aqts_[id]->raw_subscriber_thread = thread;
307 }
308 aqts_.unlock();
309 } catch (Exception &e) {
310 aqts_.unlock();
311 throw;
312 }
313 return camera;
314}
315
317FvBaseThread::create_camctrl(const char *camera_string)
318{
319 CameraControl *cc = CameraControlFactory::instance(camera_string);
320 if (cc) {
321 owned_controls_.lock();
322 owned_controls_.push_back(cc);
323 owned_controls_.sort();
324 owned_controls_.unique();
325 owned_controls_.unlock();
326 return cc;
327 } else {
328 throw Exception("Cannot create camera control of desired type");
329 }
330}
331
333FvBaseThread::acquire_camctrl(const char *cam_string)
334{
335 CameraArgumentParser cap(cam_string);
336 std::string id = cap.cam_type() + "." + cap.cam_id();
337
338 // Has this camera been loaded?
339 MutexLocker lock(aqts_.mutex());
340 if (aqts_.find(id) != aqts_.end()) {
341 return CameraControlFactory::instance(aqts_[id]->get_camera());
342 } else {
343 return create_camctrl(cam_string);
344 }
345}
346
348FvBaseThread::acquire_camctrl(const char *cam_string, const std::type_info &typeinf)
349{
350 CameraArgumentParser cap(cam_string);
351 std::string id = cap.cam_type() + "." + cap.cam_id();
352
353 // Has this camera been loaded?
354 MutexLocker lock(aqts_.mutex());
355 if (aqts_.find(id) != aqts_.end()) {
356 return CameraControlFactory::instance(typeinf, aqts_[id]->get_camera());
357 } else {
358 return create_camctrl(cam_string);
359 }
360}
361
362void
364{
365 owned_controls_.lock();
367 if ((f = std::find(owned_controls_.begin(), owned_controls_.end(), cc))
368 != owned_controls_.end()) {
369 delete *f;
370 owned_controls_.erase(f);
371 }
372 owned_controls_.unlock();
373}
374
375/** Conditionally re-create barriers.
376 * Re-create barriers if the number of cyclic threads has changed.
377 * @param num_cyclic_threads new number of cyclic threads
378 */
379void
380FvBaseThread::cond_recreate_barrier(unsigned int num_cyclic_threads)
381{
382 if ((num_cyclic_threads + 1) != aqt_barrier_->count()) {
383 delete aqt_barrier_;
384 aqt_barrier_ = new Barrier(num_cyclic_threads + 1); // +1 for base thread
385 }
386}
387
388void
390{
391 aqts_.lock();
392 unsigned int num_cyclic_threads = 0;
393
394 for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
395 // Remove thread from all aqts
396 ait_->second->vision_threads->remove_thread(thread);
397
398 if (ait_->second->raw_subscriber_thread == thread) {
399 ait_->second->raw_subscriber_thread = NULL;
400 }
401
402 if (ait_->second->vision_threads->has_cyclic_thread()) {
403 ++num_cyclic_threads;
404
405 } else if (ait_->second->aqtmode() != FvAcquisitionThread::AqtContinuous) {
407 "Switching acquisition thread %s to continuous mode "
408 "on unregister",
409 ait_->second->name());
410
411 ait_->second->prepare_finalize();
412 ait_->second->cancel();
413 ait_->second->join();
414 ait_->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
415 ait_->second->start();
416 ait_->second->cancel_finalize();
417 }
418
419 if (ait_->second->vision_threads->empty()) {
420 // Make thread stop capturing data
422 "Disabling capturing on thread %s (no more threads)",
423 ait_->second->name());
424 ait_->second->set_enabled(false);
425 }
426 }
427 // Recreate as necessary after _removing_ threads
428 cond_recreate_barrier(num_cyclic_threads);
429
430 aqts_.unlock();
431}
432
433bool
435{
436 aqts_.lock();
437 for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
438 if (ait_->second->vision_threads->has_waiting_thread(thread)) {
439 started_threads_.lock();
440 started_threads_[thread] = ait_->second;
441 started_threads_.unlock();
442 }
443 }
444 aqts_.unlock();
445
446 return false;
447}
448
449bool
451{
452 aqts_.lock();
453 for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
454 ait_->second->vision_threads->remove_waiting_thread(thread);
455 }
456 aqts_.unlock();
457
458 return false;
459}
FireVision base application acquisition thread.
FvAqtVisionThreads * vision_threads
Vision threads assigned to this acquisition thread.
@ AqtContinuous
continuous mode, use if there are only continuous threads for this acquisition thread.
@ AqtCyclic
cyclic mode, use if there is at least one cyclic thread for this acquisition thread.
firevision::Camera * camera_instance(firevision::colorspace_t cspace, bool deep_copy)
Get a camera instance.
void add_waiting_thread(fawkes::Thread *thread)
Add a thread in waiting state.
virtual void release_camctrl(firevision::CameraControl *cc)
Release a camera control.
virtual ~FvBaseThread()
Destructor.
Definition: base_thread.cpp:66
virtual firevision::VisionMaster * vision_master()
Get vision master.
virtual firevision::CameraControl * acquire_camctrl(const char *cam_string)
Retrieve a CameraControl for the specified camera string.
virtual void loop()
Thread loop.
virtual bool thread_init_failed(fawkes::Thread *thread) noexcept
Thread initialization failed.
FvBaseThread()
Constructor.
Definition: base_thread.cpp:55
virtual void unregister_thread(fawkes::Thread *thread)
Unregister a thread.
virtual void finalize()
Finalize the thread.
Definition: base_thread.cpp:81
virtual firevision::Camera * register_for_raw_camera(const char *camera_string, fawkes::Thread *thread)
Register thread for camera.
virtual firevision::Camera * register_for_camera(const char *camera_string, fawkes::Thread *thread, firevision::colorspace_t cspace=firevision::YUV422_PLANAR)
Register thread for camera.
virtual bool thread_started(fawkes::Thread *thread) noexcept
Thread started successfully.
virtual void init()
Initialize the thread.
Definition: base_thread.cpp:72
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:153
unsigned int count()
Get number of threads this barrier will wait for.
Definition: barrier.cpp:176
Thread aspect to use blocked timing.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Base class for exceptions in Fawkes.
Definition: exception.h:36
void append(const char *format,...) noexcept
Append messages to the message list.
Definition: exception.cpp:333
List with a lock.
Definition: lock_list.h:45
virtual void unlock() const
Unlock list.
Definition: lock_list.h:138
virtual void lock() const
Lock list.
Definition: lock_list.h:124
Map with a lock.
Definition: lock_map.h:36
void lock() const
Lock list.
Definition: lock_map.h:91
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_map.h:133
void unlock() const
Unlock list.
Definition: lock_map.h:109
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Mutex locking helper.
Definition: mutex_locker.h:34
virtual void add(ThreadList &tl)=0
Add multiple threads.
virtual void remove(ThreadList &tl)=0
Remove multiple threads.
ThreadCollector * thread_collector
Thread collector.
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
void add_notification_listener(ThreadNotificationListener *notification_listener)
Add notification listener.
Definition: thread.cpp:1162
Thread aspect to use in FireVision apps.
Definition: vision.h:33
VisionThreadMode vision_thread_mode()
Get the vision thread mode of this thread.
Definition: vision.cpp:81
Vision Master Aspect.
Definition: vision_master.h:36
Camera argument parser.
Definition: camargp.h:36
std::string cam_id() const
Get camera ID.
Definition: camargp.cpp:133
std::string cam_type() const
Get camera type.
Definition: camargp.cpp:122
Camera control interface base class.
Definition: control.h:31
Camera interface for image aquiring devices in FireVision.
Definition: camera.h:33
virtual void open()=0
Open the camera.
Unknown camera type exception.
Fawkes library namespace.