Fawkes API Fawkes Development Version
notifier.cpp
1
2/***************************************************************************
3 * notifier.cpp - BlackBoard notifier
4 *
5 * Created: Mon Mar 03 23:28:18 2008
6 * Copyright 2006-2008 Tim Niemueller [www.niemueller.de]
7 *
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version. A runtime exception applies to
14 * this software (see LICENSE.GPL_WRE file mentioned below for details).
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU Library General Public License for more details.
20 *
21 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22 */
23
24#include <blackboard/blackboard.h>
25#include <blackboard/interface_listener.h>
26#include <blackboard/interface_observer.h>
27#include <blackboard/internal/notifier.h>
28#include <core/threading/mutex.h>
29#include <core/threading/mutex_locker.h>
30#include <core/utils/lock_hashmap.h>
31#include <core/utils/lock_hashset.h>
32#include <interface/interface.h>
33#include <logging/liblogger.h>
34
35#include <algorithm>
36#include <cstdlib>
37#include <cstring>
38#include <fnmatch.h>
39#include <functional>
40
41namespace fawkes {
42
43/** @class BlackBoardNotifier <blackboard/internal/notifier.h>
44 * BlackBoard notifier.
45 * This class is used by the BlackBoard to notify listeners and observers
46 * of changes.
47 *
48 * @author Tim Niemueller
49 */
50
51/** Constructor. */
53{
54 bbil_writer_events_ = 0;
55 bbil_writer_mutex_ = new Mutex();
56
57 bbil_reader_events_ = 0;
58 bbil_reader_mutex_ = new Mutex();
59
60 bbil_data_events_ = 0;
61 bbil_data_mutex_ = new Mutex();
62
63 bbil_messages_events_ = 0;
64 bbil_messages_mutex_ = new Mutex();
65
66 bbio_events_ = 0;
67 bbio_mutex_ = new Mutex();
68}
69
70/** Destructor */
72{
73 delete bbil_writer_mutex_;
74 delete bbil_reader_mutex_;
75 delete bbil_data_mutex_;
76 delete bbil_messages_mutex_;
77
78 delete bbio_mutex_;
79}
80
81/** Register BB event listener.
82 * @param listener BlackBoard event listener to register
83 * @param flag concatenation of flags denoting which queue entries should be
84 * processed
85 */
86void
89{
90 update_listener(listener, flag);
91}
92
93/** Update BB event listener.
94 * @param listener BlackBoard event listener to update subscriptions of
95 * @param flag concatenation of flags denoting which queue entries should be
96 * processed
97 */
98void
101{
102 const BlackBoardInterfaceListener::InterfaceQueue &queue = listener->bbil_acquire_queue();
103
104 BlackBoardInterfaceListener::InterfaceQueue::const_iterator i = queue.begin();
105
106 for (i = queue.begin(); i != queue.end(); ++i) {
107 switch (i->type) {
109 if (flag & BlackBoard::BBIL_FLAG_DATA) {
110 proc_listener_maybe_queue(i->op,
111 i->interface,
112 listener,
113 bbil_data_mutex_,
114 bbil_data_events_,
115 bbil_data_,
116 bbil_data_queue_,
117 "data");
118 }
119 break;
122 proc_listener_maybe_queue(i->op,
123 i->interface,
124 listener,
125 bbil_messages_mutex_,
126 bbil_messages_events_,
127 bbil_messages_,
128 bbil_messages_queue_,
129 "messages");
130 }
131 break;
133 if (flag & BlackBoard::BBIL_FLAG_READER) {
134 proc_listener_maybe_queue(i->op,
135 i->interface,
136 listener,
137 bbil_reader_mutex_,
138 bbil_reader_events_,
139 bbil_reader_,
140 bbil_reader_queue_,
141 "reader");
142 }
143 break;
145 if (flag & BlackBoard::BBIL_FLAG_WRITER) {
146 proc_listener_maybe_queue(i->op,
147 i->interface,
148 listener,
149 bbil_writer_mutex_,
150 bbil_writer_events_,
151 bbil_writer_,
152 bbil_writer_queue_,
153 "writer");
154 }
155 break;
156 default: break;
157 }
158 }
159
160 listener->bbil_release_queue(flag);
161}
162
163void
164BlackBoardNotifier::proc_listener_maybe_queue(bool op,
165 Interface * interface,
167 Mutex * mutex,
168 unsigned int & events,
169 BBilMap & map,
170 BBilQueue & queue,
171 const char * hint)
172{
173 MutexLocker lock(mutex);
174 if (events > 0) {
175 LibLogger::log_warn("BlackBoardNotifier",
176 "%s interface "
177 "listener %s for %s events (queued)",
178 op ? "Registering" : "Unregistering",
179 listener->bbil_name(),
180 hint);
181
182 queue_listener(op, interface, listener, queue);
183 } else {
184 if (op) { // add
185 add_listener(interface, listener, map);
186 } else {
187 remove_listener(interface, listener, map);
188 }
189 }
190}
191
192/** Unregister BB interface listener.
193 * This will remove the given BlackBoard interface listener from any
194 * event that it was previously registered for.
195 * @param listener BlackBoard event listener to remove
196 */
197void
199{
200 const BlackBoardInterfaceListener::InterfaceMaps maps = listener->bbil_acquire_maps();
201
202 BlackBoardInterfaceListener::InterfaceMap::const_iterator i;
203 for (i = maps.data.begin(); i != maps.data.end(); ++i) {
204 proc_listener_maybe_queue(false,
205 i->second,
206 listener,
207 bbil_data_mutex_,
208 bbil_data_events_,
209 bbil_data_,
210 bbil_data_queue_,
211 "data");
212 }
213
214 for (i = maps.messages.begin(); i != maps.messages.end(); ++i) {
215 proc_listener_maybe_queue(false,
216 i->second,
217 listener,
218 bbil_messages_mutex_,
219 bbil_messages_events_,
220 bbil_messages_,
221 bbil_messages_queue_,
222 "messages");
223 }
224
225 for (i = maps.reader.begin(); i != maps.reader.end(); ++i) {
226 proc_listener_maybe_queue(false,
227 i->second,
228 listener,
229 bbil_reader_mutex_,
230 bbil_reader_events_,
231 bbil_reader_,
232 bbil_reader_queue_,
233 "reader");
234 }
235
236 for (i = maps.writer.begin(); i != maps.writer.end(); ++i) {
237 proc_listener_maybe_queue(false,
238 i->second,
239 listener,
240 bbil_writer_mutex_,
241 bbil_writer_events_,
242 bbil_writer_,
243 bbil_writer_queue_,
244 "writer");
245 }
246
247 listener->bbil_release_maps();
248}
249
250/** Add listener for specified map.
251 * @param listener interface listener for events
252 * @param im map of interfaces to listen for
253 * @param ilmap internal map to add listener to
254 */
255void
256BlackBoardNotifier::add_listener(Interface * interface,
258 BBilMap & ilmap)
259{
260 std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
261
262 BBilMap::value_type v = std::make_pair(interface->uid(), listener);
263 BBilMap::iterator f = std::find(ret.first, ret.second, v);
264
265 if (f == ret.second) {
266 ilmap.insert(std::make_pair(interface->uid(), listener));
267 }
268}
269
270void
271BlackBoardNotifier::remove_listener(Interface * interface,
272 BlackBoardInterfaceListener *listener,
273 BBilMap & ilmap)
274{
275 std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
276 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
277 if (j->second == listener) {
278 ilmap.erase(j);
279 break;
280 }
281 }
282}
283
284bool
285BlackBoardNotifier::is_in_queue(bool op,
286 BBilQueue & queue,
287 const char * uid,
288 BlackBoardInterfaceListener *bbil)
289{
290 BBilQueue::iterator q;
291 for (q = queue.begin(); q != queue.end(); ++q) {
292 if ((q->op == op) && (q->uid == uid) && (q->listener == bbil)) {
293 return true;
294 }
295 }
296 return false;
297}
298
299void
300BlackBoardNotifier::queue_listener(bool op,
301 Interface * interface,
302 BlackBoardInterfaceListener *listener,
303 BBilQueue & queue)
304{
305 BBilQueueEntry qe = {op, interface->uid(), interface, listener};
306 queue.push_back(qe);
307}
308
309/** Register BB interface observer.
310 * @param observer BlackBoard interface observer to register
311 */
312void
314{
315 bbio_mutex_->lock();
316 if (bbio_events_ > 0) {
317 bbio_queue_.push_back(std::make_pair(1, observer));
318 } else {
319 add_observer(observer, observer->bbio_get_observed_create(), bbio_created_);
320 add_observer(observer, observer->bbio_get_observed_destroy(), bbio_destroyed_);
321 }
322 bbio_mutex_->unlock();
323}
324
325void
326BlackBoardNotifier::add_observer(BlackBoardInterfaceObserver * observer,
328 BBioMap & bbiomap)
329{
331 its->lock();
332 for (i = its->begin(); i != its->end(); ++i) {
333 bbiomap[i->first].push_back(make_pair(observer, i->second));
334 }
335 its->unlock();
336}
337
338/** Remove observer from map.
339 * @param iomap interface observer map to remove the observer from
340 * @param observer observer to remove
341 */
342void
343BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer)
344{
345 BBioMapIterator i, tmp;
346
347 i = iomap.begin();
348 while (i != iomap.end()) {
349 BBioListIterator j = i->second.begin();
350 while (j != i->second.end()) {
351 if (j->first == observer) {
352 j = i->second.erase(j);
353 } else {
354 ++j;
355 }
356 }
357 if (i->second.empty()) {
358 tmp = i;
359 ++i;
360 iomap.erase(tmp);
361 } else {
362 ++i;
363 }
364 }
365}
366
367/** Unregister BB interface observer.
368 * This will remove the given BlackBoard event listener from any event that it was
369 * previously registered for.
370 * @param observer BlackBoard event listener to remove
371 */
372void
374{
375 MutexLocker lock(bbio_mutex_);
376 if (bbio_events_ > 0) {
377 BBioQueueEntry e = std::make_pair((unsigned int)0, observer);
378 BBioQueue::iterator re;
379 while ((re = find_if(bbio_queue_.begin(),
380 bbio_queue_.end(),
381 bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
382 != bbio_queue_.end()) {
383 // if there is an entry in the register queue, remove it!
384 if (re->second == observer) {
385 bbio_queue_.erase(re);
386 }
387 }
388 bbio_queue_.push_back(std::make_pair(0, observer));
389
390 } else {
391 remove_observer(bbio_created_, observer);
392 remove_observer(bbio_destroyed_, observer);
393 }
394}
395
396/** Notify that an interface has been created.
397 * @param type type of the interface
398 * @param id ID of the interface
399 */
400void
401BlackBoardNotifier::notify_of_interface_created(const char *type, const char *id) noexcept
402{
403 bbio_mutex_->lock();
404 bbio_events_ += 1;
405 bbio_mutex_->unlock();
406
407 BBioMapIterator lhmi;
408 BBioListIterator i, l;
409 for (lhmi = bbio_created_.begin(); lhmi != bbio_created_.end(); ++lhmi) {
410 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
411 continue;
412
413 BBioList &list = lhmi->second;
414 for (i = list.begin(); i != list.end(); ++i) {
415 BlackBoardInterfaceObserver *bbio = i->first;
416 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
417 if (fnmatch(pi->c_str(), id, 0) == 0) {
418 bbio->bb_interface_created(type, id);
419 break;
420 }
421 }
422 }
423 }
424
425 bbio_mutex_->lock();
426 bbio_events_ -= 1;
427 process_bbio_queue();
428 bbio_mutex_->unlock();
429}
430
431/** Notify that an interface has been destroyed.
432 * @param type type of the interface
433 * @param id ID of the interface
434 */
435void
436BlackBoardNotifier::notify_of_interface_destroyed(const char *type, const char *id) noexcept
437{
438 bbio_mutex_->lock();
439 bbio_events_ += 1;
440 bbio_mutex_->unlock();
441
442 BBioMapIterator lhmi;
443 BBioListIterator i, l;
444 for (lhmi = bbio_destroyed_.begin(); lhmi != bbio_destroyed_.end(); ++lhmi) {
445 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
446 continue;
447
448 BBioList &list = (*lhmi).second;
449 for (i = list.begin(); i != list.end(); ++i) {
450 BlackBoardInterfaceObserver *bbio = i->first;
451 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
452 if (fnmatch(pi->c_str(), id, 0) == 0) {
453 bbio->bb_interface_destroyed(type, id);
454 break;
455 }
456 }
457 }
458 }
459
460 bbio_mutex_->lock();
461 bbio_events_ -= 1;
462 process_bbio_queue();
463 bbio_mutex_->unlock();
464}
465
466void
467BlackBoardNotifier::process_bbio_queue()
468{
469 if (!bbio_queue_.empty()) {
470 if (bbio_events_ > 0) {
471 return;
472 } else {
473 while (!bbio_queue_.empty()) {
474 BBioQueueEntry &e = bbio_queue_.front();
475 if (e.first) { // register
476 add_observer(e.second, e.second->bbio_get_observed_create(), bbio_created_);
477 add_observer(e.second, e.second->bbio_get_observed_destroy(), bbio_destroyed_);
478 } else { // unregister
479 remove_observer(bbio_created_, e.second);
480 remove_observer(bbio_destroyed_, e.second);
481 }
482 bbio_queue_.pop_front();
483 }
484 }
485 }
486}
487
488/** Notify that writer has been added.
489 * @param interface the interface for which the event happened. It is not necessarily the
490 * instance which caused the event, but it must have the same mem serial.
491 * @param event_instance_serial the instance serial of the interface that caused the event
492 * @see BlackBoardInterfaceListener::bb_interface_writer_added()
493 */
494void
496 Uuid event_instance_serial) noexcept
497{
498 bbil_writer_mutex_->lock();
499 bbil_writer_events_ += 1;
500 bbil_writer_mutex_->unlock();
501
502 const char * uid = interface->uid();
503 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
504 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
505 BlackBoardInterfaceListener *bbil = j->second;
506 if (!is_in_queue(/* remove op*/ false, bbil_writer_queue_, uid, bbil)) {
507 Interface *bbil_iface = bbil->bbil_writer_interface(uid);
508 if (bbil_iface != NULL) {
509 bbil->bb_interface_writer_added(bbil_iface, event_instance_serial);
510 } else {
511 LibLogger::log_warn("BlackBoardNotifier",
512 "BBIL[%s] registered for writer events "
513 "(open) for '%s' but has no such interface",
514 bbil->bbil_name(),
515 uid);
516 }
517 }
518 }
519
520 bbil_writer_mutex_->lock();
521 bbil_writer_events_ -= 1;
522 process_writer_queue();
523 bbil_writer_mutex_->unlock();
524}
525
526/** Notify that writer has been removed.
527 * @param interface interface for which the writer has been removed
528 * @param event_instance_serial instance serial of the interface that caused the event
529 * @see BlackBoardInterfaceListener::bb_interface_writer_removed()
530 */
531void
533 Uuid event_instance_serial) noexcept
534{
535 bbil_writer_mutex_->lock();
536 bbil_writer_events_ += 1;
537 bbil_writer_mutex_->unlock();
538
539 const char * uid = interface->uid();
540 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
541 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
542 BlackBoardInterfaceListener *bbil = j->second;
543 if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
544 Interface *bbil_iface = bbil->bbil_writer_interface(uid);
545 if (bbil_iface != NULL) {
546 bbil->bb_interface_writer_removed(bbil_iface, event_instance_serial);
547 } else {
548 LibLogger::log_warn("BlackBoardNotifier",
549 "BBIL[%s] registered for writer events "
550 "(close) for '%s' but has no such interface",
551 bbil->bbil_name(),
552 uid);
553 }
554 }
555 }
556
557 bbil_writer_mutex_->lock();
558 bbil_writer_events_ -= 1;
559 process_writer_queue();
560 bbil_writer_mutex_->unlock();
561}
562
563void
564BlackBoardNotifier::process_writer_queue()
565{
566 if (!bbil_writer_queue_.empty()) {
567 if (bbil_writer_events_ > 0) {
568 return;
569 } else {
570 while (!bbil_writer_queue_.empty()) {
571 BBilQueueEntry &e = bbil_writer_queue_.front();
572 if (e.op) { // register
573 add_listener(e.interface, e.listener, bbil_writer_);
574 } else { // unregister
575 remove_listener(e.interface, e.listener, bbil_writer_);
576 }
577 bbil_writer_queue_.pop_front();
578 }
579 }
580 }
581}
582
583/** Notify that reader has been added.
584 * @param interface interface for which the reader has been added
585 * @param event_instance_serial instance serial of the interface that caused the event
586 * @see BlackBoardInterfaceListener::bb_interface_reader_added()
587 */
588void
590 Uuid event_instance_serial) noexcept
591{
592 bbil_reader_mutex_->lock();
593 bbil_reader_events_ += 1;
594 bbil_reader_mutex_->unlock();
595
596 const char * uid = interface->uid();
597 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
598 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
599 BlackBoardInterfaceListener *bbil = j->second;
600 if (!is_in_queue(/* remove op*/ false, bbil_reader_queue_, uid, bbil)) {
601 Interface *bbil_iface = bbil->bbil_reader_interface(uid);
602 if (bbil_iface != NULL) {
603 bbil->bb_interface_reader_added(bbil_iface, event_instance_serial);
604 } else {
605 LibLogger::log_warn("BlackBoardNotifier",
606 "BBIL[%s] registered for reader events "
607 "(open) for '%s' but has no such interface",
608 bbil->bbil_name(),
609 uid);
610 }
611 }
612 }
613
614 bbil_reader_mutex_->lock();
615 bbil_reader_events_ -= 1;
616 process_reader_queue();
617 bbil_reader_mutex_->unlock();
618}
619
620/** Notify that reader has been removed.
621 * @param interface interface for which the reader has been removed
622 * @param event_instance_serial instance serial of the interface that caused the event
623 * @see BlackBoardInterfaceListener::bb_interface_reader_removed()
624 */
625void
627 Uuid event_instance_serial) noexcept
628{
629 bbil_reader_mutex_->lock();
630 bbil_reader_events_ += 1;
631 bbil_reader_mutex_->unlock();
632
633 const char * uid = interface->uid();
634 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
635 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
636 BlackBoardInterfaceListener *bbil = j->second;
637 if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
638 Interface *bbil_iface = bbil->bbil_reader_interface(uid);
639 if (bbil_iface != NULL) {
640 bbil->bb_interface_reader_removed(bbil_iface, event_instance_serial);
641 } else {
642 LibLogger::log_warn("BlackBoardNotifier",
643 "BBIL[%s] registered for reader events "
644 "(close) for '%s' but has no such interface",
645 bbil->bbil_name(),
646 uid);
647 }
648 }
649 }
650
651 bbil_reader_mutex_->lock();
652 bbil_reader_events_ -= 1;
653 process_reader_queue();
654 bbil_reader_mutex_->unlock();
655}
656
657void
658BlackBoardNotifier::process_reader_queue()
659{
660 if (!bbil_reader_queue_.empty()) {
661 if (bbil_reader_events_ > 0) {
662 return;
663 } else {
664 while (!bbil_reader_queue_.empty()) {
665 BBilQueueEntry &e = bbil_reader_queue_.front();
666 if (e.op) { // register
667 add_listener(e.interface, e.listener, bbil_reader_);
668 } else { // unregister
669 remove_listener(e.interface, e.listener, bbil_reader_);
670 }
671 bbil_reader_queue_.pop_front();
672 }
673 }
674 }
675}
676
677/** Notify of data change.
678 * Notify all subscribers of the given interface of a data change.
679 * This also influences logging and sending data over the network so it is
680 * mandatory to call this function! The interface base class write method does
681 * that for you.
682 * @param interface interface whose subscribers to notify
683 * @param has_changed whether the current data is different from the last time write() was
684 * called on the interface
685 * @see Interface::write()
686 * @see BlackBoardInterfaceListener::bb_interface_data_changed()
687 */
688void
689BlackBoardNotifier::notify_of_data_refresh(const Interface *interface, bool has_changed)
690{
691 bbil_data_mutex_->lock();
692 bbil_data_events_ += 1;
693 bbil_data_mutex_->unlock();
694
695 const char * uid = interface->uid();
696 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_data_.equal_range(uid);
697 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
698 BlackBoardInterfaceListener *bbil = j->second;
699 if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
700 Interface *bbil_iface = bbil->bbil_data_interface(uid);
701 if (bbil_iface != NULL) {
702 bbil->bb_interface_data_refreshed(bbil_iface);
703 if (has_changed)
704 bbil->bb_interface_data_changed(bbil_iface);
705 } else {
706 LibLogger::log_warn("BlackBoardNotifier",
707 "BBIL[%s] registered for data change events "
708 "for '%s' but has no such interface",
709 bbil->bbil_name(),
710 uid);
711 }
712 }
713 }
714
715 bbil_data_mutex_->lock();
716 bbil_data_events_ -= 1;
717 if (!bbil_data_queue_.empty()) {
718 if (bbil_data_events_ == 0) {
719 while (!bbil_data_queue_.empty()) {
720 BBilQueueEntry &e = bbil_data_queue_.front();
721 if (e.op) { // register
722 add_listener(e.interface, e.listener, bbil_data_);
723 } else { // unregister
724 remove_listener(e.interface, e.listener, bbil_data_);
725 }
726 bbil_data_queue_.pop_front();
727 }
728 }
729 }
730 bbil_data_mutex_->unlock();
731}
732
733/** Notify of message received
734 * Notify all subscribers of the given interface of an incoming message
735 * This also influences logging and sending data over the network so it is
736 * mandatory to call this function! The interface base class write method does
737 * that for you.
738 * @param interface interface whose subscribers to notify
739 * @param message message which is being received
740 * @return false if any listener returned false, true otherwise
741 * @see BlackBoardInterfaceListener::bb_interface_message_received()
742 */
743bool
745{
746 bbil_messages_mutex_->lock();
747 bbil_messages_events_ += 1;
748 bbil_messages_mutex_->unlock();
749
750 bool enqueue = true;
751
752 const char * uid = interface->uid();
753 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_messages_.equal_range(uid);
754 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
755 BlackBoardInterfaceListener *bbil = j->second;
756 if (!is_in_queue(/* remove op*/ false, bbil_messages_queue_, uid, bbil)) {
757 Interface *bbil_iface = bbil->bbil_message_interface(uid);
758 if (bbil_iface != NULL) {
759 bool abort = !bbil->bb_interface_message_received(bbil_iface, message);
760 if (abort) {
761 enqueue = false;
762 break;
763 }
764 } else {
765 LibLogger::log_warn("BlackBoardNotifier",
766 "BBIL[%s] registered for message events "
767 "for '%s' but has no such interface",
768 bbil->bbil_name(),
769 uid);
770 }
771 }
772 }
773
774 bbil_messages_mutex_->lock();
775 bbil_messages_events_ -= 1;
776 if (!bbil_messages_queue_.empty()) {
777 if (bbil_messages_events_ == 0) {
778 while (!bbil_messages_queue_.empty()) {
779 BBilQueueEntry &e = bbil_messages_queue_.front();
780 if (e.op) { // register
781 add_listener(e.interface, e.listener, bbil_messages_);
782 } else { // unregister
783 remove_listener(e.interface, e.listener, bbil_messages_);
784 }
785 bbil_messages_queue_.pop_front();
786 }
787 }
788 }
789 bbil_messages_mutex_->unlock();
790
791 return enqueue;
792}
793
794} // end namespace fawkes
BlackBoard interface listener.
Interface * bbil_reader_interface(const char *iuid) noexcept
Get interface instance for given UID.
virtual void bb_interface_writer_added(Interface *interface, Uuid instance_serial) noexcept
A writing instance has been opened for a watched interface.
@ MESSAGES
Message received event entry.
@ DATA
Data changed event entry.
virtual bool bb_interface_message_received(Interface *interface, Message *message) noexcept
BlackBoard message received notification.
virtual void bb_interface_data_refreshed(Interface *interface) noexcept
BlackBoard data refreshed notification.
Interface * bbil_message_interface(const char *iuid) noexcept
Get interface instance for given UID.
virtual void bb_interface_data_changed(Interface *interface) noexcept
BlackBoard data changed notification.
virtual void bb_interface_writer_removed(Interface *interface, Uuid instance_serial) noexcept
A writing instance has been closed for a watched interface.
virtual void bb_interface_reader_added(Interface *interface, Uuid instance_serial) noexcept
A reading instance has been opened for a watched interface.
const char * bbil_name() const
Get BBIL name.
Interface * bbil_writer_interface(const char *iuid) noexcept
Get interface instance for given UID.
std::list< QueueEntry > InterfaceQueue
Queue of additions/removal of interfaces.
virtual void bb_interface_reader_removed(Interface *interface, Uuid instance_serial) noexcept
A reading instance has been closed for a watched interface.
Interface * bbil_data_interface(const char *iuid) noexcept
Get interface instance for given UID.
BlackBoard interface observer.
ObservedInterfaceLockMap::iterator ObservedInterfaceLockMapIterator
Type for iterator of lockable interface type hash sets.
ObservedInterfaceLockMap * bbio_get_observed_destroy() noexcept
Get interface destriction type watch list.
ObservedInterfaceLockMap * bbio_get_observed_create() noexcept
Get interface creation type watch list.
virtual void bb_interface_created(const char *type, const char *id) noexcept
BlackBoard interface created notification.
virtual void bb_interface_destroyed(const char *type, const char *id) noexcept
BlackBoard interface destroyed notification.
BlackBoardNotifier()
Constructor.
Definition: notifier.cpp:52
void notify_of_writer_added(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that writer has been added.
Definition: notifier.cpp:495
void notify_of_writer_removed(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that writer has been removed.
Definition: notifier.cpp:532
void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: notifier.cpp:198
void notify_of_interface_destroyed(const char *type, const char *id) noexcept
Notify that an interface has been destroyed.
Definition: notifier.cpp:436
void notify_of_reader_added(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that reader has been added.
Definition: notifier.cpp:589
virtual ~BlackBoardNotifier()
Destructor.
Definition: notifier.cpp:71
void notify_of_reader_removed(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that reader has been removed.
Definition: notifier.cpp:626
void notify_of_data_refresh(const Interface *interface, bool has_changed)
Notify of data change.
Definition: notifier.cpp:689
void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: notifier.cpp:373
void register_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Register BB event listener.
Definition: notifier.cpp:87
void notify_of_interface_created(const char *type, const char *id) noexcept
Notify that an interface has been created.
Definition: notifier.cpp:401
void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: notifier.cpp:313
void update_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Update BB event listener.
Definition: notifier.cpp:99
bool notify_of_message_received(const Interface *interface, Message *message)
Notify of message received Notify all subscribers of the given interface of an incoming message This ...
Definition: notifier.cpp:744
ListenerRegisterFlag
Flags to constrain listener registration/updates.
Definition: blackboard.h:87
@ BBIL_FLAG_READER
consider reader events
Definition: blackboard.h:90
@ BBIL_FLAG_DATA
consider data events
Definition: blackboard.h:88
@ BBIL_FLAG_WRITER
consider writer events
Definition: blackboard.h:91
@ BBIL_FLAG_MESSAGES
consider message received events
Definition: blackboard.h:89
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:686
static void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: liblogger.cpp:156
void lock() const
Lock list.
Definition: lock_map.h:91
void unlock() const
Unlock list.
Definition: lock_map.h:109
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Definition: message.h:44
Mutex locking helper.
Definition: mutex_locker.h:34
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
A convenience class for universally unique identifiers (UUIDs).
Definition: uuid.h:29
Fawkes library namespace.
Structure to hold maps for active subscriptions.
InterfaceMap writer
Writer event subscriptions.
InterfaceMap messages
Message received event subscriptions.
InterfaceMap data
Data event subscriptions.
InterfaceMap reader
Reader event subscriptions.