24 #include <arpa/inet.h> 25 #include <blackboard/blackboard.h> 26 #include <blackboard/exceptions.h> 27 #include <blackboard/net/handler.h> 28 #include <blackboard/net/ilist_content.h> 29 #include <blackboard/net/interface_listener.h> 30 #include <blackboard/net/interface_observer.h> 31 #include <blackboard/net/messages.h> 32 #include <interface/interface.h> 33 #include <interface/interface_info.h> 34 #include <logging/liblogger.h> 35 #include <netcomm/fawkes/component_ids.h> 36 #include <netcomm/fawkes/hub.h> 56 :
Thread(
"BlackBoardNetworkHandler",
Thread::OPMODE_WAITFORWAKEUP),
71 inbound_queue_.
clear();
73 for (lit_ = listeners_.begin(); lit_ != listeners_.end(); ++lit_) {
76 for (iit_ = interfaces_.begin(); iit_ != interfaces_.end(); ++iit_) {
77 bb_->
close(iit_->second);
85 while (!inbound_queue_.empty()) {
89 unsigned int clid = msg->
clid();
91 switch (msg->
msgid()) {
92 case MSG_BB_LIST_ALL: {
96 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
101 nhub_->
send(clid, FAWKES_CID_BLACKBOARD, MSG_BB_INTERFACE_LIST, ilist);
104 "Failed to send interface " 105 "list to %u, exception follows",
117 char type_pattern[INTERFACE_TYPE_SIZE_ + 1];
118 char id_pattern[INTERFACE_ID_SIZE_ + 1];
119 type_pattern[INTERFACE_TYPE_SIZE_] = 0;
120 id_pattern[INTERFACE_ID_SIZE_] = 0;
121 strncpy(type_pattern, lrm->
type_pattern, INTERFACE_TYPE_SIZE_);
122 strncpy(id_pattern, lrm->
id_pattern, INTERFACE_ID_SIZE_);
125 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
130 nhub_->
send(clid, FAWKES_CID_BLACKBOARD, MSG_BB_INTERFACE_LIST, ilist);
134 "interface list to %u, exception follows",
141 case MSG_BB_OPEN_FOR_READING:
142 case MSG_BB_OPEN_FOR_WRITING: {
145 char type[INTERFACE_TYPE_SIZE_ + 1];
146 char id[INTERFACE_ID_SIZE_ + 1];
147 type[INTERFACE_TYPE_SIZE_] = 0;
148 id[INTERFACE_ID_SIZE_] = 0;
149 strncpy(type, om->
type, INTERFACE_TYPE_SIZE_);
150 strncpy(
id, om->
id, INTERFACE_ID_SIZE_);
156 if (msg->
msgid() == MSG_BB_OPEN_FOR_READING) {
161 if (memcmp(iface->
hash(), om->
hash, INTERFACE_HASH_SIZE_) != 0) {
163 "Opening interface %s::%s failed, " 169 interfaces_[iface->
serial()] = iface;
170 client_interfaces_[clid].push_back(iface);
171 serial_to_clid_[iface->
serial()] = clid;
172 listeners_[iface->
serial()] =
174 send_opensuccess(clid, iface);
178 "Opening interface %s::%s failed, " 179 "interface class not found",
185 "Opening interface %s::%s failed, " 186 "writer already exists",
192 "Opening interface %s::%s failed",
207 unsigned int sm_serial = ntohl(sm->
serial);
208 if (interfaces_.find(sm_serial) != interfaces_.end()) {
210 client_interfaces_.lock();
211 if (client_interfaces_.find(clid) != client_interfaces_.end()) {
213 for (ciit_ = client_interfaces_[clid].begin(); ciit_ != client_interfaces_[clid].end();
215 if ((*ciit_)->serial() == sm_serial) {
217 serial_to_clid_.erase(sm_serial);
218 client_interfaces_[clid].erase(ciit_);
219 if (client_interfaces_[clid].empty()) {
220 client_interfaces_.erase(clid);
226 client_interfaces_.unlock();
231 "Remote %u closing interface %s",
233 interfaces_[sm_serial]->uid());
234 delete listeners_[sm_serial];
235 listeners_.erase(sm_serial);
236 bb_->
close(interfaces_[sm_serial]);
237 interfaces_.erase(sm_serial);
238 interfaces_.unlock();
241 "Client %u tried to close " 242 "interface with serial %u, but opened by other client",
248 "Client %u tried to close " 249 "interface with serial %u which has not been opened",
259 case MSG_BB_DATA_CHANGED: {
260 void * payload = msg->
payload();
262 unsigned int dm_serial = ntohl(dm->
serial);
263 if (interfaces_.find(dm_serial) != interfaces_.end()) {
264 if (ntohl(dm->
data_size) != interfaces_[dm_serial]->datasize()) {
266 "DATA_CHANGED: Data size mismatch, " 267 "expected %zu, but got %zu, ignoring.",
268 interfaces_[dm_serial]->datasize(),
271 interfaces_[dm_serial]->set_from_chunk((
char *)payload +
sizeof(
bb_idata_msg_t));
272 interfaces_[dm_serial]->write();
276 "DATA_CHANGED: Interface with " 277 "serial %u not found, ignoring.",
282 case MSG_BB_INTERFACE_MESSAGE: {
283 void * payload = msg->
payload();
285 unsigned int mm_serial = ntohl(mm->
serial);
286 if (interfaces_.find(mm_serial) != interfaces_.end()) {
287 if (!interfaces_[mm_serial]->is_writer()) {
295 "MESSAGE: Data size mismatch, " 296 "expected %zu, but got %zu, ignoring.",
302 interfaces_[mm_serial]->msgq_enqueue(ifm);
306 "MESSAGE: Could not create " 307 "interface message, ignoring.");
312 "MESSAGE: Received message " 313 "notification, but for a writing instance, ignoring.");
317 "DATA_CHANGED: Interface with " 318 "serial %u not found, ignoring.",
325 "Unknown message of type %u " 337 BlackBoardNetworkHandler::send_opensuccess(
unsigned int clid,
Interface *interface)
342 osm->writer_readers = htonl(interface->
num_readers());
344 osm->writer_readers |= htonl(0x80000000);
346 osm->writer_readers &= htonl(0x7FFFFFFF);
348 osm->data_size = htonl(interface->
datasize());
354 memcpy((
char *)payload +
sizeof(bb_iopensucc_msg_t),
358 FawkesNetworkMessage *omsg =
359 new FawkesNetworkMessage(clid,
360 FAWKES_CID_BLACKBOARD,
363 sizeof(bb_iopensucc_msg_t) + interface->
datasize());
366 }
catch (Exception &e) {
368 "Failed to send interface " 369 "open success to %u, exception follows",
376 BlackBoardNetworkHandler::send_openfailure(
unsigned int clid,
unsigned int error_code)
378 bb_iopenfail_msg_t *ofm = (bb_iopenfail_msg_t *)malloc(
sizeof(bb_iopenfail_msg_t));
379 ofm->error_code = htonl(error_code);
381 FawkesNetworkMessage *omsg =
new FawkesNetworkMessage(
382 clid, FAWKES_CID_BLACKBOARD, MSG_BB_OPEN_FAILURE, ofm,
sizeof(bb_iopenfail_msg_t));
385 }
catch (Exception &e) {
387 "Failed to send interface " 388 "open failure to %u, exception follows",
422 client_interfaces_.lock();
423 if (client_interfaces_.find(clid) != client_interfaces_.end()) {
425 for (ciit_ = client_interfaces_[clid].begin(); ciit_ != client_interfaces_[clid].end();
428 "Closing interface %s::%s of remote " 429 "%u (client disconnected)",
434 unsigned int serial = (*ciit_)->serial();
435 serial_to_clid_.erase(serial);
436 interfaces_.erase_locked(serial);
437 delete listeners_[serial];
438 listeners_.erase(serial);
441 client_interfaces_.erase(clid);
443 client_interfaces_.unlock();
void * payload() const
Get payload buffer.
void clear()
Clear the queue.
unsigned int datasize() const
Get data size.
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Requested interface type is unknown.
void unref()
Decrement reference count and conditionally delete this instance.
BlackBoardNetworkHandler(BlackBoard *blackboard, FawkesNetworkHub *hub)
Constructor.
static void log_debug(const char *component, const char *format,...)
Log debug message.
uint32_t serial
instance serial to unique identify this instance
Fawkes library namespace.
unsigned int clid() const
Get client ID.
Message to identify an interface on open.
Interface listener for network handler.
virtual void add_handler(FawkesNetworkHandler *handler)=0
Add a message handler.
virtual void loop()
Process all network messages that have been received.
Representation of a message that is sent over the network.
Thread class encapsulation of pthreads.
Base class for all Fawkes BlackBoard interfaces.
char id_pattern[INTERFACE_ID_SIZE_]
ID pattern.
Interface observer for blackboard network handler.
virtual InterfaceInfoList * list(const char *type_pattern, const char *id_pattern)=0
Get list of interfaces matching type and ID patterns.
You tried to open an interface for writing but there is already a writing instance for this interface...
char msg_type[INTERFACE_MESSAGE_TYPE_SIZE_]
message type
virtual void client_disconnected(unsigned int clid)
Client disconnected.
const unsigned char * hash() const
Get interface hash.
static void log_error(const char *component, const char *format,...)
Log error message.
virtual void client_connected(unsigned int clid)
Client connected.
Interface information list.
void wakeup()
Wake up thread.
Base class for exceptions in Fawkes.
unsigned short serial() const
Get instance serial of interface.
void read()
Read from BlackBoard into local copy.
virtual void send(FawkesNetworkMessage *msg)=0
Method to send a message to a specific client.
void set_from_chunk(const void *chunk)
Set from raw data chunk.
Interface open success The serial denotes a unique instance of an interface within the (remote) Black...
char id[INTERFACE_ID_SIZE_]
interface instance ID
void ref()
Increment reference count.
The hashes of the interfaces do not match.
bool has_writer() const
Check if there is a writer for the interface.
uint32_t data_size
data for message
Network handler abstract base class.
static void log_warn(const char *component, const char *format,...)
Log warning message.
~BlackBoardNetworkHandler()
Destructor.
bool is_writer() const
Check if this is a writing instance.
void append_interface(const char *type, const char *id, const unsigned char *hash, unsigned int serial, bool has_writer, unsigned int num_readers, const fawkes::Time &time)
Append interface info.
void pop_locked()
Pop element from queue with lock protection.
uint32_t serial
interface instance serial
unsigned short int msgid() const
Get message type ID.
uint32_t serial
instance serial to unique identify this instance
uint32_t serial
instance serial to unique identify this instance
virtual InterfaceInfoList * list_all()=0
Get list of all currently existing interfaces.
Thrown if no definition of interface or interface generator found.
void push_locked(const Type &x)
Push element to queue with lock protection.
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
unsigned int datasize() const
Get size of data.
Message to request constrained interface list.
char type[INTERFACE_TYPE_SIZE_]
interface type name
BlackBoard interface list content.
Message to identify an interface instance.
const void * datachunk() const
Get data chunk.
unsigned int num_readers() const
Get the number of readers.
virtual void remove_handler(FawkesNetworkHandler *handler)=0
Remove a message handler.
The BlackBoard abstract class.
Thrown if a writer is already active on an interface that writing has been requested for.
MT * msg() const
Get correctly casted payload.
unsigned char hash[INTERFACE_HASH_SIZE_]
interface version hash
char type_pattern[INTERFACE_TYPE_SIZE_]
type pattern
void set_hops(unsigned int hops)
Set number of hops.
uint32_t data_size
size in bytes of the following data.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
void set_id(unsigned int message_id)
Set message ID.
uint32_t hops
number of hops this message already passed
virtual void handle_network_message(FawkesNetworkMessage *msg)
Handle network message.
virtual void close(Interface *interface)=0
Close interface.