Fawkes API Fawkes Development Version
remote.cpp
1
2/***************************************************************************
3 * remote.h - Remote BlackBoard access via Fawkes network protocol
4 *
5 * Created: Mon Mar 03 10:53:00 2008
6 * Copyright 2006-2015 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version. A runtime exception applies to
13 * this software (see LICENSE.GPL_WRE file mentioned below for details).
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library General Public License for more details.
19 *
20 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
21 */
22
23#include <arpa/inet.h>
24#include <blackboard/exceptions.h>
25#include <blackboard/internal/instance_factory.h>
26#include <blackboard/internal/notifier.h>
27#include <blackboard/net/ilist_content.h>
28#include <blackboard/net/interface_proxy.h>
29#include <blackboard/net/messages.h>
30#include <blackboard/remote.h>
31#include <core/threading/mutex.h>
32#include <core/threading/mutex_locker.h>
33#include <core/threading/thread.h>
34#include <core/threading/wait_condition.h>
35#include <interface/interface_info.h>
36#include <netcomm/fawkes/client.h>
37#include <utils/time/time.h>
38
39#include <cstring>
40#include <fnmatch.h>
41#include <string>
42
43namespace fawkes {
44
45/** @class RemoteBlackBoard <blackboard/remote.h>
46 * Remote BlackBoard.
47 * This class implements the access to a remote BlackBoard using the Fawkes
48 * network protocol.
49 *
50 * @author Tim Niemueller
51 */
52
53/** Constructor.
54 * @param client Fawkes network client to use.
55 */
57{
58 fnc_ = client;
59 fnc_owner_ = false;
60
61 if (!fnc_->connected()) {
62 throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
63 }
64
65 fnc_->register_handler(this, FAWKES_CID_BLACKBOARD);
66
67 mutex_ = new Mutex();
68 instance_factory_ = new BlackBoardInstanceFactory();
69
70 wait_mutex_ = new Mutex();
71 wait_cond_ = new WaitCondition(wait_mutex_);
72
73 inbound_thread_ = NULL;
74 m_ = NULL;
75}
76
77/** Constructor.
78 * This will internall create a fawkes network client that is used to communicate
79 * with the remote BlackBoard.
80 * @param hostname hostname to connect to
81 * @param port port to connect to
82 */
83RemoteBlackBoard::RemoteBlackBoard(const char *hostname, unsigned short int port)
84{
85 fnc_ = new FawkesNetworkClient(hostname, port);
86 try {
87 fnc_->connect();
88 } catch (Exception &e) {
89 delete fnc_;
90 throw;
91 }
92
93 fnc_owner_ = true;
94
95 if (!fnc_->connected()) {
96 throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
97 }
98
99 fnc_->register_handler(this, FAWKES_CID_BLACKBOARD);
100
101 mutex_ = new Mutex();
102 instance_factory_ = new BlackBoardInstanceFactory();
103
104 wait_mutex_ = new Mutex();
105 wait_cond_ = new WaitCondition(wait_mutex_);
106
107 inbound_thread_ = NULL;
108 m_ = NULL;
109}
110
111/** Destructor. */
113{
114 fnc_->deregister_handler(FAWKES_CID_BLACKBOARD);
115 delete mutex_;
116 delete instance_factory_;
117
118 for (pit_ = proxies_.begin(); pit_ != proxies_.end(); ++pit_) {
119 delete pit_->second;
120 }
121
122 if (fnc_owner_) {
123 fnc_->disconnect();
124 delete fnc_;
125 }
126
127 delete wait_cond_;
128 delete wait_mutex_;
129}
130
131bool
133{
134 return fnc_->connected();
135}
136
137void
138RemoteBlackBoard::reopen_interfaces()
139{
140 proxies_.lock();
141 ipit_ = invalid_proxies_.begin();
142 while (ipit_ != invalid_proxies_.end()) {
143 try {
144 Interface *iface = (*ipit_)->interface();
145 open_interface(iface->type(), iface->id(), iface->owner(), iface->is_writer(), iface);
146 iface->set_validity(true);
147 ipit_ = invalid_proxies_.erase(ipit_);
148 } catch (Exception &e) {
149 // we failed to re-establish validity for the given interface, bad luck
150 ++ipit_;
151 }
152 }
153 proxies_.unlock();
154}
155
156bool
158{
159 bool rv = true;
160 try {
161 if (!fnc_->connected()) {
162 fnc_->connect();
163
164 reopen_interfaces();
165 }
166 } catch (...) {
167 rv = false;
168 }
169 return rv;
170}
171
172void
173RemoteBlackBoard::open_interface(const char *type,
174 const char *identifier,
175 const char *owner,
176 bool writer,
177 Interface * iface)
178{
179 if (!fnc_->connected()) {
180 throw Exception("Cannot instantiate remote interface, connection is dead");
181 }
182
183 mutex_->lock();
184 if (inbound_thread_ != NULL && Thread::current_thread()
185 && strcmp(Thread::current_thread()->name(), inbound_thread_) == 0) {
186 throw Exception("Cannot call open_interface() from inbound handler");
187 }
188 mutex_->unlock();
189
190 bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1, sizeof(bb_iopen_msg_t));
191 strncpy(om->type, type, INTERFACE_TYPE_SIZE_ - 1);
192 strncpy(om->id, identifier, INTERFACE_ID_SIZE_ - 1);
193 memcpy(om->hash, iface->hash(), INTERFACE_HASH_SIZE_);
194
195 FawkesNetworkMessage *omsg =
196 new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
197 writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING,
198 om,
199 sizeof(bb_iopen_msg_t));
200
201 wait_mutex_->lock();
202 fnc_->enqueue(omsg);
203 while (
204 is_alive()
205 && (!m_ || ((m_->msgid() != MSG_BB_OPEN_SUCCESS) && (m_->msgid() != MSG_BB_OPEN_FAILURE)))) {
206 if (m_) {
207 m_->unref();
208 m_ = NULL;
209 }
210 wait_cond_->wait();
211 }
212 wait_mutex_->unlock();
213
214 if (!is_alive()) {
215 throw Exception("Connection died while trying to open %s::%s", type, identifier);
216 }
217
218 if (m_->msgid() == MSG_BB_OPEN_SUCCESS) {
219 // We got the interface, create internal storage and prepare instance for return
220 BlackBoardInterfaceProxy *proxy =
221 new BlackBoardInterfaceProxy(fnc_, m_, notifier_, iface, writer);
222 proxies_[proxy->serial()] = proxy;
223 } else if (m_->msgid() == MSG_BB_OPEN_FAILURE) {
224 bb_iopenfail_msg_t *fm = m_->msg<bb_iopenfail_msg_t>();
225 unsigned int error = ntohl(fm->error_code);
226 m_->unref();
227 m_ = NULL;
228 if (error == BB_ERR_WRITER_EXISTS) {
229 throw BlackBoardWriterActiveException(identifier, type);
230 } else if (error == BB_ERR_HASH_MISMATCH) {
231 throw Exception("Hash mismatch for interface %s:%s", type, identifier);
232 } else if (error == BB_ERR_UNKNOWN_TYPE) {
233 throw Exception("Type %s unknown (%s::%s)", type, type, identifier);
234 } else if (error == BB_ERR_WRITER_EXISTS) {
235 throw BlackBoardWriterActiveException(identifier, type);
236 } else {
237 throw Exception("Could not open interface");
238 }
239 }
240
241 m_->unref();
242 m_ = NULL;
243}
244
245Interface *
246RemoteBlackBoard::open_interface(const char *type,
247 const char *identifier,
248 const char *owner,
249 bool writer)
250{
251 if (!fnc_->connected()) {
252 throw Exception("Cannot instantiate remote interface, connection is dead");
253 }
254
255 Interface *iface = instance_factory_->new_interface_instance(type, identifier);
256 try {
257 open_interface(type, identifier, owner, writer, iface);
258 } catch (Exception &e) {
259 instance_factory_->delete_interface_instance(iface);
260 throw;
261 }
262
263 return iface;
264}
265
266Interface *
267RemoteBlackBoard::open_for_reading(const char *type, const char *identifier, const char *owner)
268{
269 return open_interface(type, identifier, owner, /* writer? */ false);
270}
271
272Interface *
273RemoteBlackBoard::open_for_writing(const char *type, const char *identifier, const char *owner)
274{
275 return open_interface(type, identifier, owner, /* writer? */ true);
276}
277
278std::list<Interface *>
280 const char *id_pattern,
281 const char *owner)
282{
283 std::list<Interface *> rv;
284
285 InterfaceInfoList *infl = list_all();
286 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
287 // ensure 0-termination
288 char type[INTERFACE_TYPE_SIZE_ + 1];
289 char id[INTERFACE_ID_SIZE_ + 1];
290 type[INTERFACE_TYPE_SIZE_] = 0;
291 id[INTERFACE_TYPE_SIZE_] = 0;
292 strncpy(type, i->type(), INTERFACE_TYPE_SIZE_);
293 strncpy(id, i->id(), INTERFACE_ID_SIZE_);
294
295 if ((fnmatch(type_pattern, type, 0) == FNM_NOMATCH)
296 || (fnmatch(id_pattern, id, 0) == FNM_NOMATCH)) {
297 // type or ID prefix does not match, go on
298 continue;
299 }
300
301 try {
302 Interface *iface = open_for_reading((*i).type(), (*i).id(), owner);
303 rv.push_back(iface);
304 } catch (Exception &e) {
305 for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) {
306 close(*j);
307 }
308 throw;
309 }
310 }
311
312 return rv;
313}
314
315/** Close interface.
316 * @param interface interface to close
317 */
318void
320{
321 if (interface == NULL)
322 return;
323
324 Uuid serial = interface->serial();
325
326 if (proxies_.find(serial) != proxies_.end()) {
327 delete proxies_[serial];
328 proxies_.erase(serial);
329 }
330
331 if (fnc_->connected()) {
332 // We cannot "officially" close it, if we are disconnected it cannot be used anyway
333 bb_iserial_msg_t *sm = (bb_iserial_msg_t *)calloc(1, sizeof(bb_iserial_msg_t));
334 sm->serial = interface->serial();
335
337 new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, MSG_BB_CLOSE, sm, sizeof(bb_iserial_msg_t));
338 fnc_->enqueue(omsg);
339 }
340
341 instance_factory_->delete_interface_instance(interface);
342}
343
346{
347 mutex_->lock();
348 if (inbound_thread_ != NULL && strcmp(Thread::current_thread()->name(), inbound_thread_) == 0) {
349 throw Exception("Cannot call list_all() from inbound handler");
350 }
351 mutex_->unlock();
352
354
355 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, MSG_BB_LIST_ALL);
356 wait_mutex_->lock();
357 fnc_->enqueue(omsg);
358 while (!m_ || (m_->msgid() != MSG_BB_INTERFACE_LIST)) {
359 if (m_) {
360 m_->unref();
361 m_ = NULL;
362 }
363 wait_cond_->wait();
364 }
365 wait_mutex_->unlock();
366
368 while (bbilc->has_next()) {
369 size_t iisize;
370 bb_iinfo_msg_t *ii = bbilc->next(&iisize);
371 bool has_writer = ii->writer_readers & htonl(0x80000000);
372 unsigned int num_readers = ntohl(ii->writer_readers & htonl(0x7FFFFFFF));
373 infl->append(ii->type,
374 ii->id,
375 ii->hash,
376 ntohl(ii->serial),
377 has_writer,
378 num_readers,
379 std::list<std::string>(),
380 std::string(),
382 }
383
384 m_->unref();
385 m_ = NULL;
386
387 return infl;
388}
389
391RemoteBlackBoard::list(const char *type_pattern, const char *id_pattern)
392{
393 mutex_->lock();
394 if (inbound_thread_ != NULL && strcmp(Thread::current_thread()->name(), inbound_thread_) == 0) {
395 throw Exception("Cannot call list() from inbound handler");
396 }
397 mutex_->unlock();
398
400
401 bb_ilistreq_msg_t *om = (bb_ilistreq_msg_t *)calloc(1, sizeof(bb_ilistreq_msg_t));
402 strncpy(om->type_pattern, type_pattern, INTERFACE_TYPE_SIZE_ - 1);
403 strncpy(om->id_pattern, id_pattern, INTERFACE_ID_SIZE_ - 1);
404
406 new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, MSG_BB_LIST, om, sizeof(bb_ilistreq_msg_t));
407
408 wait_mutex_->lock();
409 fnc_->enqueue(omsg);
410 while (!m_ || (m_->msgid() != MSG_BB_INTERFACE_LIST)) {
411 if (m_) {
412 m_->unref();
413 m_ = NULL;
414 }
415 wait_cond_->wait();
416 }
417 wait_mutex_->unlock();
418
420 while (bbilc->has_next()) {
421 size_t iisize;
422 bb_iinfo_msg_t *ii = bbilc->next(&iisize);
423 bool has_writer = ii->writer_readers & htonl(0x80000000);
424 unsigned int num_readers = ntohl(ii->writer_readers & htonl(0x7FFFFFFF));
425 infl->append(ii->type,
426 ii->id,
427 ii->hash,
428 ntohl(ii->serial),
429 has_writer,
430 num_readers,
431 std::list<std::string>(),
432 std::string(),
434 }
435
436 m_->unref();
437 m_ = NULL;
438
439 return infl;
440}
441
442/** We are no longer registered in Fawkes network client.
443 * Ignored.
444 * @param id the id of the calling client
445 */
446void
447RemoteBlackBoard::deregistered(unsigned int id) noexcept
448{
449}
450
451void
453{
454 mutex_->lock();
455 inbound_thread_ = Thread::current_thread()->name();
456 mutex_->unlock();
457
458 if (m->cid() == FAWKES_CID_BLACKBOARD) {
459 unsigned int msgid = m->msgid();
460 try {
461 if (msgid == MSG_BB_DATA_CHANGED || msgid == MSG_BB_DATA_REFRESHED) {
462 Uuid serial = ((Uuid *)m->payload())[0];
463 if (proxies_.find(serial) != proxies_.end()) {
464 proxies_[serial]->process_data_refreshed(m);
465 }
466 } else if (msgid == MSG_BB_INTERFACE_MESSAGE) {
467 Uuid serial = ((Uuid *)m->payload())[0];
468 if (proxies_.find(serial) != proxies_.end()) {
469 proxies_[serial]->process_interface_message(m);
470 }
471 } else if (msgid == MSG_BB_READER_ADDED) {
473 if (proxies_.find(esm->serial) != proxies_.end()) {
474 proxies_[esm->serial]->reader_added(esm->event_serial);
475 }
476 } else if (msgid == MSG_BB_READER_REMOVED) {
478 if (proxies_.find(esm->serial) != proxies_.end()) {
479 proxies_[esm->serial]->reader_removed(esm->event_serial);
480 }
481 } else if (msgid == MSG_BB_WRITER_ADDED) {
483 if (proxies_.find(esm->serial) != proxies_.end()) {
484 proxies_[esm->serial]->writer_added(esm->event_serial);
485 }
486 } else if (msgid == MSG_BB_WRITER_REMOVED) {
488 if (proxies_.find(esm->serial) != proxies_.end()) {
489 proxies_[esm->serial]->writer_removed(esm->event_serial);
490 }
491 } else if (msgid == MSG_BB_INTERFACE_CREATED) {
492 bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>();
493 notifier_->notify_of_interface_created(em->type, em->id);
494 } else if (msgid == MSG_BB_INTERFACE_DESTROYED) {
495 bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>();
496 notifier_->notify_of_interface_destroyed(em->type, em->id);
497 } else {
498 wait_mutex_->lock();
499 m_ = m;
500 m_->ref();
501 wait_cond_->wake_all();
502 wait_mutex_->unlock();
503 }
504 } catch (Exception &e) {
505 // Bam, you're dead. Ok, not now, we just ignore that this shit happened...
506 }
507 }
508
509 mutex_->lock();
510 inbound_thread_ = NULL;
511 mutex_->unlock();
512}
513
514void
515RemoteBlackBoard::connection_died(unsigned int id) noexcept
516{
517 // mark all assigned interfaces as invalid
518 proxies_.lock();
519 for (pit_ = proxies_.begin(); pit_ != proxies_.end(); ++pit_) {
520 pit_->second->interface()->set_validity(false);
521 invalid_proxies_.push_back(pit_->second);
522 }
523 proxies_.clear();
524 proxies_.unlock();
525 wait_cond_->wake_all();
526}
527
528void
530{
531}
532
533} // end namespace fawkes
BlackBoard instance factory.
void delete_interface_instance(Interface *interface)
Destroy an interface instance.
Interface * new_interface_instance(const char *type, const char *identifier)
Creates a new interface instance.
BlackBoard interface list content.
Definition: ilist_content.h:36
bb_iinfo_msg_t * next(size_t *size)
Get next plugin from list.
bool has_next()
Check if more list elements are available.
BlackBoardNotifier * notifier_
Notifier for BB events.
Definition: blackboard.h:111
Base class for exceptions in Fawkes.
Definition: exception.h:36
Simple Fawkes network client.
Definition: client.h:52
void register_handler(FawkesNetworkClientHandler *handler, unsigned int component_id)
Register handler.
Definition: client.cpp:658
void connect()
Connect to remote.
Definition: client.cpp:424
void disconnect()
Disconnect socket.
Definition: client.cpp:539
void deregister_handler(unsigned int component_id)
Deregister handler.
Definition: client.cpp:676
bool connected() const noexcept
Check if connection is alive.
Definition: client.cpp:828
void enqueue(FawkesNetworkMessage *message)
Enqueue message to send.
Definition: client.cpp:596
Representation of a message that is sent over the network.
Definition: message.h:77
unsigned short int msgid() const
Get message type ID.
Definition: message.cpp:294
MT * msg() const
Get correctly casted payload.
Definition: message.h:120
MT * msgc() const
Get correctly parsed output.
Definition: message.h:159
Interface information list.
void append(const char *type, const char *id, const unsigned char *hash, unsigned int serial, bool has_writer, unsigned int num_readers, const std::list< std::string > &readers, const std::string &writer, const Time &timestamp)
Append an interface info.
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
void set_validity(bool valid)
Mark this interface invalid.
Definition: interface.cpp:458
const char * type() const
Get type of interface.
Definition: interface.cpp:652
bool is_writer() const
Check if this is a writing instance.
Definition: interface.cpp:445
const unsigned char * hash() const
Get interface hash.
Definition: interface.cpp:305
const char * id() const
Get identifier of interface.
Definition: interface.cpp:661
Uuid serial() const
Get instance serial of interface.
Definition: interface.cpp:695
const char * owner() const
Get owner of interface.
Definition: interface.cpp:672
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
virtual bool try_aliveness_restore() noexcept
Try to restore the aliveness of the BlackBoard instance.
Definition: remote.cpp:157
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)
Open interface for reading.
Definition: remote.cpp:267
virtual ~RemoteBlackBoard()
Destructor.
Definition: remote.cpp:112
virtual InterfaceInfoList * list_all()
Get list of all currently existing interfaces.
Definition: remote.cpp:345
virtual InterfaceInfoList * list(const char *type_pattern, const char *id_pattern)
Get list of interfaces matching type and ID patterns.
Definition: remote.cpp:391
std::list< Interface * > open_multiple_for_reading(const char *interface_type, const char *id_pattern="*", const char *owner=NULL)
Open multiple interfaces for reading.
Definition: remote.cpp:279
virtual void connection_died(unsigned int id) noexcept
Client connection died.
Definition: remote.cpp:515
virtual void deregistered(unsigned int id) noexcept
We are no longer registered in Fawkes network client.
Definition: remote.cpp:447
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)
Open interface for writing.
Definition: remote.cpp:273
virtual void connection_established(unsigned int id) noexcept
Client has established a connection.
Definition: remote.cpp:529
virtual bool is_alive() const noexcept
Check if the BlackBoard is still alive.
Definition: remote.cpp:132
virtual void close(Interface *interface)
Close interface.
Definition: remote.cpp:319
virtual void inbound_received(FawkesNetworkMessage *msg, unsigned int id) noexcept
Called for incoming messages.
Definition: remote.cpp:452
RemoteBlackBoard(FawkesNetworkClient *client)
Constructor.
Definition: remote.cpp:56
const char * name() const
Get name of thread.
Definition: thread.h:100
static Thread * current_thread()
Get the Thread instance of the currently running thread.
Definition: thread.cpp:1366
A class for handling time.
Definition: time.h:93
A convenience class for universally unique identifiers (UUIDs).
Definition: uuid.h:29
Wait until a given condition holds.
void wait()
Wait for the condition forever.
Fawkes library namespace.
@ BB_ERR_WRITER_EXISTS
You tried to open an interface for writing but there is already a writing instance for this interface...
Definition: messages.h:64
@ BB_ERR_HASH_MISMATCH
The hashes of the interfaces do not match.
Definition: messages.h:62
@ BB_ERR_UNKNOWN_TYPE
Requested interface type is unknown.
Definition: messages.h:61
Message for interface events.
Definition: messages.h:110
char type[INTERFACE_TYPE_SIZE_]
interface type name
Definition: messages.h:111
char id[INTERFACE_ID_SIZE_]
interface instance ID
Definition: messages.h:112
Message to identify an two interface instances.
Definition: messages.h:129
Uuid event_serial
instance serial to unique identify instance that caused the event.
Definition: messages.h:131
Uuid serial
instance serial to unique identify own instance
Definition: messages.h:130
Message for interface info.
Definition: messages.h:91
uint32_t writer_readers
combined writer reader information.
Definition: messages.h:97
char id[INTERFACE_ID_SIZE_]
interface instance ID
Definition: messages.h:93
unsigned char hash[INTERFACE_HASH_SIZE_]
interface version hash
Definition: messages.h:94
int64_t timestamp_usec
data or write timestamp, usec part
Definition: messages.h:103
int64_t timestamp_sec
data or write timestamp, sec part
Definition: messages.h:102
uint32_t serial
instance serial to uniquely identify this instance (big endian)
Definition: messages.h:95
char type[INTERFACE_TYPE_SIZE_]
interface type name
Definition: messages.h:92
Message to request constrained interface list.
Definition: messages.h:76
char type_pattern[INTERFACE_TYPE_SIZE_]
type pattern
Definition: messages.h:77
char id_pattern[INTERFACE_ID_SIZE_]
ID pattern.
Definition: messages.h:78
Message to identify an interface instance.
Definition: messages.h:120
Uuid serial
instance serial to unique identify this instance
Definition: messages.h:121