Fawkes API Fawkes Development Version
mongodb_thread.cpp
1
2/***************************************************************************
3 * mongodb_thread.cpp - MongoDB Thread
4 *
5 * Created: Sun Dec 05 23:32:13 2010
6 * Copyright 2006-2015 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "mongodb_thread.h"
23
24#include "mongodb_client_config.h"
25#include "mongodb_instance_config.h"
26#include "mongodb_replicaset_config.h"
27
28using namespace mongocxx;
29using namespace fawkes;
30
31/** @class MongoDBThread "mongodb_thread.h"
32 * MongoDB Thread.
33 * This thread maintains an active connection to MongoDB and provides an
34 * aspect to access MongoDB to make it convenient for other threads to use
35 * MongoDB.
36 *
37 * @author Tim Niemueller
38 */
39
40/** Constructor. */
42: Thread("MongoDBThread", Thread::OPMODE_WAITFORWAKEUP),
43 AspectProviderAspect(&mongodb_aspect_inifin_),
44 mongodb_aspect_inifin_(this)
45{
46}
47
48/** Destructor. */
50{
51}
52
53void
55{
56 instance_.reset(new instance());
57 logger->log_info(name(), "Init instances");
58 init_instance_configs();
59 logger->log_info(name(), "Init clients");
60 init_client_configs();
61 logger->log_info(name(), "Init RS");
62 init_replicaset_configs();
63
64 if (client_configs_.empty() && instance_configs_.empty() && replicaset_configs_.empty()) {
65 throw Exception("No enabled MongoDB configurations found");
66 }
67}
68
69void
70MongoDBThread::init_client_configs()
71{
72 std::set<std::string> ignored_configs;
73 std::string prefix = "/plugins/mongodb/clients/";
74
75 std::unique_ptr<Configuration::ValueIterator> i(config->search(prefix.c_str()));
76 while (i->next()) {
77 std::string cfg_name = std::string(i->path()).substr(prefix.length());
78 cfg_name = cfg_name.substr(0, cfg_name.find("/"));
79
80 if ((client_configs_.find(cfg_name) == client_configs_.end())
81 && (ignored_configs.find(cfg_name) == ignored_configs.end())) {
82 std::string cfg_prefix = prefix + cfg_name + "/";
83
84 try {
85 auto conf = std::make_shared<MongoDBClientConfig>(config, logger, cfg_name, cfg_prefix);
86 if (conf->is_enabled()) {
87 client_configs_[cfg_name] = conf;
88 logger->log_info(name(), "Added MongoDB client configuration %s", cfg_name.c_str());
89 conf->log(logger, name(), " ");
90 } else {
92 "Ignoring disabled MongoDB client "
93 "configuration %s",
94 cfg_name.c_str());
95 ignored_configs.insert(cfg_name);
96 }
97 } catch (Exception &e) {
99 "Invalid MongoDB client config %s, ignoring, "
100 "exception follows.",
101 cfg_name.c_str());
102 logger->log_warn(name(), e);
103 ignored_configs.insert(cfg_name);
104 }
105 }
106 }
107}
108
109void
110MongoDBThread::init_instance_configs()
111{
112 std::set<std::string> ignored_configs;
113 std::string prefix = "/plugins/mongodb/instances/";
114
115 std::unique_ptr<Configuration::ValueIterator> i(config->search(prefix.c_str()));
116 while (i->next()) {
117 std::string cfg_name = std::string(i->path()).substr(prefix.length());
118 cfg_name = cfg_name.substr(0, cfg_name.find("/"));
119
120 if ((instance_configs_.find(cfg_name) == instance_configs_.end())
121 && (ignored_configs.find(cfg_name) == ignored_configs.end())) {
122 std::string cfg_prefix = prefix + cfg_name + "/";
123
124 try {
125 auto conf = std::make_shared<MongoDBInstanceConfig>(config, cfg_name, cfg_prefix);
126 if (conf->is_enabled()) {
127 instance_configs_[cfg_name] = conf;
128 logger->log_info(name(), "Added MongoDB instance configuration %s", cfg_name.c_str());
129 } else {
131 "Ignoring disabled MongoDB instance "
132 "configuration %s",
133 cfg_name.c_str());
134 ignored_configs.insert(cfg_name);
135 }
136 } catch (Exception &e) {
138 "Invalid MongoDB instance config %s, ignoring, "
139 "exception follows.",
140 cfg_name.c_str());
141 logger->log_warn(name(), e);
142 ignored_configs.insert(cfg_name);
143 }
144 }
145 }
146
147 for (auto c : instance_configs_) {
148 logger->log_info(name(), "Running instance '%s'", c.first.c_str());
149 logger->log_info(name(), " '%s'", c.second->command_line().c_str());
150 thread_collector->add(&*c.second);
151 }
152}
153
154void
155MongoDBThread::init_replicaset_configs()
156{
157 std::set<std::string> ignored_configs;
158 std::string prefix = "/plugins/mongodb/replica-sets/managed-sets/";
159
160 std::string bootstrap_prefix = "/plugins/mongodb/replica-sets/bootstrap-mongodb/";
161 std::string bootstrap_client_cfg = config->get_string(bootstrap_prefix + "client");
162 std::string bootstrap_database = config->get_string(bootstrap_prefix + "database");
163
164 std::unique_ptr<Configuration::ValueIterator> i(config->search(prefix.c_str()));
165 while (i->next()) {
166 std::string cfg_name = std::string(i->path()).substr(prefix.length());
167 cfg_name = cfg_name.substr(0, cfg_name.find("/"));
168
169 if ((replicaset_configs_.find(cfg_name) == replicaset_configs_.end())
170 && (ignored_configs.find(cfg_name) == ignored_configs.end())) {
171 std::string cfg_prefix = prefix + cfg_name + "/";
172
173 try {
174 if (config->get_bool(cfg_prefix + "enabled")) {
175 auto conf =
176 std::make_shared<MongoDBReplicaSetConfig>(cfg_name, cfg_prefix, bootstrap_prefix);
177 replicaset_configs_[cfg_name] = conf;
178 logger->log_info(name(), "Added MongoDB replica set configuration %s", cfg_name.c_str());
179 } else {
181 "Ignoring disabled MongoDB replica set "
182 "configuration %s",
183 cfg_name.c_str());
184 ignored_configs.insert(cfg_name);
185 }
186 } catch (Exception &e) {
188 "Invalid MongoDB replica set config %s, ignoring, "
189 "exception follows.",
190 cfg_name.c_str());
191 logger->log_warn(name(), e);
192 ignored_configs.insert(cfg_name);
193 }
194 }
195 }
196
197 for (auto c : replicaset_configs_) {
198 logger->log_info(name(), "Running replica set '%s' management", c.first.c_str());
199 thread_collector->add(&*c.second);
200 }
201}
202
203void
205{
206 for (auto c : instance_configs_) {
208 "Stopping instance '%s', grace period %u sec",
209 c.first.c_str(),
210 c.second->termination_grace_period());
211 thread_collector->remove(&*c.second);
212 }
213 instance_configs_.clear();
214
215 for (auto c : replicaset_configs_) {
216 logger->log_info(name(), "Stopping replica set '%s' management", c.first.c_str());
217 thread_collector->remove(&*c.second);
218 }
219 replicaset_configs_.clear();
220
221 client_configs_.clear();
222
223 instance_.release();
224}
225
226void
228{
229}
230
231mongocxx::client *
232MongoDBThread::create_client(const std::string &config_name)
233{
234 const std::string cname{config_name.empty() ? "default" : config_name};
235
236 if (client_configs_.find(cname) != client_configs_.end()) {
237 if (!client_configs_[cname]->is_enabled()) {
238 throw Exception("MongoDB config '%s' is not marked enabled", cname.c_str());
239 }
240 return client_configs_[cname]->create_client();
241 } else {
242 throw Exception("No MongoDB config named '%s' exists", cname.c_str());
243 }
244}
245
246void
247MongoDBThread::delete_client(mongocxx::client *client)
248{
249 delete client;
250}
MongoDBThread()
Constructor.
virtual ~MongoDBThread()
Destructor.
virtual void finalize()
Finalize the thread.
virtual mongocxx::client * create_client(const std::string &config_name="")
Create a new MongoDB client.
virtual void init()
Initialize the thread.
virtual void delete_client(mongocxx::client *client)
Delete a client.
virtual void loop()
Code to execute in the thread.
Thread aspect provide a new aspect.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
virtual ValueIterator * search(const char *path)=0
Iterator with search results.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
virtual void add(ThreadList &tl)=0
Add multiple threads.
virtual void remove(ThreadList &tl)=0
Remove multiple threads.
ThreadCollector * thread_collector
Thread collector.
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
Fawkes library namespace.