23#include "mongodb_log_bb_thread.h"
25#include <core/threading/mutex_locker.h>
26#include <plugins/mongodb/aspect/mongodb_conncreator.h>
30#include <mongocxx/client.hpp>
31#include <mongocxx/exception/operation_exception.hpp>
33using namespace mongocxx;
66 std::vector<std::string> includes;
76 if (includes.empty()) {
77 includes.push_back(
"*");
80 std::vector<std::string>::iterator i;
81 std::vector<std::string>::iterator e;
82 for (i = includes.begin(); i != includes.end(); ++i) {
85 std::list<Interface *> current_interfaces =
88 std::list<Interface *>::iterator i;
89 for (i = current_interfaces.begin(); i != current_interfaces.end(); ++i) {
91 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
92 if (fnmatch(e->c_str(), (*i)->id(), 0) != FNM_NOMATCH) {
105 listeners_[(*i)->uid()] =
new InterfaceListener(
118 std::map<std::string, InterfaceListener *>::iterator i;
119 for (i = listeners_.begin(); i != listeners_.end(); ++i) {
120 client *mc = i->second->mongodb_client();
138 std::vector<std::string>::iterator e;
139 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
140 if (fnmatch(e->c_str(),
id, 0) != FNM_NOMATCH) {
141 logger->
log_debug(name(),
"Ignoring excluded interface '%s::%s'", type,
id);
147 Interface *
interface = blackboard->open_for_reading(type, id);
148 if (listeners_.find(interface->uid()) == listeners_.end()) {
149 logger->
log_debug(name(),
"Opening new %s", interface->uid());
150 client * mc = mongodb_connmgr->create_client();
152 listeners_[interface->uid()] =
new InterfaceListener(
153 blackboard, interface, mc, database_, collections_, agent_name, logger, now_);
155 logger->
log_warn(name(),
"Interface %s already opened", interface->uid());
156 blackboard->
close(interface);
159 logger->
log_warn(name(),
"Failed to open interface %s::%s, exception follows", type,
id);
174MongoLogBlackboardThread::InterfaceListener::InterfaceListener(
BlackBoard * blackboard,
177 std::string & database,
179 const std::string & agent_name,
185 agent_name_(agent_name)
188 interface_ = interface;
194 std::string
id = interface->
id();
196 while ((pos =
id.find_first_of(
" -", pos)) != std::string::npos) {
197 id.replace(pos, 1,
"_");
200 collection_ = std::string(interface->
type()) +
"." + id;
201 if (collections_.find(collection_) != collections_.end()) {
202 throw Exception(
"Collection named %s already used, cannot log %s",
207 bbil_add_data_interface(interface);
208 blackboard_->register_listener(
this, BlackBoard::BBIL_FLAG_DATA);
212MongoLogBlackboardThread::InterfaceListener::~InterfaceListener()
214 blackboard_->unregister_listener(
this);
218MongoLogBlackboardThread::InterfaceListener::bb_interface_data_refreshed(
226 using namespace bsoncxx::builder;
227 basic::document document;
228 document.append(basic::kvp(
"timestamp",
static_cast<int64_t
>(now_->in_msec())));
232 bool is_array = (length > 1);
239 document.append(basic::kvp(key, [bools, length](basic::sub_array subarray) {
240 for (
size_t l = 0; l < length; ++l) {
241 subarray.append(bools[l]);
245 document.append(basic::kvp(key, i.
get_bool()));
252 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
253 for (
size_t l = 0; l < length; ++l) {
254 subarray.append(ints[l]);
258 document.append(basic::kvp(key, i.
get_int8()));
265 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
266 for (
size_t l = 0; l < length; ++l) {
267 subarray.append(ints[l]);
271 document.append(basic::kvp(key, i.
get_uint8()));
278 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
279 for (
size_t l = 0; l < length; ++l) {
280 subarray.append(ints[l]);
284 document.append(basic::kvp(key, i.
get_int16()));
291 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
292 for (
size_t l = 0; l < length; ++l) {
293 subarray.append(ints[l]);
297 document.append(basic::kvp(key, i.
get_uint16()));
304 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
305 for (
size_t l = 0; l < length; ++l) {
306 subarray.append(ints[l]);
310 document.append(basic::kvp(key, i.
get_int32()));
317 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
318 for (
size_t l = 0; l < length; ++l) {
319 subarray.append(
static_cast<int64_t
>(ints[l]));
323 document.append(basic::kvp(key,
static_cast<int64_t
>(i.
get_uint32())));
330 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
331 for (
size_t l = 0; l < length; ++l) {
332 subarray.append(ints[l]);
336 document.append(basic::kvp(key, i.
get_int64()));
343 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
344 for (
size_t l = 0; l < length; ++l) {
345 subarray.append(
static_cast<int64_t
>(ints[l]));
349 document.append(basic::kvp(key,
static_cast<int64_t
>(i.
get_uint64())));
356 document.append(basic::kvp(key, [floats, length](basic::sub_array subarray) {
357 for (
size_t l = 0; l < length; ++l) {
358 subarray.append(floats[l]);
362 document.append(basic::kvp(key, i.
get_float()));
369 document.append(basic::kvp(key, [doubles, length](basic::sub_array subarray) {
370 for (
size_t l = 0; l < length; ++l) {
371 subarray.append(doubles[l]);
375 document.append(basic::kvp(key, i.
get_double()));
384 document.append(basic::kvp(key, [bytes, length](basic::sub_array subarray) {
385 for (
size_t l = 0; l < length; ++l) {
386 subarray.append(bytes[l]);
390 document.append(basic::kvp(key, i.
get_byte()));
397 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
398 for (
size_t l = 0; l < length; ++l) {
399 subarray.append(ints[l]);
403 document.append(basic::kvp(key, i.
get_enum()));
409 document.append(basic::kvp(
"agent-name", agent_name_));
410 mongodb_->database(database_)[collection_].insert_one(document.view());
411 }
catch (operation_exception &e) {
413 bbil_name(),
"Failed to log to %s.%s: %s", database_.c_str(), collection_.c_str(), e.what());
414 }
catch (std::exception &e) {
415 logger_->log_warn(bbil_name(),
416 "Failed to log to %s.%s: %s (*)",
virtual void finalize()
Finalize the thread.
virtual ~MongoLogBlackboardThread()
Destructor.
virtual void bb_interface_created(const char *type, const char *id) noexcept
BlackBoard interface created notification.
virtual void init()
Initialize the thread.
virtual void loop()
Code to execute in the thread.
MongoLogBlackboardThread()
Constructor.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
BlackBoard interface listener.
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*") noexcept
Add interface creation type to watch list.
The BlackBoard abstract class.
virtual void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
Open multiple interfaces for reading.
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
By means of this member access to the clock is given.
Configuration * config
This is the Configuration member used to access the configuration.
virtual std::string get_string_or_default(const char *path, const std::string &default_val)
Get value from configuration which is of type string, or the given default if the path does not exist.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of string values from configuration.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Interface field iterator.
float get_float(unsigned int index=0) const
Get value of current field as float.
int16_t get_int16(unsigned int index=0) const
Get value of current field as integer.
int8_t get_int8(unsigned int index=0) const
Get value of current field as integer.
int8_t * get_int8s() const
Get value of current field as integer array.
float * get_floats() const
Get value of current field as float array.
int32_t get_int32(unsigned int index=0) const
Get value of current field as integer.
uint8_t * get_bytes() const
Get value of current field as byte array.
size_t get_length() const
Get length of current field.
int32_t * get_int32s() const
Get value of current field as integer array.
int64_t get_int64(unsigned int index=0) const
Get value of current field as integer.
uint64_t get_uint64(unsigned int index=0) const
Get value of current field as unsigned integer.
int32_t get_enum(unsigned int index=0) const
Get value of current enum field as integer.
uint16_t get_uint16(unsigned int index=0) const
Get value of current field as unsigned integer.
double get_double(unsigned int index=0) const
Get value of current field as double.
int32_t * get_enums() const
Get value of current enum field as integer array.
int64_t * get_int64s() const
Get value of current field as integer array.
uint64_t * get_uint64s() const
Get value of current field as unsigned integer array.
uint32_t get_uint32(unsigned int index=0) const
Get value of current field as unsigned integer.
interface_fieldtype_t get_type() const
Get type of current field.
const char * get_name() const
Get name of current field.
uint8_t * get_uint8s() const
Get value of current field as unsigned integer array.
uint16_t * get_uint16s() const
Get value of current field as unsigned integer array.
const char * get_string() const
Get value of current field as string.
uint8_t get_byte(unsigned int index=0) const
Get value of current field as byte.
bool * get_bools() const
Get value of current field as bool array.
uint8_t get_uint8(unsigned int index=0) const
Get value of current field as unsigned integer.
bool get_bool(unsigned int index=0) const
Get value of current field as bool.
uint32_t * get_uint32s() const
Get value of current field as unsigned integer array.
double * get_doubles() const
Get value of current field as double array.
int16_t * get_int16s() const
Get value of current field as integer array.
Base class for all Fawkes BlackBoard interfaces.
const char * type() const
Get type of interface.
InterfaceFieldIterator fields_end()
Invalid iterator.
const char * id() const
Get identifier of interface.
InterfaceFieldIterator fields()
Get iterator over all fields of this interface instance.
const char * uid() const
Get unique identifier of interface.
void read()
Read from BlackBoard into local copy.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Thread aspect to access MongoDB.
MongoDBConnCreator * mongodb_connmgr
Connection manager to retrieve more client connections from if necessary.
virtual mongocxx::client * create_client(const std::string &config_name="")=0
Create a new MongoDB client.
virtual void delete_client(mongocxx::client *client)=0
Delete a client.
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
Thread class encapsulation of pthreads.
const char * name() const
Get name of thread.
A class for handling time.
Fawkes library namespace.
@ IFT_INT8
8 bit integer field
@ IFT_UINT32
32 bit unsigned integer field
@ IFT_BYTE
byte field, alias for uint8
@ IFT_UINT64
64 bit unsigned integer field
@ IFT_UINT16
16 bit unsigned integer field
@ IFT_INT32
32 bit integer field
@ IFT_INT64
64 bit integer field
@ IFT_INT16
16 bit integer field
@ IFT_ENUM
field with interface specific enum type
@ IFT_UINT8
8 bit unsigned integer field