Fawkes API Fawkes Development Version
sync_thread.cpp
1
2/***************************************************************************
3 * sync_thread.cpp - Fawkes BlackBoard Synchronization Thread
4 *
5 * Created: Thu Jun 04 18:13:06 2009
6 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7 *
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library General Public License for more details.
19 *
20 * Read the full text in the LICENSE.GPL file in the doc directory.
21 */
22
23#include "sync_thread.h"
24
25#include <blackboard/remote.h>
26#include <core/threading/mutex_locker.h>
27#include <utils/time/wait.h>
28
29#include <cstring>
30
31using namespace std;
32using namespace fawkes;
33
34/** @class BlackBoardSynchronizationThread "sync_thread.h"
35 * Thread to synchronize two BlackBoards.
36 * @author Tim Niemueller
37 */
38
39/** Constructor.
40 * @param bbsync_cfg_prefix Configuration prefix for the whole bbsync plugin
41 * @param peer_cfg_prefix The configuration prefix for the peer this sync thread
42 * has been created for.
43 * @param peer name of the peer configuration for this thread
44 */
46 std::string &peer_cfg_prefix,
47 std::string &peer)
48: Thread("", Thread::OPMODE_CONTINUOUS)
49{
50 set_name("BBSyncThread[%s]", peer.c_str());
52
53 bbsync_cfg_prefix_ = bbsync_cfg_prefix;
54 peer_cfg_prefix_ = peer_cfg_prefix;
55 peer_ = peer;
56
57 remote_bb_ = NULL;
58}
59
60/** Destructor. */
62{
63}
64
65void
67{
68 logger->log_debug(name(), "Initializing");
69 unsigned int check_interval = 0;
70 try {
71 host_ = config->get_string((peer_cfg_prefix_ + "host").c_str());
72 port_ = config->get_uint((peer_cfg_prefix_ + "port").c_str());
73
74 check_interval = config->get_uint((bbsync_cfg_prefix_ + "check_interval").c_str());
75 } catch (Exception &e) {
76 e.append("Host or port not specified for peer");
77 throw;
78 }
79
80 try {
81 check_interval = config->get_uint((peer_cfg_prefix_ + "check_interval").c_str());
82 logger->log_debug(name(), "Peer check interval set, overriding default.");
83 } catch (Exception &e) {
84 logger->log_debug(name(), "No per-peer check interval set, using default");
85 }
86
87 read_config_combos(peer_cfg_prefix_ + "reading/", /* writing */ false);
88 read_config_combos(peer_cfg_prefix_ + "writing/", /* writing */ true);
89
90 for (ComboMap::iterator i = combos_.begin(); i != combos_.end(); ++i) {
92 "Combo: %s, %s (%s, R) -> %s (%s, W)",
93 i->second.type.c_str(),
94 i->second.reader_id.c_str(),
95 i->second.remote_writer ? "local" : "remote",
96 i->second.writer_id.c_str(),
97 i->second.remote_writer ? "remote" : "local");
98 }
99
100 wsl_local_ = new SyncWriterInterfaceListener(this, logger, (peer_ + "/local").c_str());
101 wsl_remote_ = new SyncWriterInterfaceListener(this, logger, (peer_ + "/remote").c_str());
102
103 if (!check_connection()) {
104 logger->log_warn(name(), "Remote peer not reachable, will keep trying");
105 }
106
107 logger->log_debug(name(), "Checking for remote aliveness every %u ms", check_interval);
108 timewait_ = new TimeWait(clock, check_interval * 1000);
109}
110
111void
113{
114 delete timewait_;
115
116 close_interfaces();
117
118 delete wsl_local_;
119 delete wsl_remote_;
120 delete remote_bb_;
121 remote_bb_ = NULL;
122}
123
124void
126{
127 timewait_->mark_start();
128 check_connection();
129 timewait_->wait_systime();
130}
131
132bool
133BlackBoardSynchronizationThread::check_connection()
134{
135 if (!remote_bb_ || !remote_bb_->is_alive()) {
136 if (remote_bb_) {
138 "Lost connection via remote BB to %s (%s:%u), will try to re-establish",
139 peer_.c_str(),
140 host_.c_str(),
141 port_);
142 blackboard->unregister_listener(wsl_local_);
143 remote_bb_->unregister_listener(wsl_remote_);
144 close_interfaces();
145 delete remote_bb_;
146 remote_bb_ = NULL;
147 }
148
149 try {
150 remote_bb_ = new RemoteBlackBoard(host_.c_str(), port_);
152 "Successfully connected via remote BB to %s (%s:%u)",
153 peer_.c_str(),
154 host_.c_str(),
155 port_);
156
157 open_interfaces();
158 blackboard->register_listener(wsl_local_, BlackBoard::BBIL_FLAG_WRITER);
159 remote_bb_->register_listener(wsl_remote_, BlackBoard::BBIL_FLAG_WRITER);
160 } catch (Exception &e) {
161 e.print_trace();
162 return false;
163 }
164 }
165 return true;
166}
167
168void
169BlackBoardSynchronizationThread::read_config_combos(std::string prefix, bool writing)
170{
171 Configuration::ValueIterator *i = config->search(prefix.c_str());
172 while (i->next()) {
173 if (strcmp(i->type(), "string") != 0) {
174 TypeMismatchException e("Only values of type string may occur in %s, "
175 "but found value of type %s",
176 prefix.c_str(),
177 i->type());
178 delete i;
179 throw e;
180 }
181
182 std::string varname = std::string(i->path()).substr(prefix.length());
183 std::string uid = i->get_string();
184 size_t sf;
185
186 if ((sf = uid.find("::")) == std::string::npos) {
187 delete i;
188 throw Exception("Interface UID '%s' at %s is not valid, missing double colon",
189 uid.c_str(),
190 i->path());
191 }
192
193 std::string type = uid.substr(0, sf);
194 std::string id = uid.substr(sf + 2);
195 combo_t combo = {type, id, id, writing};
196
197 if ((sf = id.find("=")) != std::string::npos) {
198 // we got a mapping
199 combo.reader_id = id.substr(0, sf);
200 combo.writer_id = id.substr(sf + 1);
201 }
202
203 combos_[varname] = combo;
204 }
205 delete i;
206}
207
208void
209BlackBoardSynchronizationThread::open_interfaces()
210{
211 logger->log_debug(name(), "Opening interfaces");
212 MutexLocker lock(interfaces_.mutex());
213
214 ComboMap::iterator i;
215 for (i = combos_.begin(); i != combos_.end(); ++i) {
216 Interface *iface_reader = NULL, *iface_writer = NULL;
217
218 BlackBoard *writer_bb = i->second.remote_writer ? remote_bb_ : blackboard;
219 BlackBoard *reader_bb = i->second.remote_writer ? blackboard : remote_bb_;
220
221 try {
223 "Opening reading %s (%s:%s)",
224 i->second.remote_writer ? "locally" : "remotely",
225 i->second.type.c_str(),
226 i->second.reader_id.c_str());
227 iface_reader =
228 reader_bb->open_for_reading(i->second.type.c_str(), i->second.reader_id.c_str());
230 "Opened interface with serial %s",
231 iface_reader->serial().get_string().c_str());
232
233 if (iface_reader->has_writer()) {
235 "Opening writing on %s (%s:%s)",
236 i->second.remote_writer ? "remotely" : "locally",
237 i->second.type.c_str(),
238 i->second.writer_id.c_str());
239 iface_writer =
240 writer_bb->open_for_writing(i->second.type.c_str(), i->second.writer_id.c_str());
241 }
242
243 InterfaceInfo ii(&i->second, iface_writer, reader_bb, writer_bb);
244 interfaces_[iface_reader] = ii;
245
246 } catch (Exception &e) {
247 reader_bb->close(iface_reader);
248 writer_bb->close(iface_writer);
249 throw;
250 }
251
252 SyncInterfaceListener *sync_listener = NULL;
253 if (iface_writer) {
254 logger->log_debug(name(), "Creating sync listener");
255 sync_listener =
256 new SyncInterfaceListener(logger, iface_reader, iface_writer, reader_bb, writer_bb);
257 }
258 sync_listeners_[iface_reader] = sync_listener;
259
260 if (i->second.remote_writer) {
261 wsl_local_->add_interface(iface_reader);
262 } else {
263 wsl_remote_->add_interface(iface_reader);
264 }
265 }
266}
267
268void
269BlackBoardSynchronizationThread::close_interfaces()
270{
271 SyncListenerMap::iterator s;
272 for (s = sync_listeners_.begin(); s != sync_listeners_.end(); ++s) {
273 if (s->second) {
274 logger->log_debug(name(), "Closing sync listener %s", s->second->bbil_name());
275 delete s->second;
276 }
277 }
278 MutexLocker lock(interfaces_.mutex());
279 InterfaceMap::iterator i;
280 for (i = interfaces_.begin(); i != interfaces_.end(); ++i) {
282 "Closing %s reading interface %s",
283 i->second.combo->remote_writer ? "local" : "remote",
284 i->first->uid());
285 if (i->second.combo->remote_writer) {
286 wsl_local_->remove_interface(i->first);
287 blackboard->close(i->first);
288 } else {
289 wsl_remote_->remove_interface(i->first);
290 remote_bb_->close(i->first);
291 }
292 if (i->second.writer) {
294 "Closing %s writing interface %s",
295 i->second.combo->remote_writer ? "remote" : "local",
296 i->second.writer->uid());
297 if (i->second.combo->remote_writer) {
298 remote_bb_->close(i->second.writer);
299 } else {
300 blackboard->close(i->second.writer);
301 }
302 }
303 }
304 interfaces_.clear();
305 sync_listeners_.clear();
306}
307
308/** A writer has been added for an interface.
309 * To be called only by SyncWriterInterfaceListener.
310 * @param interface the interface a writer has been added for.
311 */
312void
314{
315 MutexLocker lock(interfaces_.mutex());
316
317 if (interfaces_[interface].writer) {
318 // There exists a writer!?
319 logger->log_warn(name(),
320 "Writer added for %s, but relay exists already. Bug?",
321 interface->uid());
322 } else {
323 logger->log_warn(name(), "Writer added for %s, opening relay writer", interface->uid());
324
325 Interface * iface = NULL;
326 SyncInterfaceListener *sync_listener = NULL;
327 InterfaceInfo & ii = interfaces_[interface];
328 try {
329 iface = ii.writer_bb->open_for_writing(ii.combo->type.c_str(), ii.combo->writer_id.c_str());
330
331 logger->log_debug(name(),
332 "Creating sync listener for %s:%s-%s",
333 ii.combo->type.c_str(),
334 ii.combo->reader_id.c_str(),
335 ii.combo->writer_id.c_str());
336
337 sync_listener =
338 new SyncInterfaceListener(logger, interface, iface, ii.reader_bb, ii.writer_bb);
339
340 sync_listeners_[interface] = sync_listener;
341 ii.writer = iface;
342
343 } catch (Exception &e) {
344 delete sync_listener;
345 ii.writer_bb->close(iface);
346 logger->log_error(name(),
347 "Failed to open writer for %s:%s-%s, sync broken",
348 ii.combo->type.c_str(),
349 ii.combo->reader_id.c_str(),
350 ii.combo->writer_id.c_str());
351 logger->log_error(name(), e);
352 }
353 }
354}
355
356/** A writer has been removed for an interface.
357 * To be called only by SyncWriterInterfaceListener.
358 * @param interface the interface a writer has been removed for.
359 */
360void
362{
363 MutexLocker lock(interfaces_.mutex());
364
365 if (!interfaces_[interface].writer) {
366 // We do not have a writer!?
367 logger->log_warn(name(), "Writer removed for %s, but no relay exists. Bug?", interface->uid());
368 } else {
369 logger->log_warn(name(), "Writer removed for %s, closing relay writer", interface->uid());
370
371 InterfaceInfo &ii = interfaces_[interface];
372 try {
373 delete sync_listeners_[interface];
374 sync_listeners_[interface] = NULL;
375
376 ii.writer_bb->close(ii.writer);
377 ii.writer = NULL;
378
379 } catch (Exception &e) {
380 logger->log_error(name(),
381 "Failed to close writer for %s:%s-%s, sync broken",
382 ii.combo->type.c_str(),
383 ii.combo->reader_id.c_str(),
384 ii.combo->writer_id.c_str());
385 logger->log_error(name(), e);
386 }
387 }
388}
void writer_removed(fawkes::Interface *interface) noexcept
A writer has been removed for an interface.
virtual void init()
Initialize the thread.
Definition: sync_thread.cpp:66
virtual void finalize()
Finalize the thread.
virtual void loop()
Code to execute in the thread.
void writer_added(fawkes::Interface *interface) noexcept
A writer has been added for an interface.
virtual ~BlackBoardSynchronizationThread()
Destructor.
Definition: sync_thread.cpp:61
BlackBoardSynchronizationThread(std::string &bbsync_cfg_prefix, std::string &peer_cfg_prefix, std::string &peer)
Constructor.
Definition: sync_thread.cpp:45
Synchronize two interfaces.
Definition: sync_listener.h:34
Listener for writer events in bbsync plugin.
void add_interface(fawkes::Interface *interface)
Add an interface to listen to.
void remove_interface(fawkes::Interface *interface)
Remove an interface to listen to.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
The BlackBoard abstract class.
Definition: blackboard.h:46
virtual bool is_alive() const noexcept=0
Check if the BlackBoard is still alive.
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:212
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:185
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
Iterator interface to iterate over config values.
Definition: config.h:75
virtual const char * path() const =0
Path of value.
virtual bool next()=0
Check if there is another element and advance to this if possible.
virtual const char * type() const =0
Type of value.
virtual std::string get_string() const =0
Get string value.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual ValueIterator * search(const char *path)=0
Iterator with search results.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
void print_trace() noexcept
Prints trace to stderr.
Definition: exception.cpp:601
void append(const char *format,...) noexcept
Append messages to the message list.
Definition: exception.cpp:333
Interface info.
const char * type() const
Get interface type.
const std::string & writer() const
Get name of writer on interface.
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
Uuid serial() const
Get instance serial of interface.
Definition: interface.cpp:695
bool has_writer() const
Check if there is a writer for the interface.
Definition: interface.cpp:848
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_map.h:133
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: multi.cpp:216
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
Definition: multi.cpp:174
virtual void log_error(const char *component, const char *format,...)
Log error message.
Definition: multi.cpp:237
Mutex locking helper.
Definition: mutex_locker.h:34
Remote BlackBoard.
Definition: remote.h:50
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
const char * name() const
Get name of thread.
Definition: thread.h:100
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:748
Time wait utility.
Definition: wait.h:33
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
void wait_systime()
Wait until minimum loop time has been reached in real time.
Definition: wait.cpp:96
std::string get_string() const
Get the string representation of the Uuid.
Definition: uuid.cpp:107
Fawkes library namespace.