Fawkes API Fawkes Development Version
event_trigger_manager.cpp
1/***************************************************************************
2 * event_trigger_manager.cpp - Manager to realize triggers on events in the robot memory
3 *
4 *
5 * Created: 3:53:46 PM 2016
6 * Copyright 2016 Frederik Zwilling
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "event_trigger_manager.h"
23
24#ifdef USE_TIMETRACKER
25# include <utils/time/tracker.h>
26#endif
27#include <plugins/mongodb/utils.h>
28#include <utils/time/tracker_macros.h>
29
30#include <boost/bind/bind.hpp>
31#include <bsoncxx/json.hpp>
32#include <mongocxx/exception/operation_exception.hpp>
33#include <mongocxx/exception/query_exception.hpp>
34
35using namespace fawkes;
36using namespace mongocxx;
37
38/** @class EventTriggerManager event_trigger_manager.h
39 * Manager to realize triggers on events in the robot memory
40 * @author Frederik Zwilling
41 */
42
43/**
44 * Constructor for class managing EventTriggers
45 * @param logger Logger
46 * @param config Configuration
47 * @param mongo_connection_manager MongoDBConnCreator
48 */
50 Configuration * config,
51 MongoDBConnCreator *mongo_connection_manager)
52: cfg_debug_(false)
53{
54 logger_ = logger;
55 config_ = config;
56 mongo_connection_manager_ = mongo_connection_manager;
57
58 con_local_ = mongo_connection_manager_->create_client("robot-memory-local");
59 if (config_->exists("/plugins/mongodb/clients/robot-memory-distributed/enabled")
60 && config_->get_bool("/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
61 con_replica_ = mongo_connection_manager_->create_client("robot-memory-distributed");
62 }
63
64 // create connections to running mongod instances because only there
65 std::string local_db = config_->get_string("/plugins/robot-memory/database");
66 dbnames_local_.push_back(local_db);
67 dbnames_distributed_ = config_->get_strings("/plugins/robot-memory/distributed-db-names");
68
69 mutex_ = new Mutex();
70
71 try {
72 cfg_debug_ = config->get_bool("/plugins/robot-memory/more-debug-output");
73 } catch (...) {
74 }
75#ifdef USE_TIMETRACKER
76 tt_ = new fawkes::TimeTracker();
77 ttc_trigger_loop_ = tt_->add_class("RM Trigger Trigger Loop");
78 ttc_callback_loop_ = tt_->add_class("RM Trigger Callback Loop");
79 ttc_callback_ = tt_->add_class("RM Trigger Single Callback");
80 ttc_reinit_ = tt_->add_class("RM Trigger Reinit");
81#endif
82}
83
84EventTriggerManager::~EventTriggerManager()
85{
86 for (EventTrigger *trigger : triggers) {
87 delete trigger;
88 }
89 mongo_connection_manager_->delete_client(con_local_);
90 mongo_connection_manager_->delete_client(con_replica_);
91 delete mutex_;
92#ifdef USE_TIMETRACKER
93 delete tt_;
94#endif
95}
96
97void
98EventTriggerManager::check_events()
99{
100 //lock to be thread safe (e.g. registration during checking)
101 MutexLocker lock(mutex_);
102
103 TIMETRACK_START(ttc_trigger_loop_);
104 for (EventTrigger *trigger : triggers) {
105 bool ok = true;
106 try {
107 auto next = trigger->change_stream.begin();
108 TIMETRACK_START(ttc_callback_loop_);
109 while (next != trigger->change_stream.end()) {
110 //logger_->log_warn(name.c_str(), "Triggering: %s", bsoncxx::to_json(*next).c_str());
111 //actually call the callback function
112 TIMETRACK_START(ttc_callback_);
113 trigger->callback(*next);
114 next++;
115 TIMETRACK_END(ttc_callback_);
116 }
117 TIMETRACK_END(ttc_callback_loop_);
118 } catch (operation_exception &e) {
119 logger_->log_error(name.c_str(), "Error while reading the change stream");
120 ok = false;
121 }
122 // TODO Do we still need to check whether the cursor is dead?
123 // (with old driver: (!ok || trigger->oplog_cursor->isDead()))
124 if (!ok) {
125 TIMETRACK_START(ttc_reinit_);
126 if (cfg_debug_)
127 logger_->log_debug(name.c_str(), "Tailable Cursor is dead, requerying");
128 //check if collection is local or replicated
129 client *con;
130 if (std::find(dbnames_distributed_.begin(),
131 dbnames_distributed_.end(),
132 get_db_name(trigger->ns_db))
133 != dbnames_distributed_.end()) {
134 con = con_replica_;
135 } else {
136 con = con_local_;
137 }
138 auto db_coll_pair = split_db_collection_string(trigger->ns);
139 auto collection = con->database(db_coll_pair.first)[db_coll_pair.second];
140 try {
141 trigger->change_stream = create_change_stream(collection, trigger->filter_query.view());
142 } catch (mongocxx::query_exception &e) {
143 logger_->log_error(name.c_str(),
144 "Failed to create change stream, broken trigger for collection %s: %s",
145 trigger->ns.c_str(),
146 e.what());
147 }
148 TIMETRACK_END(ttc_reinit_);
149 }
150 }
151 TIMETRACK_END(ttc_trigger_loop_);
152#ifdef USE_TIMETRACKER
153 if (++tt_loopcount_ % 5 == 0) {
154 tt_->print_to_stdout();
155 }
156#endif
157}
158
159/**
160 * Remove a previously registered trigger
161 * @param trigger Pointer to the trigger to remove
162 */
163void
165{
166 triggers.remove(trigger);
167 delete trigger;
168}
169
170change_stream
171EventTriggerManager::create_change_stream(mongocxx::collection &coll, bsoncxx::document::view query)
172{
173 // TODO Allow non-empty pipelines
174 // @body We used to have a regular mongodb query as input to the oplog, but
175 // now this needs to be a pipeline. Adapt the change stream creation and the
176 // robot-memory API so we also accept a non-empty pipeline.
177 if (!query.empty()) {
178 throw fawkes::Exception("Non-empty queries are not implemented!");
179 }
180 mongocxx::options::change_stream opts;
181 opts.full_document("updateLookup");
182 opts.max_await_time(std::chrono::milliseconds(1));
183 auto res = coll.watch(opts);
184 // Go to end of change stream to get new updates from then on.
185 auto it = res.begin();
186 while (std::next(it) != res.end()) {}
187
188 return res;
189}
190
191/** Split database name from namespace.
192 * @param ns namespace, format db.collection
193 * @return db part of @p ns
194 */
195std::string
197{
198 std::string::size_type dot_pos = ns.find(".");
199 if (dot_pos == std::string::npos) {
200 return "";
201 } else {
202 return ns.substr(0, dot_pos);
203 }
204}
EventTriggerManager(fawkes::Logger *logger, fawkes::Configuration *config, fawkes::MongoDBConnCreator *mongo_connection_manager)
Constructor for class managing EventTriggers.
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
static std::string get_db_name(const std::string &ns)
Split database name from namespace.
Class holding all information about an EventTrigger.
Definition: event_trigger.h:32
Interface for configuration handling.
Definition: config.h:68
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual bool exists(const char *path)=0
Check if a given value exists.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
Interface for logging.
Definition: logger.h:42
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
Interface for a MongoDB connection creator.
virtual mongocxx::client * create_client(const std::string &config_name="")=0
Create a new MongoDB client.
virtual void delete_client(mongocxx::client *client)=0
Delete a client.
Mutex locking helper.
Definition: mutex_locker.h:34
Mutex mutual exclusion lock.
Definition: mutex.h:33
Time tracking utility.
Definition: tracker.h:37
Fawkes library namespace.