Fawkes API  Fawkes Development Version
mongodb_log_bb_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_bb_thread.cpp - MongoDB blackboard logging Thread
4  *
5  * Created: Wed Dec 08 23:09:29 2010
6  * Copyright 2010-2017 Tim Niemueller [www.niemueller.de]
7  * 2012 Bastian Klingen
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "mongodb_log_bb_thread.h"
24 
25 #include <core/threading/mutex_locker.h>
26 #include <plugins/mongodb/aspect/mongodb_conncreator.h>
27 
28 #include <cstdlib>
29 
30 // from MongoDB
31 #include <mongo/client/dbclient.h>
32 
33 #include <fnmatch.h>
34 
35 using namespace mongo;
36 using namespace fawkes;
37 
38 /** @class MongoLogBlackboardThread "mongodb_thread.h"
39  * MongoDB Logging Thread.
40  * This thread registers to interfaces specified with patterns in the
41  * configurationa and logs any changes to MongoDB.
42  *
43  * @author Tim Niemueller
44  */
45 
46 /** Constructor. */
48 : Thread("MongoLogBlackboardThread", Thread::OPMODE_WAITFORWAKEUP), MongoDBAspect("default")
49 {
50 }
51 
52 /** Destructor. */
54 {
55 }
56 
57 void
59 {
60  now_ = new Time(clock);
61  database_ = "fflog";
62  try {
63  database_ = config->get_string("/plugins/mongodb-log/database");
64  } catch (Exception &e) {
65  logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
66  }
67 
68  std::vector<std::string> includes;
69  try {
70  includes = config->get_strings("/plugins/mongodb-log/blackboard/includes");
71  } catch (Exception &e) {
72  } // ignored, no include rules
73  try {
74  excludes_ = config->get_strings("/plugins/mongodb-log/blackboard/excludes");
75  } catch (Exception &e) {
76  } // ignored, no include rules
77 
78  if (includes.empty()) {
79  includes.push_back("*");
80  }
81 
82  std::vector<std::string>::iterator i;
83  std::vector<std::string>::iterator e;
84  for (i = includes.begin(); i != includes.end(); ++i) {
85  bbio_add_observed_create("*", i->c_str());
86 
87  std::list<Interface *> current_interfaces =
88  blackboard->open_multiple_for_reading("*", i->c_str());
89 
90  std::list<Interface *>::iterator i;
91  for (i = current_interfaces.begin(); i != current_interfaces.end(); ++i) {
92  bool exclude = false;
93  for (e = excludes_.begin(); e != excludes_.end(); ++e) {
94  if (fnmatch(e->c_str(), (*i)->id(), 0) != FNM_NOMATCH) {
95  logger->log_debug(name(), "Excluding '%s' by config rule", (*i)->uid());
96  blackboard->close(*i);
97  exclude = true;
98  break;
99  }
100  }
101  if (exclude)
102  continue;
103 
104  logger->log_debug(name(), "Adding %s", (*i)->uid());
105  mongo::DBClientBase *mc = mongodb_connmgr->create_client();
106  listeners_[(*i)->uid()] =
107  new InterfaceListener(blackboard, *i, mc, database_, collections_, logger, now_);
108  }
109  }
110 
112 }
113 
114 void
116 {
118 
119  std::map<std::string, InterfaceListener *>::iterator i;
120  for (i = listeners_.begin(); i != listeners_.end(); ++i) {
121  mongo::DBClientBase *mc = i->second->mongodb_client();
122  delete i->second;
124  }
125  listeners_.clear();
126 }
127 
128 void
130 {
131 }
132 
133 // for BlackBoardInterfaceObserver
134 void
135 MongoLogBlackboardThread::bb_interface_created(const char *type, const char *id) throw()
136 {
137  MutexLocker lock(listeners_.mutex());
138 
139  std::vector<std::string>::iterator e;
140  for (e = excludes_.begin(); e != excludes_.end(); ++e) {
141  if (fnmatch(e->c_str(), id, 0) != FNM_NOMATCH) {
142  logger->log_debug(name(), "Ignoring excluded interface '%s::%s'", type, id);
143  return;
144  }
145  }
146 
147  try {
148  Interface *interface = blackboard->open_for_reading(type, id);
149  if (listeners_.find(interface->uid()) == listeners_.end()) {
150  logger->log_debug(name(), "Opening new %s", interface->uid());
151  mongo::DBClientBase *mc = mongodb_connmgr->create_client();
152  listeners_[interface->uid()] =
153  new InterfaceListener(blackboard, interface, mc, database_, collections_, logger, now_);
154  } else {
155  logger->log_warn(name(), "Interface %s already opened", interface->uid());
156  blackboard->close(interface);
157  }
158  } catch (Exception &e) {
159  logger->log_warn(name(), "Failed to open interface %s::%s, exception follows", type, id);
160  logger->log_warn(name(), e);
161  }
162 }
163 
164 /** Constructor.
165  * @param blackboard blackboard
166  * @param interface interface to listen for
167  * @param mongodb MongoDB client to write to
168  * @param database name of database to write to
169  * @param colls collections
170  * @param logger logger
171  * @param now Time
172  */
173 MongoLogBlackboardThread::InterfaceListener::InterfaceListener(BlackBoard * blackboard,
174  Interface * interface,
175  mongo::DBClientBase * mongodb,
176  std::string & database,
177  LockSet<std::string> &colls,
178  Logger * logger,
179  Time * now)
180 : BlackBoardInterfaceListener("MongoLogListener-%s", interface->uid()),
181  database_(database),
182  collections_(colls)
183 {
184  blackboard_ = blackboard;
185  interface_ = interface;
186  mongodb_ = mongodb;
187  logger_ = logger;
188  now_ = now;
189 
190  // sanitize interface ID to be suitable for MongoDB
191  std::string id = interface->id();
192  size_t pos = 0;
193  while ((pos = id.find_first_of(" -", pos)) != std::string::npos) {
194  id.replace(pos, 1, "_");
195  pos = pos + 1;
196  }
197  collection_ = database_ + "." + interface->type() + "." + id;
198  if (collections_.find(collection_) != collections_.end()) {
199  throw Exception("Collection named %s already used, cannot log %s",
200  collection_.c_str(),
201  interface->uid());
202  }
203 
204  bbil_add_data_interface(interface);
205  blackboard_->register_listener(this, BlackBoard::BBIL_FLAG_DATA);
206 }
207 
208 /** Destructor. */
209 MongoLogBlackboardThread::InterfaceListener::~InterfaceListener()
210 {
211  blackboard_->unregister_listener(this);
212 }
213 
214 void
215 MongoLogBlackboardThread::InterfaceListener::bb_interface_data_changed(Interface *interface) throw()
216 {
217  now_->stamp();
218  interface->read();
219 
220  try {
221  // write interface data
222  BSONObjBuilder document;
223  document.append("timestamp", (long long)now_->in_msec());
225  for (i = interface->fields(); i != interface->fields_end(); ++i) {
226  size_t length = i.get_length();
227  bool is_array = (length > 1);
228 
229  switch (i.get_type()) {
230  case IFT_BOOL:
231  if (is_array) {
232  bool * bools = i.get_bools();
233  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
234  for (size_t l = 0; l < length; ++l) {
235  subb.append(bools[l]);
236  }
237  subb.doneFast();
238  } else {
239  document.append(i.get_name(), i.get_bool());
240  }
241  break;
242 
243  case IFT_INT8:
244  if (is_array) {
245  int8_t * ints = i.get_int8s();
246  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
247  for (size_t l = 0; l < length; ++l) {
248  subb.append(ints[l]);
249  }
250  subb.doneFast();
251  } else {
252  document.append(i.get_name(), i.get_int8());
253  }
254  break;
255 
256  case IFT_UINT8:
257  if (is_array) {
258  uint8_t * ints = i.get_uint8s();
259  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
260  for (size_t l = 0; l < length; ++l) {
261  subb.append(ints[l]);
262  }
263  subb.doneFast();
264  } else {
265  document.append(i.get_name(), i.get_uint8());
266  }
267  break;
268 
269  case IFT_INT16:
270  if (is_array) {
271  int16_t * ints = i.get_int16s();
272  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
273  for (size_t l = 0; l < length; ++l) {
274  subb.append(ints[l]);
275  }
276  subb.doneFast();
277  } else {
278  document.append(i.get_name(), i.get_int16());
279  }
280  break;
281 
282  case IFT_UINT16:
283  if (is_array) {
284  uint16_t * ints = i.get_uint16s();
285  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
286  for (size_t l = 0; l < length; ++l) {
287  subb.append(ints[l]);
288  }
289  subb.doneFast();
290  } else {
291  document.append(i.get_name(), i.get_uint16());
292  }
293  break;
294 
295  case IFT_INT32:
296  if (is_array) {
297  int32_t * ints = i.get_int32s();
298  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
299  for (size_t l = 0; l < length; ++l) {
300  subb.append(ints[l]);
301  }
302  subb.doneFast();
303  } else {
304  document.append(i.get_name(), i.get_int32());
305  }
306  break;
307 
308  case IFT_UINT32:
309  if (is_array) {
310  uint32_t * ints = i.get_uint32s();
311  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
312  for (size_t l = 0; l < length; ++l) {
313  subb.append(ints[l]);
314  }
315  subb.doneFast();
316  } else {
317  document.append(i.get_name(), i.get_uint32());
318  }
319  break;
320 
321  case IFT_INT64:
322  if (is_array) {
323  int64_t * ints = i.get_int64s();
324  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
325  for (size_t l = 0; l < length; ++l) {
326  subb.append((long long int)ints[l]);
327  }
328  subb.doneFast();
329  } else {
330  document.append(i.get_name(), (long long int)i.get_int64());
331  }
332  break;
333 
334  case IFT_UINT64:
335  if (is_array) {
336  uint64_t * ints = i.get_uint64s();
337  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
338  for (size_t l = 0; l < length; ++l) {
339  subb.append((long long int)ints[l]);
340  }
341  subb.doneFast();
342  } else {
343  document.append(i.get_name(), (long long int)i.get_uint64());
344  }
345  break;
346 
347  case IFT_FLOAT:
348  if (is_array) {
349  float * floats = i.get_floats();
350  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
351  for (size_t l = 0; l < length; ++l) {
352  subb.append(floats[l]);
353  }
354  subb.doneFast();
355  } else {
356  document.append(i.get_name(), i.get_float());
357  }
358  break;
359 
360  case IFT_DOUBLE:
361  if (is_array) {
362  double * doubles = i.get_doubles();
363  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
364  for (size_t l = 0; l < length; ++l) {
365  subb.append(doubles[l]);
366  }
367  subb.doneFast();
368  } else {
369  document.append(i.get_name(), i.get_double());
370  }
371  break;
372 
373  case IFT_STRING: document.append(i.get_name(), i.get_string()); break;
374 
375  case IFT_BYTE:
376  if (is_array) {
377  document.appendBinData(i.get_name(), length, BinDataGeneral, i.get_bytes());
378  } else {
379  document.append(i.get_name(), i.get_byte());
380  }
381  break;
382 
383  case IFT_ENUM:
384  if (is_array) {
385  int32_t * ints = i.get_enums();
386  BSONArrayBuilder subb(document.subarrayStart(i.get_name()));
387  for (size_t l = 0; l < length; ++l) {
388  subb.append(ints[l]);
389  }
390  subb.doneFast();
391  } else {
392  document.append(i.get_name(), i.get_enum());
393  }
394  break;
395  }
396  }
397 
398  mongodb_->insert(collection_, document.obj());
399  } catch (mongo::DBException &e) {
400  logger_->log_warn(bbil_name(), "Failed to log to %s: %s", collection_.c_str(), e.what());
401  } catch (std::exception &e) {
402  logger_->log_warn(bbil_name(), "Failed to log to %s: %s (*)", collection_.c_str(), e.what());
403  }
404 }
64 bit integer field
Definition: types.h:44
Interface field iterator.
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: blackboard.cpp:225
uint8_t * get_bytes() const
Get value of current field as byte array.
uint16_t get_uint16(unsigned int index=0) const
Get value of current field as unsigned integer.
int32_t * get_enums() const
Get value of current enum field as integer array.
int32_t get_enum(unsigned int index=0) const
Get value of current enum field as integer.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
double get_double(unsigned int index=0) const
Get value of current field as double.
uint16_t * get_uint16s() const
Get value of current field as unsigned integer array.
Fawkes library namespace.
bool get_bool(unsigned int index=0) const
Get value of current field as bool.
8 bit unsigned integer field
Definition: types.h:39
Mutex locking helper.
Definition: mutex_locker.h:33
float * get_floats() const
Get value of current field as float array.
16 bit unsigned integer field
Definition: types.h:41
const char * id() const
Get identifier of interface.
Definition: interface.cpp:649
virtual void loop()
Code to execute in the thread.
interface_fieldtype_t get_type() const
Get type of current field.
string field
Definition: types.h:48
A class for handling time.
Definition: time.h:92
virtual ~MongoLogBlackboardThread()
Destructor.
byte field, alias for uint8
Definition: types.h:49
MongoDBConnCreator * mongodb_connmgr
Connection manager to retrieve more client connections from if necessary.
Definition: mongodb.h:56
Thread class encapsulation of pthreads.
Definition: thread.h:45
float get_float(unsigned int index=0) const
Get value of current field as float.
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:78
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
16 bit integer field
Definition: types.h:40
uint8_t get_byte(unsigned int index=0) const
Get value of current field as byte.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
int16_t get_int16(unsigned int index=0) const
Get value of current field as integer.
int64_t * get_int64s() const
Get value of current field as integer array.
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*")
Add interface creation type to watch list.
uint8_t * get_uint8s() const
Get value of current field as unsigned integer array.
const char * type() const
Get type of interface.
Definition: interface.cpp:640
int8_t get_int8(unsigned int index=0) const
Get value of current field as integer.
Base class for exceptions in Fawkes.
Definition: exception.h:35
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:472
int8_t * get_int8s() const
Get value of current field as integer array.
Thread aspect to access MongoDB.
Definition: mongodb.h:39
const char * get_name() const
Get name of current field.
uint8_t get_uint8(unsigned int index=0) const
Get value of current field as unsigned integer.
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: multi.cpp:216
double * get_doubles() const
Get value of current field as double array.
virtual void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: blackboard.cpp:240
virtual mongo::DBClientBase * create_client(const std::string &config_name="")=0
Create a new MongoDB client.
const char * name() const
Get name of thread.
Definition: thread.h:100
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:674
uint64_t get_uint64(unsigned int index=0) const
Get value of current field as unsigned integer.
64 bit unsigned integer field
Definition: types.h:45
const char * get_string() const
Get value of current field as string.
uint64_t * get_uint64s() const
Get value of current field as unsigned integer array.
uint32_t get_uint32(unsigned int index=0) const
Get value of current field as unsigned integer.
float field
Definition: types.h:46
bool * get_bools() const
Get value of current field as bool array.
size_t get_length() const
Get length of current field.
32 bit integer field
Definition: types.h:42
InterfaceFieldIterator fields_end()
Invalid iterator.
Definition: interface.cpp:1204
uint32_t * get_uint32s() const
Get value of current field as unsigned integer array.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
int32_t * get_int32s() const
Get value of current field as integer array.
The BlackBoard abstract class.
Definition: blackboard.h:45
InterfaceFieldIterator fields()
Get iterator over all fields of this interface instance.
Definition: interface.cpp:1195
virtual void bb_interface_created(const char *type, const char *id)
BlackBoard interface created notification.
boolean field
Definition: types.h:37
int32_t get_int32(unsigned int index=0) const
Get value of current field as integer.
int16_t * get_int16s() const
Get value of current field as integer array.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
Definition: multi.cpp:174
32 bit unsigned integer field
Definition: types.h:43
field with interface specific enum type
Definition: types.h:50
8 bit integer field
Definition: types.h:38
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
BlackBoard interface listener.
double field
Definition: types.h:47
virtual void delete_client(mongo::DBClientBase *client)=0
Delete a client.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
virtual void close(Interface *interface)=0
Close interface.
Interface for logging.
Definition: logger.h:41
int64_t get_int64(unsigned int index=0) const
Get value of current field as integer.