Fawkes API  Fawkes Development Version
remote.cpp
1 
2 /***************************************************************************
3  * remote.h - Remote BlackBoard access via Fawkes network protocol
4  *
5  * Created: Mon Mar 03 10:53:00 2008
6  * Copyright 2006-2015 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version. A runtime exception applies to
13  * this software (see LICENSE.GPL_WRE file mentioned below for details).
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
21  */
22 
23 #include <arpa/inet.h>
24 #include <blackboard/exceptions.h>
25 #include <blackboard/internal/instance_factory.h>
26 #include <blackboard/internal/notifier.h>
27 #include <blackboard/net/ilist_content.h>
28 #include <blackboard/net/interface_proxy.h>
29 #include <blackboard/net/messages.h>
30 #include <blackboard/remote.h>
31 #include <core/threading/mutex.h>
32 #include <core/threading/mutex_locker.h>
33 #include <core/threading/thread.h>
34 #include <core/threading/wait_condition.h>
35 #include <interface/interface_info.h>
36 #include <netcomm/fawkes/client.h>
37 #include <utils/time/time.h>
38 
39 #include <cstring>
40 #include <fnmatch.h>
41 #include <string>
42 
43 namespace fawkes {
44 
45 /** @class RemoteBlackBoard <blackboard/remote.h>
46  * Remote BlackBoard.
47  * This class implements the access to a remote BlackBoard using the Fawkes
48  * network protocol.
49  *
50  * @author Tim Niemueller
51  */
52 
53 /** Constructor.
54  * @param client Fawkes network client to use.
55  */
57 {
58  fnc_ = client;
59  fnc_owner_ = false;
60 
61  if (!fnc_->connected()) {
62  throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
63  }
64 
65  fnc_->register_handler(this, FAWKES_CID_BLACKBOARD);
66 
67  mutex_ = new Mutex();
68  instance_factory_ = new BlackBoardInstanceFactory();
69 
70  wait_mutex_ = new Mutex();
71  wait_cond_ = new WaitCondition(wait_mutex_);
72 
73  inbound_thread_ = NULL;
74  m_ = NULL;
75 }
76 
77 /** Constructor.
78  * This will internall create a fawkes network client that is used to communicate
79  * with the remote BlackBoard.
80  * @param hostname hostname to connect to
81  * @param port port to connect to
82  */
83 RemoteBlackBoard::RemoteBlackBoard(const char *hostname, unsigned short int port)
84 {
85  fnc_ = new FawkesNetworkClient(hostname, port);
86  try {
87  fnc_->connect();
88  } catch (Exception &e) {
89  delete fnc_;
90  throw;
91  }
92 
93  fnc_owner_ = true;
94 
95  if (!fnc_->connected()) {
96  throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
97  }
98 
99  fnc_->register_handler(this, FAWKES_CID_BLACKBOARD);
100 
101  mutex_ = new Mutex();
102  instance_factory_ = new BlackBoardInstanceFactory();
103 
104  wait_mutex_ = new Mutex();
105  wait_cond_ = new WaitCondition(wait_mutex_);
106 
107  inbound_thread_ = NULL;
108  m_ = NULL;
109 }
110 
111 /** Destructor. */
113 {
114  fnc_->deregister_handler(FAWKES_CID_BLACKBOARD);
115  delete mutex_;
116  delete instance_factory_;
117 
118  for (pit_ = proxies_.begin(); pit_ != proxies_.end(); ++pit_) {
119  delete pit_->second;
120  }
121 
122  if (fnc_owner_) {
123  fnc_->disconnect();
124  delete fnc_;
125  }
126 
127  delete wait_cond_;
128  delete wait_mutex_;
129 }
130 
131 bool
133 {
134  return fnc_->connected();
135 }
136 
137 void
138 RemoteBlackBoard::reopen_interfaces()
139 {
140  proxies_.lock();
141  ipit_ = invalid_proxies_.begin();
142  while (ipit_ != invalid_proxies_.end()) {
143  try {
144  Interface *iface = (*ipit_)->interface();
145  open_interface(iface->type(), iface->id(), iface->owner(), iface->is_writer(), iface);
146  iface->set_validity(true);
147  ipit_ = invalid_proxies_.erase(ipit_);
148  } catch (Exception &e) {
149  // we failed to re-establish validity for the given interface, bad luck
150  ++ipit_;
151  }
152  }
153  proxies_.unlock();
154 }
155 
156 bool
158 {
159  bool rv = true;
160  try {
161  if (!fnc_->connected()) {
162  fnc_->connect();
163 
164  reopen_interfaces();
165  }
166  } catch (...) {
167  rv = false;
168  }
169  return rv;
170 }
171 
172 void
173 RemoteBlackBoard::open_interface(const char *type,
174  const char *identifier,
175  const char *owner,
176  bool writer,
177  Interface * iface)
178 {
179  if (!fnc_->connected()) {
180  throw Exception("Cannot instantiate remote interface, connection is dead");
181  }
182 
183  mutex_->lock();
184  if (inbound_thread_ != NULL && Thread::current_thread()
185  && strcmp(Thread::current_thread()->name(), inbound_thread_) == 0) {
186  throw Exception("Cannot call open_interface() from inbound handler");
187  }
188  mutex_->unlock();
189 
190  bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1, sizeof(bb_iopen_msg_t));
191  strncpy(om->type, type, INTERFACE_TYPE_SIZE_ - 1);
192  strncpy(om->id, identifier, INTERFACE_ID_SIZE_ - 1);
193  memcpy(om->hash, iface->hash(), INTERFACE_HASH_SIZE_);
194 
195  FawkesNetworkMessage *omsg =
196  new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
197  writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING,
198  om,
199  sizeof(bb_iopen_msg_t));
200 
201  wait_mutex_->lock();
202  fnc_->enqueue(omsg);
203  while (
204  is_alive()
205  && (!m_ || ((m_->msgid() != MSG_BB_OPEN_SUCCESS) && (m_->msgid() != MSG_BB_OPEN_FAILURE)))) {
206  if (m_) {
207  m_->unref();
208  m_ = NULL;
209  }
210  wait_cond_->wait();
211  }
212  wait_mutex_->unlock();
213 
214  if (!is_alive()) {
215  throw Exception("Connection died while trying to open %s::%s", type, identifier);
216  }
217 
218  if (m_->msgid() == MSG_BB_OPEN_SUCCESS) {
219  // We got the interface, create internal storage and prepare instance for return
220  BlackBoardInterfaceProxy *proxy =
221  new BlackBoardInterfaceProxy(fnc_, m_, notifier_, iface, writer);
222  proxies_[proxy->serial()] = proxy;
223  } else if (m_->msgid() == MSG_BB_OPEN_FAILURE) {
224  bb_iopenfail_msg_t *fm = m_->msg<bb_iopenfail_msg_t>();
225  unsigned int error = ntohl(fm->error_code);
226  m_->unref();
227  m_ = NULL;
228  if (error == BB_ERR_WRITER_EXISTS) {
229  throw BlackBoardWriterActiveException(identifier, type);
230  } else if (error == BB_ERR_HASH_MISMATCH) {
231  throw Exception("Hash mismatch for interface %s:%s", type, identifier);
232  } else if (error == BB_ERR_UNKNOWN_TYPE) {
233  throw Exception("Type %s unknown (%s::%s)", type, type, identifier);
234  } else if (error == BB_ERR_WRITER_EXISTS) {
235  throw BlackBoardWriterActiveException(identifier, type);
236  } else {
237  throw Exception("Could not open interface");
238  }
239  }
240 
241  m_->unref();
242  m_ = NULL;
243 }
244 
245 Interface *
246 RemoteBlackBoard::open_interface(const char *type,
247  const char *identifier,
248  const char *owner,
249  bool writer)
250 {
251  if (!fnc_->connected()) {
252  throw Exception("Cannot instantiate remote interface, connection is dead");
253  }
254 
255  Interface *iface = instance_factory_->new_interface_instance(type, identifier);
256  try {
257  open_interface(type, identifier, owner, writer, iface);
258  } catch (Exception &e) {
259  instance_factory_->delete_interface_instance(iface);
260  throw;
261  }
262 
263  return iface;
264 }
265 
266 Interface *
267 RemoteBlackBoard::open_for_reading(const char *type, const char *identifier, const char *owner)
268 {
269  return open_interface(type, identifier, owner, /* writer? */ false);
270 }
271 
272 Interface *
273 RemoteBlackBoard::open_for_writing(const char *type, const char *identifier, const char *owner)
274 {
275  return open_interface(type, identifier, owner, /* writer? */ true);
276 }
277 
278 std::list<Interface *>
279 RemoteBlackBoard::open_multiple_for_reading(const char *type_pattern,
280  const char *id_pattern,
281  const char *owner)
282 {
283  std::list<Interface *> rv;
284 
285  InterfaceInfoList *infl = list_all();
286  for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
287  // ensure 0-termination
288  char type[INTERFACE_TYPE_SIZE_ + 1];
289  char id[INTERFACE_ID_SIZE_ + 1];
290  type[INTERFACE_TYPE_SIZE_] = 0;
291  id[INTERFACE_TYPE_SIZE_] = 0;
292  strncpy(type, i->type(), INTERFACE_TYPE_SIZE_);
293  strncpy(id, i->id(), INTERFACE_ID_SIZE_);
294 
295  if ((fnmatch(type_pattern, type, 0) == FNM_NOMATCH)
296  || (fnmatch(id_pattern, id, 0) == FNM_NOMATCH)) {
297  // type or ID prefix does not match, go on
298  continue;
299  }
300 
301  try {
302  Interface *iface = open_for_reading((*i).type(), (*i).id(), owner);
303  rv.push_back(iface);
304  } catch (Exception &e) {
305  for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) {
306  close(*j);
307  }
308  throw;
309  }
310  }
311 
312  return rv;
313 }
314 
315 /** Close interface.
316  * @param interface interface to close
317  */
318 void
320 {
321  if (interface == NULL)
322  return;
323 
324  unsigned int serial = interface->serial();
325 
326  if (proxies_.find(serial) != proxies_.end()) {
327  delete proxies_[serial];
328  proxies_.erase(serial);
329  }
330 
331  if (fnc_->connected()) {
332  // We cannot "officially" close it, if we are disconnected it cannot be used anyway
333  bb_iserial_msg_t *sm = (bb_iserial_msg_t *)calloc(1, sizeof(bb_iserial_msg_t));
334  sm->serial = htonl(interface->serial());
335 
336  FawkesNetworkMessage *omsg =
337  new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, MSG_BB_CLOSE, sm, sizeof(bb_iserial_msg_t));
338  fnc_->enqueue(omsg);
339  }
340 
341  instance_factory_->delete_interface_instance(interface);
342 }
343 
346 {
347  mutex_->lock();
348  if (inbound_thread_ != NULL && strcmp(Thread::current_thread()->name(), inbound_thread_) == 0) {
349  throw Exception("Cannot call list_all() from inbound handler");
350  }
351  mutex_->unlock();
352 
353  InterfaceInfoList *infl = new InterfaceInfoList();
354 
355  FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, MSG_BB_LIST_ALL);
356  wait_mutex_->lock();
357  fnc_->enqueue(omsg);
358  while (!m_ || (m_->msgid() != MSG_BB_INTERFACE_LIST)) {
359  if (m_) {
360  m_->unref();
361  m_ = NULL;
362  }
363  wait_cond_->wait();
364  }
365  wait_mutex_->unlock();
366 
368  while (bbilc->has_next()) {
369  size_t iisize;
370  bb_iinfo_msg_t *ii = bbilc->next(&iisize);
371  bool has_writer = ii->writer_readers & htonl(0x80000000);
372  unsigned int num_readers = ntohl(ii->writer_readers & htonl(0x7FFFFFFF));
373  infl->append(ii->type,
374  ii->id,
375  ii->hash,
376  ntohl(ii->serial),
377  has_writer,
378  num_readers,
379  std::list<std::string>(),
380  std::string(),
382  }
383 
384  m_->unref();
385  m_ = NULL;
386 
387  return infl;
388 }
389 
391 RemoteBlackBoard::list(const char *type_pattern, const char *id_pattern)
392 {
393  mutex_->lock();
394  if (inbound_thread_ != NULL && strcmp(Thread::current_thread()->name(), inbound_thread_) == 0) {
395  throw Exception("Cannot call list() from inbound handler");
396  }
397  mutex_->unlock();
398 
399  InterfaceInfoList *infl = new InterfaceInfoList();
400 
401  bb_ilistreq_msg_t *om = (bb_ilistreq_msg_t *)calloc(1, sizeof(bb_ilistreq_msg_t));
402  strncpy(om->type_pattern, type_pattern, INTERFACE_TYPE_SIZE_ - 1);
403  strncpy(om->id_pattern, id_pattern, INTERFACE_ID_SIZE_ - 1);
404 
405  FawkesNetworkMessage *omsg =
406  new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, MSG_BB_LIST, om, sizeof(bb_ilistreq_msg_t));
407 
408  wait_mutex_->lock();
409  fnc_->enqueue(omsg);
410  while (!m_ || (m_->msgid() != MSG_BB_INTERFACE_LIST)) {
411  if (m_) {
412  m_->unref();
413  m_ = NULL;
414  }
415  wait_cond_->wait();
416  }
417  wait_mutex_->unlock();
418 
420  while (bbilc->has_next()) {
421  size_t iisize;
422  bb_iinfo_msg_t *ii = bbilc->next(&iisize);
423  bool has_writer = ii->writer_readers & htonl(0x80000000);
424  unsigned int num_readers = ntohl(ii->writer_readers & htonl(0x7FFFFFFF));
425  infl->append(ii->type,
426  ii->id,
427  ii->hash,
428  ntohl(ii->serial),
429  has_writer,
430  num_readers,
431  std::list<std::string>(),
432  std::string(),
434  }
435 
436  m_->unref();
437  m_ = NULL;
438 
439  return infl;
440 }
441 
442 /** We are no longer registered in Fawkes network client.
443  * Ignored.
444  * @param id the id of the calling client
445  */
446 void
447 RemoteBlackBoard::deregistered(unsigned int id) throw()
448 {
449 }
450 
451 void
453 {
454  mutex_->lock();
455  inbound_thread_ = Thread::current_thread()->name();
456  mutex_->unlock();
457 
458  if (m->cid() == FAWKES_CID_BLACKBOARD) {
459  unsigned int msgid = m->msgid();
460  try {
461  if (msgid == MSG_BB_DATA_CHANGED) {
462  unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
463  if (proxies_.find(serial) != proxies_.end()) {
464  proxies_[serial]->process_data_changed(m);
465  }
466  } else if (msgid == MSG_BB_INTERFACE_MESSAGE) {
467  unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
468  if (proxies_.find(serial) != proxies_.end()) {
469  proxies_[serial]->process_interface_message(m);
470  }
471  } else if (msgid == MSG_BB_READER_ADDED) {
473  if (proxies_.find(ntohl(esm->serial)) != proxies_.end()) {
474  proxies_[ntohl(esm->serial)]->reader_added(ntohl(esm->event_serial));
475  }
476  } else if (msgid == MSG_BB_READER_REMOVED) {
478  if (proxies_.find(ntohl(esm->serial)) != proxies_.end()) {
479  proxies_[ntohl(esm->serial)]->reader_removed(ntohl(esm->event_serial));
480  }
481  } else if (msgid == MSG_BB_WRITER_ADDED) {
483  if (proxies_.find(ntohl(esm->serial)) != proxies_.end()) {
484  proxies_[ntohl(esm->serial)]->writer_added(ntohl(esm->event_serial));
485  }
486  } else if (msgid == MSG_BB_WRITER_REMOVED) {
488  if (proxies_.find(ntohl(esm->serial)) != proxies_.end()) {
489  proxies_[ntohl(esm->serial)]->writer_removed(ntohl(esm->event_serial));
490  }
491  } else if (msgid == MSG_BB_INTERFACE_CREATED) {
492  bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>();
493  notifier_->notify_of_interface_created(em->type, em->id);
494  } else if (msgid == MSG_BB_INTERFACE_DESTROYED) {
495  bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>();
496  notifier_->notify_of_interface_destroyed(em->type, em->id);
497  } else {
498  wait_mutex_->lock();
499  m_ = m;
500  m_->ref();
501  wait_cond_->wake_all();
502  wait_mutex_->unlock();
503  }
504  } catch (Exception &e) {
505  // Bam, you're dead. Ok, not now, we just ignore that this shit happened...
506  }
507  }
508 
509  mutex_->lock();
510  inbound_thread_ = NULL;
511  mutex_->unlock();
512 }
513 
514 void
515 RemoteBlackBoard::connection_died(unsigned int id) throw()
516 {
517  // mark all assigned interfaces as invalid
518  proxies_.lock();
519  for (pit_ = proxies_.begin(); pit_ != proxies_.end(); ++pit_) {
520  pit_->second->interface()->set_validity(false);
521  invalid_proxies_.push_back(pit_->second);
522  }
523  proxies_.clear();
524  proxies_.unlock();
525  wait_cond_->wake_all();
526 }
527 
528 void
530 {
531 }
532 
533 } // end namespace fawkes
uint32_t serial
instance serial to uniquely identify this instance (big endian)
Definition: messages.h:94
Interface * new_interface_instance(const char *type, const char *identifier)
Creates a new interface instance.
const char * owner() const
Get owner of interface.
Definition: interface.cpp:660
bool has_next()
Check if more list elements are available.
BlackBoard instance factory.
Wait until a given condition holds.
char type[INTERFACE_TYPE_SIZE_]
interface type name
Definition: messages.h:91
Requested interface type is unknown.
Definition: messages.h:60
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)
Open interface for writing.
Definition: remote.cpp:273
Simple Fawkes network client.
Definition: client.h:51
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
char type[INTERFACE_TYPE_SIZE_]
interface type name
Definition: messages.h:110
Message to identify an two interface instances.
Definition: messages.h:127
int64_t timestamp_sec
data or write timestamp, sec part
Definition: messages.h:101
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
void disconnect()
Disconnect socket.
Definition: client.cpp:539
int64_t timestamp_usec
data or write timestamp, usec part
Definition: messages.h:102
const char * id() const
Get identifier of interface.
Definition: interface.cpp:649
void register_handler(FawkesNetworkClientHandler *handler, unsigned int component_id)
Register handler.
Definition: client.cpp:658
void enqueue(FawkesNetworkMessage *message)
Enqueue message to send.
Definition: client.cpp:596
Representation of a message that is sent over the network.
Definition: message.h:76
void connect()
Connect to remote.
Definition: client.cpp:424
bb_iinfo_msg_t * next(size_t *size)
Get next plugin from list.
A class for handling time.
Definition: time.h:92
virtual void deregistered(unsigned int id)
We are no longer registered in Fawkes network client.
Definition: remote.cpp:447
virtual void connection_established(unsigned int id)
Client has established a connection.
Definition: remote.cpp:529
MT * msgc() const
Get correctly parsed output.
Definition: message.h:159
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:78
char id_pattern[INTERFACE_ID_SIZE_]
ID pattern.
Definition: messages.h:77
Message for interface info.
Definition: messages.h:89
You tried to open an interface for writing but there is already a writing instance for this interface...
Definition: messages.h:63
uint32_t event_serial
instance serial to unique identify instance that caused the event.
Definition: messages.h:130
virtual InterfaceInfoList * list(const char *type_pattern, const char *id_pattern)
Get list of interfaces matching type and ID patterns.
Definition: remote.cpp:391
const unsigned char * hash() const
Get interface hash.
Definition: interface.cpp:298
Interface information list.
const char * type() const
Get type of interface.
Definition: interface.cpp:640
virtual void close(Interface *interface)
Close interface.
Definition: remote.cpp:319
Base class for exceptions in Fawkes.
Definition: exception.h:35
unsigned short serial() const
Get instance serial of interface.
Definition: interface.cpp:683
virtual ~RemoteBlackBoard()
Destructor.
Definition: remote.cpp:112
uint32_t serial
instance serial to unique identify own instance
Definition: messages.h:129
uint32_t writer_readers
combined writer reader information.
Definition: messages.h:96
void delete_interface_instance(Interface *interface)
Destroy an interface instance.
BlackBoardNotifier * notifier_
Notifier for BB events.
Definition: blackboard.h:111
virtual bool try_aliveness_restore()
Try to restore the aliveness of the BlackBoard instance.
Definition: remote.cpp:157
virtual void inbound_received(FawkesNetworkMessage *msg, unsigned int id)
Called for incoming messages.
Definition: remote.cpp:452
The hashes of the interfaces do not match.
Definition: messages.h:61
static Thread * current_thread()
Get the Thread instance of the currently running thread.
Definition: thread.cpp:1366
const char * name() const
Get name of thread.
Definition: thread.h:100
virtual void connection_died(unsigned int id)
Client connection died.
Definition: remote.cpp:515
void wait()
Wait for the condition forever.
void append(const char *type, const char *id, const unsigned char *hash, unsigned int serial, bool has_writer, unsigned int num_readers, const std::list< std::string > &readers, const std::string &writer, const Time &timestamp)
Append an interface info.
bool is_writer() const
Check if this is a writing instance.
Definition: interface.cpp:438
char id[INTERFACE_ID_SIZE_]
interface instance ID
Definition: messages.h:92
unsigned short int msgid() const
Get message type ID.
Definition: message.cpp:294
uint32_t serial
instance serial to unique identify this instance
Definition: messages.h:120
RemoteBlackBoard(FawkesNetworkClient *client)
Constructor.
Definition: remote.cpp:56
unsigned char hash[INTERFACE_HASH_SIZE_]
interface version hash
Definition: messages.h:93
virtual InterfaceInfoList * list_all()
Get list of all currently existing interfaces.
Definition: remote.cpp:345
void deregister_handler(unsigned int component_id)
Deregister handler.
Definition: client.cpp:676
Message for interface events.
Definition: messages.h:108
void set_validity(bool valid)
Mark this interface invalid.
Definition: interface.cpp:451
Message to request constrained interface list.
Definition: messages.h:74
BlackBoard interface list content.
Definition: ilist_content.h:35
Message to identify an interface instance.
Definition: messages.h:118
void lock()
Lock this mutex.
Definition: mutex.cpp:87
char id[INTERFACE_ID_SIZE_]
interface instance ID
Definition: messages.h:111
bool connected() const
Check if connection is alive.
Definition: client.cpp:828
MT * msg() const
Get correctly casted payload.
Definition: message.h:120
Mutex mutual exclusion lock.
Definition: mutex.h:32
char type_pattern[INTERFACE_TYPE_SIZE_]
type pattern
Definition: messages.h:76
virtual bool is_alive() const
Check if the BlackBoard is still alive.
Definition: remote.cpp:132
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)
Open interface for reading.
Definition: remote.cpp:267