Fawkes API Fawkes Development Version
mongodb_instance_config.cpp
1
2/***************************************************************************
3 * mongodb_instance_config.cpp - MongoDB instance configuration
4 *
5 * Created: Wed Jul 12 14:33:02 2017
6 * Copyright 2006-2017 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "mongodb_instance_config.h"
23
24#include "utils.h"
25
26#include <config/config.h>
27#include <core/exceptions/system.h>
28#include <utils/sub_process/proc.h>
29#include <utils/time/wait.h>
30
31#include <boost/filesystem.hpp>
32#include <boost/filesystem/operations.hpp>
33#include <boost/format.hpp>
34#include <bsoncxx/builder/basic/document.hpp>
35#include <bsoncxx/json.hpp>
36#include <chrono>
37#include <mongocxx/client.hpp>
38#include <mongocxx/exception/exception.hpp>
39#include <numeric>
40#include <sstream>
41#include <wordexp.h>
42
43using namespace fawkes;
44using namespace std::chrono_literals;
45
46/** @class MongoDBInstanceConfig "mongodb_instance_config.h"
47 * MongoDB Instances Configuration.
48 * Configure a single mongod instance that can be run as a sub-process.
49 *
50 * @author Tim Niemueller
51 */
52
53/** Constructor.
54 * This will read the given configuration.
55 * @param config configuration to query
56 * @param cfgname configuration name
57 * @param prefix configuration path prefix
58 */
60 std::string cfgname,
61 std::string prefix)
62: Thread("MongoDBInstance", Thread::OPMODE_CONTINUOUS)
63{
64 set_name("MongoDBInstance|%s", cfgname.c_str());
65 config_name_ = cfgname;
66
67 running_ = false;
68
69 enabled_ = false;
70 try {
71 enabled_ = config->get_bool(prefix + "enabled");
72 } catch (Exception &e) {
73 }
74
75 if (enabled_) {
76 startup_grace_period_ = 10;
77 try {
78 startup_grace_period_ = config->get_uint(prefix + "startup-grace-period");
79 } catch (Exception &e) {
80 } // ignored, use default
81 loop_interval_ = 5.0;
82 try {
83 loop_interval_ = config->get_float(prefix + "loop-interval");
84 } catch (Exception &e) {
85 } // ignored, use default
86 termination_grace_period_ = config->get_uint(prefix + "termination-grace-period");
87 port_ = config->get_uint(prefix + "port");
88 bind_ip_ = config->get_string_or_default(std::string(prefix + "bind_ip").c_str(), "0.0.0.0");
89 use_tmp_directory_ = config->get_bool_or_default((prefix + "use-tmp-directory").c_str(), false);
90 // By default, clear data if we use a tmp directory, otherwise don't.
91 clear_data_on_termination_ =
92 config->get_bool_or_default((prefix + "clear-data-on-termination").c_str(),
93 use_tmp_directory_);
94 if (use_tmp_directory_) {
95 // Add the port to the filename, escape all literal "%" as "%%".
96 const std::string filename =
97 boost::str(boost::format("mongodb-%1%-%%%%%%%%-%%%%%%%%-%%%%%%%%") % port_);
98 const boost::filesystem::path path =
99 boost::filesystem::unique_path(boost::filesystem::temp_directory_path() / filename);
100 data_path_ = path.string();
101 log_path_ = (path / "mongodb.log").string();
102 } else {
103 data_path_ = config->get_string(prefix + "data-path");
104 log_path_ = config->get_string(prefix + "log/path");
105 }
106 log_append_ = config->get_bool(prefix + "log/append");
107 try {
108 replica_set_ = config->get_string(prefix + "replica-set");
109 ;
110 } catch (Exception &e) {
111 } // ignored, no replica set
112 if (!replica_set_.empty()) {
113 oplog_size_ = 0;
114 try {
115 oplog_size_ = config->get_uint(prefix + "oplog-size");
116 } catch (Exception &e) {
117 } // ignored, use default
118 }
119 }
120
121 argv_ = {
122 "mongod", "--bind_ip", bind_ip_, "--port", std::to_string(port_), "--dbpath", data_path_};
123
124 if (!log_path_.empty()) {
125 if (log_append_) {
126 argv_.push_back("--logappend");
127 }
128 argv_.push_back("--logpath");
129 argv_.push_back(log_path_);
130 }
131
132 if (!replica_set_.empty()) {
133 argv_.push_back("--replSet");
134 argv_.push_back(replica_set_);
135 if (oplog_size_ > 0) {
136 argv_.push_back("--oplogSize");
137 argv_.push_back(std::to_string(oplog_size_));
138 }
139 }
140
141 if (enabled_) {
142 std::string extra_args = config->get_string_or_default((prefix + "args").c_str(), "");
143 if (!extra_args.empty()) {
144 wordexp_t p;
145 int wrv = wordexp(extra_args.c_str(), &p, WRDE_NOCMD | WRDE_UNDEF);
146 switch (wrv) {
147 case 0: break; // all good
148 case WRDE_BADCHAR: throw Exception("%s: invalid character in args", name());
149 case WRDE_BADVAL: throw Exception("%s: undefined variable referenced in args", name());
150 case WRDE_CMDSUB:
151 throw Exception("%s: running sub-commands has been disabled for args", name());
152 case WRDE_NOSPACE: throw OutOfMemoryException("Cannot parse args");
153 case WRDE_SYNTAX: throw Exception("%s: shell syntax error in args", name());
154 default: throw Exception("Unexpected wordexp error %d when parsing args", wrv);
155 }
156
157 // These arguments may not be passed, they are either configured through
158 // config values and could interfere, or they mess with our rs handling.
159 std::vector<std::string> invalid_args = {"--port",
160 "--dbpath",
161 "--fork",
162 "--logappend",
163 "--logpath",
164 "--replSet",
165 "--oplogSize",
166 "--master",
167 "--slave",
168 "--source",
169 "--only"};
170
171 // pass and verify arguments to be added to command line
172 for (size_t i = 0; i < p.we_wordc; ++i) {
173 for (size_t j = 0; j < invalid_args.size(); ++j) {
174 if (invalid_args[j] == p.we_wordv[i]) {
175 wordfree(&p);
176 throw Exception("%s: %s may not be passed in args", name(), invalid_args[j].c_str());
177 }
178 }
179 argv_.push_back(p.we_wordv[i]);
180 }
181 wordfree(&p);
182 }
183 }
184
185 command_line_ = std::accumulate(std::next(argv_.begin()),
186 argv_.end(),
187 argv_.front(),
188 [](std::string &s, const std::string &a) { return s + " " + a; });
189}
190
191void
193{
194 if (enabled_) {
195 logger->log_debug(name(), "enabled: true");
196 logger->log_debug(name(), "TCP port: %u", port_);
197 logger->log_debug(name(), "Termination grace period: %u", termination_grace_period_);
199 "clear data on termination: %s",
200 clear_data_on_termination_ ? "yes" : "no");
201 logger->log_debug(name(), "data path: %s", data_path_.c_str());
202 logger->log_debug(name(), "log path: %s", log_path_.c_str());
203 logger->log_debug(name(), "log append: %s", log_append_ ? "yes" : "no");
205 "replica set: %s",
206 replica_set_.empty() ? "DISABLED" : replica_set_.c_str());
207 if (!replica_set_.empty()) {
208 logger->log_debug(name(), "Op Log Size: %u MB", oplog_size_);
209 }
210
211 start_mongod();
212 } else {
213 throw Exception("Instance '%s' cannot be started while disabled", name());
214 }
215
216 timewait_ = new TimeWait(clock, (int)(loop_interval_ * 1000000.));
217}
218
219void
221{
222 timewait_->mark_start();
223 if (!running_ || !check_alive()) {
224 logger->log_error(name(), "MongoDB dead, restarting");
225 // on a crash, clean to make sure
226 try {
227 kill_mongod(true);
228 start_mongod();
229 } catch (Exception &e) {
230 logger->log_error(name(), "Failed to start MongoDB: %s", e.what_no_backtrace());
231 }
232 }
233 timewait_->wait_systime();
234}
235
236void
238{
239 kill_mongod(clear_data_on_termination_);
240 delete timewait_;
241}
242
243/** Get command line used to execute program.
244 * @return command line to run mongod
245 */
246std::string
248{
249 return command_line_;
250}
251
252/** Get termination grace period.
253 * @return termination grace period
254 */
255unsigned int
257{
258 return termination_grace_period_;
259}
260
261bool
262MongoDBInstanceConfig::check_alive()
263{
264 try {
265 mongocxx::client client{mongocxx::uri("mongodb://localhost:" + std::to_string(port_))};
266
267 using namespace bsoncxx::builder;
268 auto cmd{basic::document{}};
269 cmd.append(basic::kvp("isMaster", 1));
270
271 auto reply = client["admin"].run_command(cmd.view());
272 bool ok = check_mongodb_ok(reply.view());
273 if (!ok) {
274 logger->log_warn(name(), "Failed to connect: %s", bsoncxx::to_json(reply.view()).c_str());
275 }
276 return ok;
277 } catch (mongocxx::exception &e) {
278 logger->log_warn(name(), "Fail: %s", e.what());
279 return false;
280 }
281}
282
283/** Start mongod. */
284void
286{
287 if (running_)
288 return;
289
290 if (check_alive()) {
291 logger->log_warn(name(), "MongoDB already running, not starting");
292 running_ = true;
293 return;
294 }
295
296 try {
297 boost::filesystem::create_directories(data_path_);
298 } catch (boost::filesystem::filesystem_error &e) {
299 throw Exception("Failed to create data path '%s' for mongod(%s): %s",
300 data_path_.c_str(),
301 config_name_.c_str(),
302 e.what());
303 }
304
305 if (!log_path_.empty()) {
306 boost::filesystem::path p(log_path_);
307 try {
308 boost::filesystem::create_directories(p.parent_path());
309 } catch (boost::filesystem::filesystem_error &e) {
310 throw Exception("Failed to create log path '%s' for mongod(%s): %s",
311 p.parent_path().string().c_str(),
312 config_name_.c_str(),
313 e.what());
314 }
315 }
316
317 std::string progname = "mongod(" + config_name_ + ")";
318 proc_ =
319 std::make_shared<SubProcess>(progname, "mongod", argv_, std::vector<std::string>{}, logger);
320
321 for (unsigned i = 0; i < startup_grace_period_ * 4; ++i) {
322 if (check_alive()) {
323 running_ = true;
324 return;
325 }
326 std::this_thread::sleep_for(250ms);
327 }
328 if (!running_) {
329 proc_.reset();
330 throw Exception("%s: instance did not start in time", name());
331 }
332}
333
334/** Stop mongod.
335 * This send a SIGINT and then wait for the configured grace period
336 * before sending the TERM signal.
337 * @param clear_data true to clear data, false otherwise
338 */
339void
341{
342 if (proc_) {
343 proc_->kill(SIGINT);
344 for (unsigned i = 0; i < termination_grace_period_; ++i) {
345 if (!proc_->alive())
346 break;
347 std::this_thread::sleep_for(1s);
348 }
349 // This will send the term signal
350 proc_.reset();
351 running_ = false;
352 if (clear_data) {
353 try {
354 boost::filesystem::remove_all(data_path_);
355 } catch (boost::filesystem::filesystem_error &e) {
356 throw Exception("Failed to create data path '%s' for mongod(%s): %s",
357 data_path_.c_str(),
358 config_name_.c_str(),
359 e.what());
360 }
361 }
362 }
363}
unsigned int termination_grace_period() const
Get termination grace period.
virtual void loop()
Code to execute in the thread.
virtual void finalize()
Finalize the thread.
void start_mongod()
Start mongod.
void kill_mongod(bool clear_data)
Stop mongod.
MongoDBInstanceConfig(fawkes::Configuration *config, std::string cfgname, std::string prefix)
Constructor.
virtual void init()
Initialize the thread.
std::string command_line() const
Get command line used to execute program.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Interface for configuration handling.
Definition: config.h:68
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
virtual std::string get_string_or_default(const char *path, const std::string &default_val)
Get value from configuration which is of type string, or the given default if the path does not exist...
Definition: config.cpp:736
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual bool get_bool_or_default(const char *path, const bool &default_val)
Get value from configuration which is of type bool, or the given default if the path does not exist.
Definition: config.cpp:726
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual const char * what_no_backtrace() const noexcept
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:663
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:748
Time wait utility.
Definition: wait.h:33
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
void wait_systime()
Wait until minimum loop time has been reached in real time.
Definition: wait.cpp:96
Fawkes library namespace.