24#include <blackboard/blackboard.h>
25#include <blackboard/interface_listener.h>
26#include <blackboard/interface_observer.h>
27#include <blackboard/internal/notifier.h>
28#include <core/threading/mutex.h>
29#include <core/threading/mutex_locker.h>
30#include <core/utils/lock_hashmap.h>
31#include <core/utils/lock_hashset.h>
32#include <interface/interface.h>
33#include <logging/liblogger.h>
54 bbil_writer_events_ = 0;
55 bbil_writer_mutex_ =
new Mutex();
57 bbil_reader_events_ = 0;
58 bbil_reader_mutex_ =
new Mutex();
60 bbil_data_events_ = 0;
61 bbil_data_mutex_ =
new Mutex();
63 bbil_messages_events_ = 0;
64 bbil_messages_mutex_ =
new Mutex();
67 bbio_mutex_ =
new Mutex();
73 delete bbil_writer_mutex_;
74 delete bbil_reader_mutex_;
75 delete bbil_data_mutex_;
76 delete bbil_messages_mutex_;
104 BlackBoardInterfaceListener::InterfaceQueue::const_iterator i = queue.begin();
106 for (i = queue.begin(); i != queue.end(); ++i) {
110 proc_listener_maybe_queue(i->op,
122 proc_listener_maybe_queue(i->op,
125 bbil_messages_mutex_,
126 bbil_messages_events_,
128 bbil_messages_queue_,
134 proc_listener_maybe_queue(i->op,
146 proc_listener_maybe_queue(i->op,
160 listener->bbil_release_queue(flag);
164BlackBoardNotifier::proc_listener_maybe_queue(
bool op,
168 unsigned int & events,
177 "listener %s for %s events (queued)",
178 op ?
"Registering" :
"Unregistering",
182 queue_listener(op, interface, listener, queue);
185 add_listener(interface, listener, map);
187 remove_listener(interface, listener, map);
202 BlackBoardInterfaceListener::InterfaceMap::const_iterator i;
203 for (i = maps.
data.begin(); i != maps.
data.end(); ++i) {
204 proc_listener_maybe_queue(
false,
215 proc_listener_maybe_queue(
false,
218 bbil_messages_mutex_,
219 bbil_messages_events_,
221 bbil_messages_queue_,
225 for (i = maps.
reader.begin(); i != maps.
reader.end(); ++i) {
226 proc_listener_maybe_queue(
false,
236 for (i = maps.
writer.begin(); i != maps.
writer.end(); ++i) {
237 proc_listener_maybe_queue(
false,
247 listener->bbil_release_maps();
256BlackBoardNotifier::add_listener(
Interface * interface,
260 std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->
uid());
262 BBilMap::value_type v = std::make_pair(interface->
uid(), listener);
263 BBilMap::iterator f = std::find(ret.first, ret.second, v);
265 if (f == ret.second) {
266 ilmap.insert(std::make_pair(interface->
uid(), listener));
271BlackBoardNotifier::remove_listener(Interface * interface,
272 BlackBoardInterfaceListener *listener,
275 std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
276 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
277 if (j->second == listener) {
285BlackBoardNotifier::is_in_queue(
bool op,
288 BlackBoardInterfaceListener *bbil)
290 BBilQueue::iterator q;
291 for (q = queue.begin(); q != queue.end(); ++q) {
292 if ((q->op == op) && (q->uid == uid) && (q->listener == bbil)) {
300BlackBoardNotifier::queue_listener(
bool op,
301 Interface * interface,
302 BlackBoardInterfaceListener *listener,
305 BBilQueueEntry qe = {op, interface->uid(), interface, listener};
316 if (bbio_events_ > 0) {
317 bbio_queue_.push_back(std::make_pair(1, observer));
332 for (i = its->begin(); i != its->end(); ++i) {
333 bbiomap[i->first].push_back(make_pair(observer, i->second));
343BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer)
345 BBioMapIterator i, tmp;
348 while (i != iomap.end()) {
349 BBioListIterator j = i->second.begin();
350 while (j != i->second.end()) {
351 if (j->first == observer) {
352 j = i->second.erase(j);
357 if (i->second.empty()) {
376 if (bbio_events_ > 0) {
377 BBioQueueEntry e = std::make_pair((
unsigned int)0, observer);
378 BBioQueue::iterator re;
379 while ((re = find_if(bbio_queue_.begin(),
381 bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
382 != bbio_queue_.end()) {
384 if (re->second == observer) {
385 bbio_queue_.erase(re);
388 bbio_queue_.push_back(std::make_pair(0, observer));
391 remove_observer(bbio_created_, observer);
392 remove_observer(bbio_destroyed_, observer);
405 bbio_mutex_->unlock();
407 BBioMapIterator lhmi;
408 BBioListIterator i, l;
409 for (lhmi = bbio_created_.begin(); lhmi != bbio_created_.end(); ++lhmi) {
410 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
413 BBioList &list = lhmi->second;
414 for (i = list.begin(); i != list.end(); ++i) {
416 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
417 if (fnmatch(pi->c_str(),
id, 0) == 0) {
427 process_bbio_queue();
428 bbio_mutex_->unlock();
440 bbio_mutex_->unlock();
442 BBioMapIterator lhmi;
443 BBioListIterator i, l;
444 for (lhmi = bbio_destroyed_.begin(); lhmi != bbio_destroyed_.end(); ++lhmi) {
445 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
448 BBioList &list = (*lhmi).second;
449 for (i = list.begin(); i != list.end(); ++i) {
451 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
452 if (fnmatch(pi->c_str(),
id, 0) == 0) {
462 process_bbio_queue();
463 bbio_mutex_->unlock();
467BlackBoardNotifier::process_bbio_queue()
469 if (!bbio_queue_.empty()) {
470 if (bbio_events_ > 0) {
473 while (!bbio_queue_.empty()) {
474 BBioQueueEntry &e = bbio_queue_.front();
476 add_observer(e.second, e.second->bbio_get_observed_create(), bbio_created_);
477 add_observer(e.second, e.second->bbio_get_observed_destroy(), bbio_destroyed_);
479 remove_observer(bbio_created_, e.second);
480 remove_observer(bbio_destroyed_, e.second);
482 bbio_queue_.pop_front();
496 Uuid event_instance_serial)
noexcept
498 bbil_writer_mutex_->lock();
499 bbil_writer_events_ += 1;
500 bbil_writer_mutex_->unlock();
502 const char * uid = interface->uid();
503 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
504 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
506 if (!is_in_queue(
false, bbil_writer_queue_, uid, bbil)) {
508 if (bbil_iface != NULL) {
512 "BBIL[%s] registered for writer events "
513 "(open) for '%s' but has no such interface",
520 bbil_writer_mutex_->lock();
521 bbil_writer_events_ -= 1;
522 process_writer_queue();
523 bbil_writer_mutex_->unlock();
533 Uuid event_instance_serial)
noexcept
535 bbil_writer_mutex_->lock();
536 bbil_writer_events_ += 1;
537 bbil_writer_mutex_->unlock();
539 const char * uid = interface->uid();
540 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
541 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
543 if (!is_in_queue(
false, bbil_data_queue_, uid, bbil)) {
545 if (bbil_iface != NULL) {
549 "BBIL[%s] registered for writer events "
550 "(close) for '%s' but has no such interface",
557 bbil_writer_mutex_->lock();
558 bbil_writer_events_ -= 1;
559 process_writer_queue();
560 bbil_writer_mutex_->unlock();
564BlackBoardNotifier::process_writer_queue()
566 if (!bbil_writer_queue_.empty()) {
567 if (bbil_writer_events_ > 0) {
570 while (!bbil_writer_queue_.empty()) {
571 BBilQueueEntry &e = bbil_writer_queue_.front();
573 add_listener(e.interface, e.listener, bbil_writer_);
575 remove_listener(e.interface, e.listener, bbil_writer_);
577 bbil_writer_queue_.pop_front();
590 Uuid event_instance_serial)
noexcept
592 bbil_reader_mutex_->lock();
593 bbil_reader_events_ += 1;
594 bbil_reader_mutex_->unlock();
596 const char * uid = interface->uid();
597 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
598 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
600 if (!is_in_queue(
false, bbil_reader_queue_, uid, bbil)) {
602 if (bbil_iface != NULL) {
606 "BBIL[%s] registered for reader events "
607 "(open) for '%s' but has no such interface",
614 bbil_reader_mutex_->lock();
615 bbil_reader_events_ -= 1;
616 process_reader_queue();
617 bbil_reader_mutex_->unlock();
627 Uuid event_instance_serial)
noexcept
629 bbil_reader_mutex_->lock();
630 bbil_reader_events_ += 1;
631 bbil_reader_mutex_->unlock();
633 const char * uid = interface->uid();
634 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
635 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
637 if (!is_in_queue(
false, bbil_data_queue_, uid, bbil)) {
639 if (bbil_iface != NULL) {
643 "BBIL[%s] registered for reader events "
644 "(close) for '%s' but has no such interface",
651 bbil_reader_mutex_->lock();
652 bbil_reader_events_ -= 1;
653 process_reader_queue();
654 bbil_reader_mutex_->unlock();
658BlackBoardNotifier::process_reader_queue()
660 if (!bbil_reader_queue_.empty()) {
661 if (bbil_reader_events_ > 0) {
664 while (!bbil_reader_queue_.empty()) {
665 BBilQueueEntry &e = bbil_reader_queue_.front();
667 add_listener(e.interface, e.listener, bbil_reader_);
669 remove_listener(e.interface, e.listener, bbil_reader_);
671 bbil_reader_queue_.pop_front();
691 bbil_data_mutex_->
lock();
692 bbil_data_events_ += 1;
693 bbil_data_mutex_->
unlock();
695 const char * uid = interface->
uid();
696 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_data_.equal_range(uid);
697 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
699 if (!is_in_queue(
false, bbil_data_queue_, uid, bbil)) {
701 if (bbil_iface != NULL) {
707 "BBIL[%s] registered for data change events "
708 "for '%s' but has no such interface",
715 bbil_data_mutex_->
lock();
716 bbil_data_events_ -= 1;
717 if (!bbil_data_queue_.empty()) {
718 if (bbil_data_events_ == 0) {
719 while (!bbil_data_queue_.empty()) {
720 BBilQueueEntry &e = bbil_data_queue_.front();
722 add_listener(e.interface, e.listener, bbil_data_);
724 remove_listener(e.interface, e.listener, bbil_data_);
726 bbil_data_queue_.pop_front();
730 bbil_data_mutex_->
unlock();
746 bbil_messages_mutex_->
lock();
747 bbil_messages_events_ += 1;
748 bbil_messages_mutex_->
unlock();
752 const char * uid = interface->
uid();
753 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_messages_.equal_range(uid);
754 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
756 if (!is_in_queue(
false, bbil_messages_queue_, uid, bbil)) {
758 if (bbil_iface != NULL) {
766 "BBIL[%s] registered for message events "
767 "for '%s' but has no such interface",
774 bbil_messages_mutex_->
lock();
775 bbil_messages_events_ -= 1;
776 if (!bbil_messages_queue_.empty()) {
777 if (bbil_messages_events_ == 0) {
778 while (!bbil_messages_queue_.empty()) {
779 BBilQueueEntry &e = bbil_messages_queue_.front();
781 add_listener(e.interface, e.listener, bbil_messages_);
783 remove_listener(e.interface, e.listener, bbil_messages_);
785 bbil_messages_queue_.pop_front();
789 bbil_messages_mutex_->
unlock();
BlackBoard interface listener.
Interface * bbil_reader_interface(const char *iuid) noexcept
Get interface instance for given UID.
virtual void bb_interface_writer_added(Interface *interface, Uuid instance_serial) noexcept
A writing instance has been opened for a watched interface.
@ WRITER
Writer event entry.
@ READER
Reader event entry.
@ MESSAGES
Message received event entry.
@ DATA
Data changed event entry.
virtual bool bb_interface_message_received(Interface *interface, Message *message) noexcept
BlackBoard message received notification.
virtual void bb_interface_data_refreshed(Interface *interface) noexcept
BlackBoard data refreshed notification.
Interface * bbil_message_interface(const char *iuid) noexcept
Get interface instance for given UID.
virtual void bb_interface_data_changed(Interface *interface) noexcept
BlackBoard data changed notification.
virtual void bb_interface_writer_removed(Interface *interface, Uuid instance_serial) noexcept
A writing instance has been closed for a watched interface.
virtual void bb_interface_reader_added(Interface *interface, Uuid instance_serial) noexcept
A reading instance has been opened for a watched interface.
const char * bbil_name() const
Get BBIL name.
Interface * bbil_writer_interface(const char *iuid) noexcept
Get interface instance for given UID.
std::list< QueueEntry > InterfaceQueue
Queue of additions/removal of interfaces.
virtual void bb_interface_reader_removed(Interface *interface, Uuid instance_serial) noexcept
A reading instance has been closed for a watched interface.
Interface * bbil_data_interface(const char *iuid) noexcept
Get interface instance for given UID.
BlackBoard interface observer.
ObservedInterfaceLockMap::iterator ObservedInterfaceLockMapIterator
Type for iterator of lockable interface type hash sets.
ObservedInterfaceLockMap * bbio_get_observed_destroy() noexcept
Get interface destriction type watch list.
ObservedInterfaceLockMap * bbio_get_observed_create() noexcept
Get interface creation type watch list.
virtual void bb_interface_created(const char *type, const char *id) noexcept
BlackBoard interface created notification.
virtual void bb_interface_destroyed(const char *type, const char *id) noexcept
BlackBoard interface destroyed notification.
BlackBoardNotifier()
Constructor.
void notify_of_writer_added(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that writer has been added.
void notify_of_writer_removed(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that writer has been removed.
void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
void notify_of_interface_destroyed(const char *type, const char *id) noexcept
Notify that an interface has been destroyed.
void notify_of_reader_added(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that reader has been added.
virtual ~BlackBoardNotifier()
Destructor.
void notify_of_reader_removed(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that reader has been removed.
void notify_of_data_refresh(const Interface *interface, bool has_changed)
Notify of data change.
void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
void register_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Register BB event listener.
void notify_of_interface_created(const char *type, const char *id) noexcept
Notify that an interface has been created.
void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
void update_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Update BB event listener.
bool notify_of_message_received(const Interface *interface, Message *message)
Notify of message received Notify all subscribers of the given interface of an incoming message This ...
ListenerRegisterFlag
Flags to constrain listener registration/updates.
@ BBIL_FLAG_READER
consider reader events
@ BBIL_FLAG_DATA
consider data events
@ BBIL_FLAG_WRITER
consider writer events
@ BBIL_FLAG_MESSAGES
consider message received events
Base class for all Fawkes BlackBoard interfaces.
const char * uid() const
Get unique identifier of interface.
static void log_warn(const char *component, const char *format,...)
Log warning message.
void lock() const
Lock list.
void unlock() const
Unlock list.
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Mutex mutual exclusion lock.
void lock()
Lock this mutex.
void unlock()
Unlock the mutex.
A convenience class for universally unique identifiers (UUIDs).
Fawkes library namespace.
Structure to hold maps for active subscriptions.
InterfaceMap writer
Writer event subscriptions.
InterfaceMap messages
Message received event subscriptions.
InterfaceMap data
Data event subscriptions.
InterfaceMap reader
Reader event subscriptions.