Fawkes API  Fawkes Development Version
mongodb_replicaset_config.cpp
1 
2 /***************************************************************************
3  * mongodb_replicaset_config.cpp - MongoDB replica set configuration
4  *
5  * Created: Thu Jul 13 10:25:19 2017
6  * Copyright 2006-2018 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "mongodb_replicaset_config.h"
23 
24 #include "mongodb_client_config.h"
25 
26 #include <config/config.h>
27 #include <logging/logger.h>
28 #include <mongo/bson/bson.h>
29 #include <mongo/client/dbclient.h>
30 #include <utils/time/wait.h>
31 
32 #include <chrono>
33 #include <iterator>
34 #include <numeric>
35 
36 using namespace fawkes;
37 
38 /** @class MongoDBReplicaSetConfig "mongodb_replicaset_config.h"
39  * MongoDB replica set configuration.
40  * Configure a replica set. This only takes care of initializing and
41  * maintaining the configuration of a running replica set. The
42  * mongod instances have to be handled separately, for example using
43  * instance configurations.
44  *
45  * @author Tim Niemueller
46  */
47 
48 /** Constructor.
49  * This will read the given configuration.
50  * @param config configuration to query
51  * @param cfgname configuration name
52  * @param prefix configuration path prefix
53  * @param bootstrap_database database to hold leader election data
54  */
56  std::string cfgname,
57  std::string prefix,
58  std::string bootstrap_database)
59 : Thread("MongoDBReplicaSet", Thread::OPMODE_CONTINUOUS)
60 {
61  set_name("MongoDBReplicaSet|%s", cfgname.c_str());
62  config_name_ = cfgname;
63  is_leader_ = false;
64  last_status_ =
65  ReplicaSetStatus{.member_status = MongoDBManagedReplicaSetInterface::ERROR,
66  .primary_status = MongoDBManagedReplicaSetInterface::PRIMARY_UNKNOWN};
67 
68  enabled_ = false;
69  try {
70  enabled_ = config->get_bool(prefix + "enabled");
71  } catch (Exception &e) {
72  }
73 
74  if (enabled_) {
75  bootstrap_database_ = bootstrap_database;
76  bootstrap_ns_ = bootstrap_database + "." + config_name_;
77 
78  local_client_cfg_ = config->get_string(prefix + "local-client");
79  loop_interval_ = 5.0;
80  try {
81  loop_interval_ = config->get_float(prefix + "loop-interval");
82  } catch (Exception &e) {
83  } // ignored, use default
84 
85  leader_expiration_ = 10;
86  try {
87  leader_expiration_ = config->get_int(prefix + "leader-expiration");
88  } catch (Exception &e) {
89  } // ignored, use default
90 
91  MongoDBClientConfig client_config(config,
92  logger,
93  local_client_cfg_,
94  "/plugins/mongodb/clients/" + local_client_cfg_ + "/");
95  if (!client_config.is_enabled()) {
96  throw Exception("%s: local client configuration '%s' disabled",
97  name(),
98  local_client_cfg_.c_str());
99  }
100  if (client_config.mode() != MongoDBClientConfig::CONNECTION) {
101  throw Exception("%s: local client configuration '%s' mode is not connection",
102  name(),
103  local_client_cfg_.c_str());
104  }
105  local_hostport_ = client_config.hostport();
106  std::vector<std::string> hostv = config->get_strings(prefix + "hosts");
107  std::copy(hostv.begin(), hostv.end(), std::inserter(hosts_, hosts_.end()));
108 
109  if (std::find(hosts_.begin(), hosts_.end(), local_hostport_) == hosts_.end()) {
110  throw Exception("%s host list does not include local client", name());
111  }
112 
113  leader_elec_query_ = BSON("host" << local_hostport_ << "master" << false);
114  leader_elec_query_force_ = BSON("master" << true);
115 
116  mongo::BSONObjBuilder update;
117  update.append("$currentDate", BSON("last_seen" << true));
118  mongo::BSONObjBuilder update_set;
119  update_set.append("master", true);
120  update_set.append("host", local_hostport_);
121  update.append("$set", update_set.obj());
122  leader_elec_update_ = update.obj();
123 
124  local_client_.reset(client_config.create_client());
125  }
126 }
127 
128 /** Setup replicaset bootstrap client.
129  * @param bootstrap_client MongoDB client to access bootstrap database
130  */
131 void
132 MongoDBReplicaSetConfig::bootstrap(std::shared_ptr<mongo::DBClientBase> bootstrap_client)
133 {
134  if (enabled_) {
135  bootstrap_client_ = bootstrap_client;
136  bootstrap_client_->createCollection(bootstrap_ns_);
137  bootstrap_client_->createIndex(bootstrap_ns_, mongo::IndexSpec().addKey("host"));
138  bootstrap_client_->createIndex(bootstrap_ns_, mongo::IndexSpec().addKey("master").unique());
139  bootstrap_client_->createIndex(
140  bootstrap_ns_, mongo::IndexSpec().addKey("last_seen").expireAfterSeconds(leader_expiration_));
141  }
142 }
143 
144 bool
145 MongoDBReplicaSetConfig::leader_elect(bool force)
146 {
147  try {
148  bootstrap_client_->update(bootstrap_ns_,
149  force ? leader_elec_query_force_ : leader_elec_query_,
150  leader_elec_update_,
151  /* upsert */ true,
152  /* multi */ false,
153  &mongo::WriteConcern::majority);
154  if (!is_leader_) {
155  is_leader_ = true;
156  logger->log_info(name(), "Became replica set leader");
157  }
158  } catch (mongo::OperationException &e) {
159  if (e.obj()["code"].numberInt() != 11000) {
160  // 11000: Duplicate key exception, occurs if we do not become leader, all fine
161  logger->log_error(name(),
162  "Leader election failed (%i): %s %s",
163  e.getCode(),
164  e.what(),
165  e.obj().jsonString().c_str());
166  is_leader_ = false;
167  } else if (is_leader_) {
168  logger->log_warn(name(), "Lost replica set leadership");
169  is_leader_ = false;
170  }
171  }
172  return is_leader_;
173 }
174 
175 void
176 MongoDBReplicaSetConfig::leader_resign()
177 {
178  if (is_leader_) {
179  logger->log_info(name(), "Resigning replica set leadership");
180  bootstrap_client_->remove(bootstrap_ns_,
181  leader_elec_query_,
182  /* just one */ true,
183  &mongo::WriteConcern::majority);
184  }
185 }
186 
187 void
189 {
190  if (!enabled_) {
191  throw Exception("Replica set manager '%s' cannot be started while disabled", name());
192  }
193 
194  logger->log_debug(name(), "Bootstrap Query: %s", leader_elec_query_.jsonString().c_str());
195  logger->log_debug(name(), "Bootstrap Update: %s", leader_elec_update_.jsonString().c_str());
196 
197  rs_status_if_ =
198  blackboard->open_for_writing<MongoDBManagedReplicaSetInterface>(config_name_.c_str());
199 
200  timewait_ = new TimeWait(clock, (int)(loop_interval_ * 1000000.));
201 }
202 
203 void
205 {
206  leader_resign();
207  blackboard->close(rs_status_if_);
208 
209  delete timewait_;
210 }
211 
212 void
214 {
215  timewait_->mark_start();
216  mongo::BSONObj reply;
217  ReplicaSetStatus status = rs_status(reply);
218 
219  if (status.primary_status == MongoDBManagedReplicaSetInterface::NO_PRIMARY) {
220  logger->log_warn(name(), "No primary, triggering leader election");
221  if (leader_elect(/* force leadership */ false)) {
222  logger->log_info(name(), "No primary, we became leader, managing");
223  rs_monitor(reply);
224  }
225  }
226 
227  switch (status.member_status) {
228  case MongoDBManagedReplicaSetInterface::PRIMARY:
229  if (last_status_.member_status != status.member_status) {
230  logger->log_info(name(), "Became PRIMARY, starting managing");
231  }
232  leader_elect(/* force leaderhsip */ true);
233  rs_monitor(reply);
234  break;
235  case MongoDBManagedReplicaSetInterface::SECONDARY:
236  if (last_status_.member_status != status.member_status) {
237  logger->log_info(name(), "Became SECONDARY");
238  }
239  break;
240  case MongoDBManagedReplicaSetInterface::ARBITER:
241  //logger->log_info(name(), "Arbiter");
242  break;
243  case MongoDBManagedReplicaSetInterface::NOT_INITIALIZED:
244  if (hosts_.size() == 1 || leader_elect()) {
245  // we are alone or leader, initialize replica set
246  if (hosts_.size() == 1) {
247  logger->log_info(name(), "Now initializing RS (alone)");
248  } else {
249  logger->log_info(name(), "Now initializing RS (leader)");
250  }
251  rs_init();
252  }
253  break;
254  case MongoDBManagedReplicaSetInterface::INVALID_CONFIG:
255  // we might later want to cover some typical cases
256  logger->log_error(name(),
257  "Invalid configuration, hands-on required\n%s",
258  reply.jsonString().c_str());
259  break;
260  default: break;
261  }
262 
263  if (last_status_ != status) {
264  rs_status_if_->set_member_status(status.member_status);
265  rs_status_if_->set_primary_status(status.primary_status);
266  rs_status_if_->set_error_msg(status.error_msg.c_str());
267  rs_status_if_->write();
268 
269  last_status_ = status;
270  }
271 
272  timewait_->wait_systime();
273 }
274 
275 MongoDBReplicaSetConfig::ReplicaSetStatus
276 MongoDBReplicaSetConfig::rs_status(mongo::BSONObj &reply)
277 {
278  ReplicaSetStatus status = {.member_status = MongoDBManagedReplicaSetInterface::ERROR,
279  .primary_status = MongoDBManagedReplicaSetInterface::PRIMARY_UNKNOWN};
280 
281  mongo::BSONObj cmd(BSON("replSetGetStatus" << 1));
282  try {
283  bool ok = local_client_->runCommand("admin", cmd, reply);
284 
285  if (!ok) {
286  if (reply["code"].numberInt() == mongo::ErrorCodes::NotYetInitialized) {
287  logger->log_warn(name(), "Instance has not received replica set configuration, yet");
288  status.member_status = MongoDBManagedReplicaSetInterface::NOT_INITIALIZED;
289  status.error_msg = "Instance has not received replica set configuration, yet";
290  } else if (reply["code"].numberInt() == mongo::ErrorCodes::InvalidReplicaSetConfig) {
291  logger->log_error(name(),
292  "Invalid replica set configuration: %s",
293  reply.jsonString().c_str());
294  status.member_status = MongoDBManagedReplicaSetInterface::INVALID_CONFIG;
295  status.error_msg = "Invalid replica set configuration: " + reply.jsonString();
296  } else {
297  status.error_msg = "Unknown error";
298  }
299  return status;
300  } else {
301  //logger->log_warn(name(), "rs status reply: %s", reply.jsonString().c_str());
302  try {
303  mongo::BSONObjIterator members(reply.getObjectField("members"));
304  bool have_primary = false;
305  MongoDBManagedReplicaSetInterface::ReplicaSetMemberStatus self_status =
306  MongoDBManagedReplicaSetInterface::REMOVED;
307  while (members.more()) {
308  mongo::BSONObj m = members.next().Obj();
309  int state = m["state"].Int();
310  if (state == 1)
311  have_primary = true;
312 
313  if (m.hasField("self") && m["self"].boolean()) {
314  switch (state) {
315  case 1: self_status = MongoDBManagedReplicaSetInterface::PRIMARY; break;
316  case 2: self_status = MongoDBManagedReplicaSetInterface::SECONDARY; break;
317  case 3: // RECOVERING
318  case 5: // STARTUP2
319  case 9: // ROLLBACK
320  self_status = MongoDBManagedReplicaSetInterface::INITIALIZING;
321  break;
322  case 7: self_status = MongoDBManagedReplicaSetInterface::ARBITER; break;
323  default: self_status = MongoDBManagedReplicaSetInterface::ERROR; break;
324  }
325  }
326  }
327  status.primary_status = have_primary ? MongoDBManagedReplicaSetInterface::HAVE_PRIMARY
328  : MongoDBManagedReplicaSetInterface::NO_PRIMARY;
329  status.member_status = self_status;
330  return status;
331  } catch (mongo::DBException &e) {
332  logger->log_warn(name(), "Failed to analyze member info: %s", e.what());
333  status.member_status = MongoDBManagedReplicaSetInterface::ERROR;
334  status.error_msg = std::string("Failed to analyze member info: ") + e.what();
335  return status;
336  }
337  }
338  } catch (mongo::DBException &e) {
339  logger->log_warn(name(), "Failed to get RS status: %s", e.what());
340  status.member_status = MongoDBManagedReplicaSetInterface::ERROR;
341  status.error_msg = std::string("Failed to get RS status: ") + e.what();
342  return status;
343  }
344  return status;
345 }
346 
347 void
348 MongoDBReplicaSetConfig::rs_init()
349 {
350  // using default configuration, this will just add ourself
351  mongo::BSONObj conf;
352  mongo::BSONObj cmd(BSON("replSetInitiate" << conf));
353 
354  mongo::BSONObj reply;
355  try {
356  bool ok = local_client_->runCommand("admin", cmd, reply);
357  if (!ok) {
358  logger->log_error(name(), "RS initialization failed: %s", reply["errmsg"].toString().c_str());
359  } else {
360  logger->log_debug(name(), "RS initialized successfully: %s", reply.jsonString().c_str());
361  }
362  } catch (mongo::DBException &e) {
363  logger->log_error(name(), "RS initialization failed: %s", e.what());
364  }
365 }
366 
367 bool
368 MongoDBReplicaSetConfig::rs_get_config(mongo::BSONObj &rs_config)
369 {
370  mongo::BSONObj cmd(BSON("replSetGetConfig" << 1));
371 
372  try {
373  mongo::BSONObj reply;
374  bool ok = local_client_->runCommand("admin", cmd, reply);
375  if (ok) {
376  rs_config = reply["config"].Obj().copy();
377  //logger->log_info(name(), "Config: %s", rs_config.jsonString(mongo::Strict, true).c_str());
378  } else {
379  logger->log_warn(name(),
380  "Failed to get RS config: %s (DB error)",
381  reply["errmsg"].str().c_str());
382  }
383  return ok;
384  } catch (mongo::DBException &e) {
385  logger->log_warn(name(), "Failed to get RS config: %s", e.what());
386  return false;
387  }
388 }
389 
390 void
391 MongoDBReplicaSetConfig::rs_monitor(const mongo::BSONObj &status_reply)
392 {
393  using namespace std::chrono_literals;
394 
395  std::set<std::string> in_rs, unresponsive, new_alive, members;
396  int last_member_id = 0;
397 
398  mongo::BSONObjIterator members_it(status_reply.getObjectField("members"));
399  while (members_it.more()) {
400  mongo::BSONObj m = members_it.next().Obj();
401  members.insert(m["name"].str());
402 
403  last_member_id = std::max(m["_id"].numberInt(), last_member_id);
404 
405  // determine members to remove
406  if (m.hasField("self") && m["self"].boolean()) {
407  in_rs.insert(m["name"].str());
408  } else {
409  std::chrono::time_point<std::chrono::high_resolution_clock> last_heartbeat_rcvd(
410  std::chrono::milliseconds(m["lastHeartbeatRecv"].date()));
411  auto now = std::chrono::high_resolution_clock::now();
412  if ((m["health"].numberInt() != 1) || (now - last_heartbeat_rcvd) > 15s) {
413  //logger->log_info(name(), "Reply: %s", status_reply.jsonString(mongo::Strict, true).c_str());
414  unresponsive.insert(m["name"].str());
415  } else {
416  in_rs.insert(m["name"].str());
417  }
418  }
419  }
420 
421  std::set<std::string> not_member;
422  std::set_difference(hosts_.begin(),
423  hosts_.end(),
424  in_rs.begin(),
425  in_rs.end(),
426  std::inserter(not_member, not_member.end()));
427 
428  for (const std::string &h : not_member) {
429  // Check if this host became alive, and add if it did
430  if (check_alive(h)) {
431  logger->log_info(name(), "Host %s alive, adding to RS", h.c_str());
432  new_alive.insert(h);
433  //} else {
434  //logger->log_info(name(), "Potential member %s not responding", h.c_str());
435  }
436  }
437 
438  if (!unresponsive.empty() || !new_alive.empty()) {
439  // generate new config
440  mongo::BSONObj rs_config;
441  if (!rs_get_config(rs_config))
442  return;
443 
444  mongo::BSONObjBuilder new_config;
445  std::set<std::string> field_names;
446  rs_config.getFieldNames(field_names);
447  for (const std::string &fn : field_names) {
448  if (fn == "version") {
449  new_config.append("version", rs_config["version"].numberInt() + 1);
450  } else if (fn == "members") {
451  mongo::BSONObjIterator members_it(rs_config.getObjectField("members"));
452 
453  mongo::BSONArrayBuilder members_arr(new_config.subarrayStart("members"));
454 
455  while (members_it.more()) {
456  mongo::BSONObj m = members_it.next().Obj();
457  std::string host = m["host"].str();
458  if (hosts_.find(host) == hosts_.end()) {
459  logger->log_warn(name(),
460  "Removing '%s', "
461  "not part of the replica set configuration",
462  host.c_str());
463  } else if (unresponsive.find(host) == unresponsive.end()) {
464  // it's not unresponsive, add
465  logger->log_warn(name(), "Keeping RS member '%s'", host.c_str());
466  members_arr.append(m);
467  } else {
468  logger->log_warn(name(), "Removing RS member '%s'", host.c_str());
469  }
470  }
471  for (const std::string &h : new_alive) {
472  logger->log_info(name(), "Adding new RS member '%s'", h.c_str());
473  mongo::BSONObjBuilder membuild;
474  membuild.append("_id", ++last_member_id);
475  membuild.append("host", h);
476  members_arr.append(membuild.obj());
477  }
478  members_arr.doneFast();
479  } else {
480  try {
481  new_config.append(rs_config[fn]);
482  } catch (mongo::MsgAssertionException &e) {
483  logger->log_error(name(), "ERROR on RS reconfigure (%s): %s", fn.c_str(), e.what());
484  return;
485  }
486  }
487  }
488 
489  mongo::BSONObj new_config_obj(new_config.obj());
490  //logger->log_info(name(), "Reconfigure: %s", new_config_obj.jsonString(mongo::Strict, true).c_str());
491 
492  mongo::BSONObjBuilder cmd;
493  cmd.append("replSetReconfig", new_config_obj);
494  cmd.append("force", true);
495 
496  try {
497  mongo::BSONObj reply;
498  bool ok = local_client_->runCommand("admin", cmd.obj(), reply);
499  if (!ok) {
500  logger->log_error(name(),
501  "RS reconfig failed: %s (DB error)",
502  reply["errmsg"].str().c_str());
503  }
504  } catch (mongo::DBException &e) {
505  logger->log_warn(name(), "RS reconfig failed: %s (exception)", e.what());
506  }
507  }
508 }
509 
510 bool
511 MongoDBReplicaSetConfig::check_alive(const std::string &h)
512 {
513  try {
514  std::shared_ptr<mongo::DBClientConnection> client =
515  std::make_shared<mongo::DBClientConnection>();
516  std::string errmsg;
517  mongo::HostAndPort hostport(h);
518  if (!client->connect(hostport, errmsg)) {
519  return false;
520  }
521  mongo::BSONObj cmd(BSON("isMaster" << 1));
522  mongo::BSONObj reply;
523  bool ok = client->runCommand("admin", cmd, reply);
524  if (!ok) {
525  logger->log_warn(name(), "Failed to connect: %s", reply.jsonString().c_str());
526  }
527  return ok;
528  } catch (mongo::DBException &e) {
529  logger->log_warn(name(), "Fail: %s", e.what());
530  return false;
531  }
532 }
bool is_enabled() const
Check if configuration is enabled.
ConnectionMode mode() const
Get client configuration mode.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
std::string hostport() const
Get host and port of configuration.
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
Client configuration.
Thread class encapsulation of pthreads.
Definition: thread.h:45
virtual int get_int(const char *path)=0
Get value from configuration which is of type int.
MongoDBReplicaSetConfig(fawkes::Configuration *config, std::string cfgname, std::string prefix, std::string bootstrap_database)
Constructor.
mongo::DBClientBase * create_client()
Create MongoDB client for this configuration.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
void wait_systime()
Wait until minimum loop time has been reached in real time.
Definition: wait.cpp:96
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:748
Base class for exceptions in Fawkes.
Definition: exception.h:35
virtual void loop()
Code to execute in the thread.
const char * name() const
Get name of thread.
Definition: thread.h:100
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
void bootstrap(std::shared_ptr< mongo::DBClientBase > bootstrap_client)
Setup replicaset bootstrap client.
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual void init()
Initialize the thread.
Interface for configuration handling.
Definition: config.h:64
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
Time wait utility.
Definition: wait.h:32
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
virtual void finalize()
Finalize the thread.
virtual void close(Interface *interface)=0
Close interface.