22 #include "mongodb_replicaset_config.h" 24 #include "mongodb_client_config.h" 26 #include <config/config.h> 27 #include <logging/logger.h> 28 #include <mongo/bson/bson.h> 29 #include <mongo/client/dbclient.h> 30 #include <utils/time/wait.h> 58 std::string bootstrap_database)
61 set_name(
"MongoDBReplicaSet|%s", cfgname.c_str());
62 config_name_ = cfgname;
65 ReplicaSetStatus{.member_status = MongoDBManagedReplicaSetInterface::ERROR,
66 .primary_status = MongoDBManagedReplicaSetInterface::PRIMARY_UNKNOWN};
70 enabled_ = config->
get_bool(prefix +
"enabled");
75 bootstrap_database_ = bootstrap_database;
76 bootstrap_ns_ = bootstrap_database +
"." + config_name_;
78 local_client_cfg_ = config->
get_string(prefix +
"local-client");
81 loop_interval_ = config->
get_float(prefix +
"loop-interval");
85 leader_expiration_ = 10;
87 leader_expiration_ = config->
get_int(prefix +
"leader-expiration");
94 "/plugins/mongodb/clients/" + local_client_cfg_ +
"/");
96 throw Exception(
"%s: local client configuration '%s' disabled",
98 local_client_cfg_.c_str());
101 throw Exception(
"%s: local client configuration '%s' mode is not connection",
103 local_client_cfg_.c_str());
105 local_hostport_ = client_config.
hostport();
106 std::vector<std::string> hostv = config->
get_strings(prefix +
"hosts");
107 std::copy(hostv.begin(), hostv.end(), std::inserter(hosts_, hosts_.end()));
109 if (std::find(hosts_.begin(), hosts_.end(), local_hostport_) == hosts_.end()) {
110 throw Exception(
"%s host list does not include local client",
name());
113 leader_elec_query_ = BSON(
"host" << local_hostport_ <<
"master" <<
false);
114 leader_elec_query_force_ = BSON(
"master" <<
true);
116 mongo::BSONObjBuilder update;
117 update.append(
"$currentDate", BSON(
"last_seen" <<
true));
118 mongo::BSONObjBuilder update_set;
119 update_set.append(
"master",
true);
120 update_set.append(
"host", local_hostport_);
121 update.append(
"$set", update_set.obj());
122 leader_elec_update_ = update.obj();
135 bootstrap_client_ = bootstrap_client;
136 bootstrap_client_->createCollection(bootstrap_ns_);
137 bootstrap_client_->createIndex(bootstrap_ns_, mongo::IndexSpec().addKey(
"host"));
138 bootstrap_client_->createIndex(bootstrap_ns_, mongo::IndexSpec().addKey(
"master").unique());
139 bootstrap_client_->createIndex(
140 bootstrap_ns_, mongo::IndexSpec().addKey(
"last_seen").expireAfterSeconds(leader_expiration_));
145 MongoDBReplicaSetConfig::leader_elect(
bool force)
148 bootstrap_client_->update(bootstrap_ns_,
149 force ? leader_elec_query_force_ : leader_elec_query_,
153 &mongo::WriteConcern::majority);
158 }
catch (mongo::OperationException &e) {
159 if (e.obj()[
"code"].numberInt() != 11000) {
162 "Leader election failed (%i): %s %s",
165 e.obj().jsonString().c_str());
167 }
else if (is_leader_) {
176 MongoDBReplicaSetConfig::leader_resign()
180 bootstrap_client_->remove(bootstrap_ns_,
183 &mongo::WriteConcern::majority);
191 throw Exception(
"Replica set manager '%s' cannot be started while disabled",
name());
195 logger->
log_debug(
name(),
"Bootstrap Update: %s", leader_elec_update_.jsonString().c_str());
200 timewait_ =
new TimeWait(
clock, (
int)(loop_interval_ * 1000000.));
216 mongo::BSONObj reply;
217 ReplicaSetStatus status = rs_status(reply);
219 if (status.primary_status == MongoDBManagedReplicaSetInterface::NO_PRIMARY) {
221 if (leader_elect(
false)) {
227 switch (status.member_status) {
228 case MongoDBManagedReplicaSetInterface::PRIMARY:
229 if (last_status_.member_status != status.member_status) {
235 case MongoDBManagedReplicaSetInterface::SECONDARY:
236 if (last_status_.member_status != status.member_status) {
240 case MongoDBManagedReplicaSetInterface::ARBITER:
243 case MongoDBManagedReplicaSetInterface::NOT_INITIALIZED:
244 if (hosts_.size() == 1 || leader_elect()) {
246 if (hosts_.size() == 1) {
254 case MongoDBManagedReplicaSetInterface::INVALID_CONFIG:
257 "Invalid configuration, hands-on required\n%s",
258 reply.jsonString().c_str());
263 if (last_status_ != status) {
264 rs_status_if_->set_member_status(status.member_status);
265 rs_status_if_->set_primary_status(status.primary_status);
266 rs_status_if_->set_error_msg(status.error_msg.c_str());
267 rs_status_if_->write();
269 last_status_ = status;
275 MongoDBReplicaSetConfig::ReplicaSetStatus
276 MongoDBReplicaSetConfig::rs_status(mongo::BSONObj &reply)
278 ReplicaSetStatus status = {.member_status = MongoDBManagedReplicaSetInterface::ERROR,
279 .primary_status = MongoDBManagedReplicaSetInterface::PRIMARY_UNKNOWN};
281 mongo::BSONObj cmd(BSON(
"replSetGetStatus" << 1));
283 bool ok = local_client_->runCommand(
"admin", cmd, reply);
286 if (reply[
"code"].numberInt() == mongo::ErrorCodes::NotYetInitialized) {
288 status.member_status = MongoDBManagedReplicaSetInterface::NOT_INITIALIZED;
289 status.error_msg =
"Instance has not received replica set configuration, yet";
290 }
else if (reply[
"code"].numberInt() == mongo::ErrorCodes::InvalidReplicaSetConfig) {
292 "Invalid replica set configuration: %s",
293 reply.jsonString().c_str());
294 status.member_status = MongoDBManagedReplicaSetInterface::INVALID_CONFIG;
295 status.error_msg =
"Invalid replica set configuration: " + reply.jsonString();
297 status.error_msg =
"Unknown error";
303 mongo::BSONObjIterator members(reply.getObjectField(
"members"));
304 bool have_primary =
false;
305 MongoDBManagedReplicaSetInterface::ReplicaSetMemberStatus self_status =
306 MongoDBManagedReplicaSetInterface::REMOVED;
307 while (members.more()) {
308 mongo::BSONObj m = members.next().Obj();
309 int state = m[
"state"].Int();
313 if (m.hasField(
"self") && m[
"self"].boolean()) {
315 case 1: self_status = MongoDBManagedReplicaSetInterface::PRIMARY;
break;
316 case 2: self_status = MongoDBManagedReplicaSetInterface::SECONDARY;
break;
320 self_status = MongoDBManagedReplicaSetInterface::INITIALIZING;
322 case 7: self_status = MongoDBManagedReplicaSetInterface::ARBITER;
break;
323 default: self_status = MongoDBManagedReplicaSetInterface::ERROR;
break;
327 status.primary_status = have_primary ? MongoDBManagedReplicaSetInterface::HAVE_PRIMARY
328 : MongoDBManagedReplicaSetInterface::NO_PRIMARY;
329 status.member_status = self_status;
331 }
catch (mongo::DBException &e) {
333 status.member_status = MongoDBManagedReplicaSetInterface::ERROR;
334 status.error_msg = std::string(
"Failed to analyze member info: ") + e.what();
338 }
catch (mongo::DBException &e) {
340 status.member_status = MongoDBManagedReplicaSetInterface::ERROR;
341 status.error_msg = std::string(
"Failed to get RS status: ") + e.what();
348 MongoDBReplicaSetConfig::rs_init()
352 mongo::BSONObj cmd(BSON(
"replSetInitiate" << conf));
354 mongo::BSONObj reply;
356 bool ok = local_client_->runCommand(
"admin", cmd, reply);
358 logger->
log_error(
name(),
"RS initialization failed: %s", reply[
"errmsg"].toString().c_str());
362 }
catch (mongo::DBException &e) {
368 MongoDBReplicaSetConfig::rs_get_config(mongo::BSONObj &rs_config)
370 mongo::BSONObj cmd(BSON(
"replSetGetConfig" << 1));
373 mongo::BSONObj reply;
374 bool ok = local_client_->runCommand(
"admin", cmd, reply);
376 rs_config = reply[
"config"].Obj().copy();
380 "Failed to get RS config: %s (DB error)",
381 reply[
"errmsg"].str().c_str());
384 }
catch (mongo::DBException &e) {
391 MongoDBReplicaSetConfig::rs_monitor(
const mongo::BSONObj &status_reply)
393 using namespace std::chrono_literals;
395 std::set<std::string> in_rs, unresponsive, new_alive, members;
396 int last_member_id = 0;
398 mongo::BSONObjIterator members_it(status_reply.getObjectField(
"members"));
399 while (members_it.more()) {
400 mongo::BSONObj m = members_it.next().Obj();
401 members.insert(m[
"name"].str());
403 last_member_id = std::max(m[
"_id"].numberInt(), last_member_id);
406 if (m.hasField(
"self") && m[
"self"].boolean()) {
407 in_rs.insert(m[
"name"].str());
409 std::chrono::time_point<std::chrono::high_resolution_clock> last_heartbeat_rcvd(
410 std::chrono::milliseconds(m[
"lastHeartbeatRecv"].date()));
411 auto now = std::chrono::high_resolution_clock::now();
412 if ((m[
"health"].numberInt() != 1) || (now - last_heartbeat_rcvd) > 15s) {
414 unresponsive.insert(m[
"name"].str());
416 in_rs.insert(m[
"name"].str());
421 std::set<std::string> not_member;
422 std::set_difference(hosts_.begin(),
426 std::inserter(not_member, not_member.end()));
428 for (
const std::string &h : not_member) {
430 if (check_alive(h)) {
438 if (!unresponsive.empty() || !new_alive.empty()) {
440 mongo::BSONObj rs_config;
441 if (!rs_get_config(rs_config))
444 mongo::BSONObjBuilder new_config;
445 std::set<std::string> field_names;
446 rs_config.getFieldNames(field_names);
447 for (
const std::string &fn : field_names) {
448 if (fn ==
"version") {
449 new_config.append(
"version", rs_config[
"version"].numberInt() + 1);
450 }
else if (fn ==
"members") {
451 mongo::BSONObjIterator members_it(rs_config.getObjectField(
"members"));
453 mongo::BSONArrayBuilder members_arr(new_config.subarrayStart(
"members"));
455 while (members_it.more()) {
456 mongo::BSONObj m = members_it.next().Obj();
457 std::string host = m[
"host"].str();
458 if (hosts_.find(host) == hosts_.end()) {
461 "not part of the replica set configuration",
463 }
else if (unresponsive.find(host) == unresponsive.end()) {
466 members_arr.append(m);
471 for (
const std::string &h : new_alive) {
473 mongo::BSONObjBuilder membuild;
474 membuild.append(
"_id", ++last_member_id);
475 membuild.append(
"host", h);
476 members_arr.append(membuild.obj());
478 members_arr.doneFast();
481 new_config.append(rs_config[fn]);
482 }
catch (mongo::MsgAssertionException &e) {
489 mongo::BSONObj new_config_obj(new_config.obj());
492 mongo::BSONObjBuilder cmd;
493 cmd.append(
"replSetReconfig", new_config_obj);
494 cmd.append(
"force",
true);
497 mongo::BSONObj reply;
498 bool ok = local_client_->runCommand(
"admin", cmd.obj(), reply);
501 "RS reconfig failed: %s (DB error)",
502 reply[
"errmsg"].str().c_str());
504 }
catch (mongo::DBException &e) {
511 MongoDBReplicaSetConfig::check_alive(
const std::string &h)
514 std::shared_ptr<mongo::DBClientConnection> client =
515 std::make_shared<mongo::DBClientConnection>();
517 mongo::HostAndPort hostport(h);
518 if (!client->connect(hostport, errmsg)) {
521 mongo::BSONObj cmd(BSON(
"isMaster" << 1));
522 mongo::BSONObj reply;
523 bool ok = client->runCommand(
"admin", cmd, reply);
528 }
catch (mongo::DBException &e) {
bool is_enabled() const
Check if configuration is enabled.
ConnectionMode mode() const
Get client configuration mode.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
std::string hostport() const
Get host and port of configuration.
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
Thread class encapsulation of pthreads.
virtual int get_int(const char *path)=0
Get value from configuration which is of type int.
MongoDBReplicaSetConfig(fawkes::Configuration *config, std::string cfgname, std::string prefix, std::string bootstrap_database)
Constructor.
mongo::DBClientBase * create_client()
Create MongoDB client for this configuration.
Logger * logger
This is the Logger member used to access the logger.
void wait_systime()
Wait until minimum loop time has been reached in real time.
Clock * clock
By means of this member access to the clock is given.
void set_name(const char *format,...)
Set name of thread.
Base class for exceptions in Fawkes.
virtual void loop()
Code to execute in the thread.
const char * name() const
Get name of thread.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
void bootstrap(std::shared_ptr< mongo::DBClientBase > bootstrap_client)
Setup replicaset bootstrap client.
void mark_start()
Mark start of loop.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual void init()
Initialize the thread.
Interface for configuration handling.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
virtual void finalize()
Finalize the thread.
virtual void close(Interface *interface)=0
Close interface.