Fawkes API  Fawkes Development Version
robot_memory.cpp
1 /***************************************************************************
2  * robot_memory.cpp - Class for storing and querying information in the RobotMemory
3  *
4  * Created: Aug 23, 2016 1:34:32 PM 2016
5  * Copyright 2016 Frederik Zwilling
6  * 2017 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "robot_memory.h"
23 
24 #include <core/threading/mutex.h>
25 #include <core/threading/mutex_locker.h>
26 #include <interfaces/RobotMemoryInterface.h>
27 #include <utils/misc/string_conversions.h>
28 #include <utils/misc/string_split.h>
29 #include <utils/system/hostinfo.h>
30 
31 #include <chrono>
32 #include <string>
33 #include <thread>
34 
35 // from MongoDB
36 #include <mongo/client/dbclient.h>
37 
38 using namespace mongo;
39 using namespace fawkes;
40 
41 /** @class RobotMemory "robot_memory.h"
42  * Access to the robot memory based on mongodb.
43  * Using this class, you can query/insert/remove/update information in
44  * the robot memory. Furthermore, you can register trigger to get
45  * notified when something was changed in the robot memory matching
46  * your query and you can access computables, which are on demand
47  * computed information, by registering the computables and then
48  * querying as if the information would already be in the database.
49  * @author Frederik Zwilling
50  */
51 
52 /**
53  * Robot Memory Constructor with objects of the thread
54  * @param config Fawkes config
55  * @param logger Fawkes logger
56  * @param clock Fawkes clock
57  * @param mongo_connection_manager MongoDBConnCreator to create client connections to the shared and local db
58  * @param blackboard Fawkes blackboard
59  */
61  fawkes::Logger * logger,
62  fawkes::Clock * clock,
63  fawkes::MongoDBConnCreator *mongo_connection_manager,
64  fawkes::BlackBoard * blackboard)
65 {
66  config_ = config;
67  logger_ = logger;
68  clock_ = clock;
69  mongo_connection_manager_ = mongo_connection_manager;
70  blackboard_ = blackboard;
71  mongodb_client_local_ = nullptr;
72  mongodb_client_distributed_ = nullptr;
73  debug_ = false;
74 }
75 
76 RobotMemory::~RobotMemory()
77 {
78  mongo_connection_manager_->delete_client(mongodb_client_local_);
79  mongo_connection_manager_->delete_client(mongodb_client_distributed_);
80  delete trigger_manager_;
81  blackboard_->close(rm_if_);
82 }
83 
84 void
85 RobotMemory::init()
86 {
87  //load config values
88  log("Started RobotMemory");
89  default_collection_ = "robmem.test";
90  try {
91  default_collection_ = config_->get_string("/plugins/robot-memory/default-collection");
92  } catch (Exception &e) {
93  }
94  try {
95  debug_ = config_->get_bool("/plugins/robot-memory/more-debug-output");
96  } catch (Exception &e) {
97  }
98  database_name_ = "robmem";
99  try {
100  database_name_ = config_->get_string("/plugins/robot-memory/database");
101  } catch (Exception &e) {
102  }
103  distributed_dbs_ = config_->get_strings("/plugins/robot-memory/distributed-db-names");
104  cfg_startup_grace_period_ = 10;
105  try {
106  cfg_startup_grace_period_ = config_->get_uint("/plugins/robot-memory/startup-grace-period");
107  } catch (Exception &e) {
108  } // ignored, use default
109 
110  cfg_coord_database_ = config_->get_string("/plugins/robot-memory/coordination/database");
111  cfg_coord_mutex_collection_ =
112  config_->get_string("/plugins/robot-memory/coordination/mutex-collection");
113  cfg_coord_mutex_collection_ = cfg_coord_database_ + "." + cfg_coord_mutex_collection_;
114 
115  using namespace std::chrono_literals;
116 
117  //initiate mongodb connections:
118  log("Connect to local mongod");
119  unsigned int startup_tries = 0;
120  for (; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
121  try {
122  mongodb_client_local_ = mongo_connection_manager_->create_client("robot-memory-local");
123  break;
124  } catch (fawkes::Exception &e) {
125  logger_->log_info(name_, "Waiting for local");
126  std::this_thread::sleep_for(500ms);
127  }
128  }
129 
130  if (config_->exists("/plugins/mongodb/clients/robot-memory-distributed/enabled")
131  && config_->get_bool("/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
132  distributed_ = true;
133  log("Connect to distributed mongod");
134  for (startup_tries = 0; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
135  try {
136  mongodb_client_distributed_ =
137  mongo_connection_manager_->create_client("robot-memory-distributed");
138  break;
139  } catch (fawkes::Exception &e) {
140  logger_->log_info(name_, "Waiting for distributed");
141  std::this_thread::sleep_for(500ms);
142  }
143  }
144  }
145 
146  //init blackboard interface
147  rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(
148  config_->get_string("/plugins/robot-memory/interface-name").c_str());
149  rm_if_->set_error("");
150  rm_if_->set_result("");
151  rm_if_->write();
152 
153  //Setup event trigger and computables manager
154  trigger_manager_ = new EventTriggerManager(logger_, config_, mongo_connection_manager_);
155  computables_manager_ = new ComputablesManager(config_, this);
156 
157  log_deb("Initialized RobotMemory");
158 }
159 
160 void
161 RobotMemory::loop()
162 {
163  trigger_manager_->check_events();
164  computables_manager_->cleanup_computed_docs();
165 }
166 
167 /**
168  * Query information from the robot memory.
169  * @param query The query returned documents have to match (essentially a BSONObj)
170  * @param collection The database and collection to query as string (e.g. robmem.worldmodel)
171  * @return Cursor to get the documents from, NULL for invalid query
172  */
173 QResCursor
174 RobotMemory::query(Query query, const std::string &collection)
175 {
176  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
177  log_deb(std::string("Executing Query " + query.toString() + " on collection " + collection));
178 
179  //check if computation on demand is necessary and execute Computables
180  computables_manager_->check_and_compute(query, collection);
181 
182  //lock (mongo_client not thread safe)
183  MutexLocker lock(mutex_);
184 
185  //set read preference of query to nearest to read from the local replica set member first
186  query.readPref(ReadPreference_Nearest, BSONArray());
187 
188  //actually execute query
189  QResCursor cursor;
190  try {
191  cursor = mongodb_client->query(collection, query);
192  if (cursor == 0) {
193  logger_->log_error(name_, "Connection failed %s", collection.c_str());
194  }
195  } catch (DBException &e) {
196  std::string error =
197  std::string("Error for query ") + query.toString() + "\n Exception: " + e.toString();
198  log(error, "error");
199  return NULL;
200  }
201  return cursor;
202 }
203 
204 /**
205  * Aggregation call on the robot memory.
206  * @param pipeline Series of commands defining the aggregation
207  * @param collection The database and collection to query as string (e.g. robmem.worldmodel)
208  * @return Result object
209  */
210 mongo::BSONObj
211 RobotMemory::aggregate(const std::vector<mongo::BSONObj> &pipeline, const std::string &collection)
212 {
213  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
214  log_deb(std::string("Executing Aggregation on collection " + collection));
215 
216  //TODO: check if computation on demand is necessary and execute Computables
217  // that might be complicated because you need to build a query to check against from the fields mentioned in the different parts of the pipeline
218  // A possible solution might be forcing the user to define the $match oject seperately and using it as query to check computables
219 
220  //lock (mongo_client not thread safe)
221  MutexLocker lock(mutex_);
222 
223  //actually execute aggregation as command (in more modern mongo-cxx versions there should be an easier way with a proper aggregate function)
224  BSONObj res;
225  //get db and collection name
226  size_t point_pos = collection.find(".");
227  if (point_pos == std::string::npos) {
228  logger_->log_error(name_, "Collection %s needs to start with 'dbname.'", collection.c_str());
229  return fromjson("{}");
230  }
231  std::string db = collection.substr(0, point_pos);
232  std::string col = collection.substr(point_pos + 1);
233  try {
234  mongodb_client->runCommand(db, BSON("aggregate" << col << "pipeline" << pipeline), res);
235  } catch (DBException &e) {
236  std::string error = std::string("Error for aggregation ") + "\n Exception: " + e.toString();
237  log(error, "error");
238  return fromjson("{}");
239  }
240  return res;
241 }
242 
243 /**
244  * Inserts a document into the robot memory
245  * @param obj The document as BSONObj
246  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
247  * @return 1: Success 0: Error
248  */
249 int
250 RobotMemory::insert(mongo::BSONObj obj, const std::string &collection)
251 {
252  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
253 
254  log_deb(std::string("Inserting " + obj.toString() + " into collection " + collection));
255 
256  //lock (mongo_client not thread safe)
257  MutexLocker lock(mutex_);
258 
259  //actually execute insert
260  try {
261  mongodb_client->insert(collection, obj);
262  } catch (DBException &e) {
263  std::string error = "Error for insert " + obj.toString() + "\n Exception: " + e.toString();
264  log_deb(error, "error");
265  return 0;
266  }
267  //return success
268  return 1;
269 }
270 
271 /** Create an index on a collection.
272  * @param obj The keys document as BSONObj
273  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
274  * @param unique true to create unique index
275  * @return 1: Success 0: Error
276  */
277 int
278 RobotMemory::create_index(mongo::BSONObj obj, const std::string &collection, bool unique)
279 {
280  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
281 
282  log_deb(std::string("Creating index " + obj.toString() + " on collection " + collection));
283 
284  //lock (mongo_client not thread safe)
285  MutexLocker lock(mutex_);
286 
287  //actually execute insert
288  try {
289  mongodb_client->createIndex(collection, mongo::IndexSpec().addKeys(obj).unique(unique));
290  } catch (DBException &e) {
291  std::string error =
292  "Error when creating index " + obj.toString() + "\n Exception: " + e.toString();
293  log_deb(error, "error");
294  return 0;
295  }
296  //return success
297  return 1;
298 }
299 
300 /**
301  * Inserts all document of a vector into the robot memory
302  * @param v_obj The vector of BSONObj document
303  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
304  * @return 1: Success 0: Error
305  */
306 int
307 RobotMemory::insert(std::vector<mongo::BSONObj> v_obj, const std::string &collection)
308 {
309  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
310 
311  std::string insert_string = "[";
312  for (BSONObj obj : v_obj) {
313  insert_string += obj.toString() + ",\n";
314  }
315  insert_string += "]";
316 
317  log_deb(std::string("Inserting vector of documents " + insert_string + " into collection "
318  + collection));
319 
320  //lock (mongo_client not thread safe)
321  MutexLocker lock(mutex_);
322 
323  //actually execute insert
324  try {
325  mongodb_client->insert(collection, v_obj);
326  } catch (DBException &e) {
327  std::string error = "Error for insert " + insert_string + "\n Exception: " + e.toString();
328  log_deb(error, "error");
329  return 0;
330  }
331  //return success
332  return 1;
333 }
334 
335 /**
336  * Inserts a document into the robot memory
337  * @param obj_str The document as json string
338  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
339  * @return 1: Success 0: Error
340  */
341 int
342 RobotMemory::insert(const std::string &obj_str, const std::string &collection)
343 {
344  return insert(fromjson(obj_str), collection);
345 }
346 
347 /**
348  * Updates documents in the robot memory
349  * @param query The query defining which documents to update
350  * @param update What to change in these documents
351  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
352  * @param upsert Should the update document be inserted if the query returns no documents?
353  * @return 1: Success 0: Error
354  */
355 int
357  mongo::BSONObj update,
358  const std::string &collection,
359  bool upsert)
360 {
361  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
362  log_deb(std::string("Executing Update " + update.toString() + " for query " + query.toString()
363  + " on collection " + collection));
364 
365  //lock (mongo_client not thread safe)
366  MutexLocker lock(mutex_);
367 
368  //actually execute update
369  try {
370  mongodb_client->update(collection, query, update, upsert);
371  } catch (DBException &e) {
372  log_deb(std::string("Error for update " + update.toString() + " for query " + query.toString()
373  + "\n Exception: " + e.toString()),
374  "error");
375  return 0;
376  }
377  //return success
378  return 1;
379 }
380 
381 /**
382  * Updates documents in the robot memory
383  * @param query The query defining which documents to update
384  * @param update_str What to change in these documents as json string
385  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
386  * @param upsert Should the update document be inserted if the query returns no documents?
387  * @return 1: Success 0: Error
388  */
389 int
391  const std::string &update_str,
392  const std::string &collection,
393  bool upsert)
394 {
395  return update(query, fromjson(update_str), collection, upsert);
396 }
397 
398 /** Atomically update and retrieve document.
399  * @param filter The filter defining the document to update.
400  * If multiple match takes the first one.
401  * @param update Update statement. May only contain update operators!
402  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
403  * @param upsert Should the update document be inserted if the query returns no documents?
404  * @param return_new return the document before (false) or after (true) the update has been applied?
405  * @return document, depending on @p return_new either before or after the udpate has been applied.
406  */
407 mongo::BSONObj
408 RobotMemory::find_one_and_update(const mongo::BSONObj &filter,
409  const mongo::BSONObj &update,
410  const std::string & collection,
411  bool upsert,
412  bool return_new)
413 {
414  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
415 
416  log_deb(std::string("Executing findOneAndUpdate " + update.toString() + " for filter "
417  + filter.toString() + " on collection " + collection));
418 
419  MutexLocker lock(mutex_);
420 
421  try {
422  return mongodb_client->findAndModify(collection, filter, update, upsert, return_new);
423  } catch (DBException &e) {
424  std::string error = "Error for update " + update.toString() + " for query " + filter.toString()
425  + "\n Exception: " + e.toString();
426  log_deb(error, "error");
427  BSONObjBuilder b;
428  b.append("error", error);
429  return b.obj();
430  }
431 }
432 
433 /**
434  * Remove documents from the robot memory
435  * @param query Which documents to remove
436  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
437  * @return 1: Success 0: Error
438  */
439 int
440 RobotMemory::remove(mongo::Query query, const std::string &collection)
441 {
442  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
443  log_deb(std::string("Executing Remove " + query.toString() + " on collection " + collection));
444 
445  //lock (mongo_client not thread safe)
446  MutexLocker lock(mutex_);
447 
448  //actually execute remove
449  try {
450  mongodb_client->remove(collection, query);
451  } catch (DBException &e) {
452  log_deb(std::string("Error for query " + query.toString() + "\n Exception: " + e.toString()),
453  "error");
454  return 0;
455  }
456  //return success
457  return 1;
458 }
459 
460 /**
461  * Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
462  * @param query Which documents to use for the map step
463  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
464  * @param js_map_fun Map function in JavaScript as string
465  * @param js_reduce_fun Reduce function in JavaScript as string
466  * @return BSONObj containing the result
467  */
468 mongo::BSONObj
470  const std::string &collection,
471  const std::string &js_map_fun,
472  const std::string &js_reduce_fun)
473 {
474  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
475  MutexLocker lock(mutex_);
476  log_deb(std::string("Executing MapReduce " + query.toString() + " on collection " + collection
477  + " map: " + js_map_fun + " reduce: " + js_reduce_fun));
478  return mongodb_client->mapreduce(collection, js_map_fun, js_reduce_fun, query);
479 }
480 
481 /**
482  * Performs an aggregation operation on the robot memory (https://docs.mongodb.com/v3.2/reference/method/db.collection.aggregate/)
483  * @param pipeline A sequence of data aggregation operations or stages. See the https://docs.mongodb.com/v3.2/reference/operator/aggregation-pipeline/ for details
484  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
485  * @return Cursor to get the documents from, NULL for invalid pipeline
486  */
487 QResCursor
488 RobotMemory::aggregate(mongo::BSONObj pipeline, const std::string &collection)
489 {
490  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
491  MutexLocker lock(mutex_);
492  log_deb(std::string("Executing Aggregation pipeline: " + pipeline.toString() + " on collection "
493  + collection));
494 
495  QResCursor cursor;
496  try {
497  cursor = mongodb_client->aggregate(collection, pipeline);
498  } catch (DBException &e) {
499  std::string error =
500  std::string("Error for query ") + pipeline.toString() + "\n Exception: " + e.toString();
501  log(error, "error");
502  return NULL;
503  }
504  return cursor;
505 }
506 
507 /**
508  * Drop (= remove) a whole collection and all documents inside it
509  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
510  * @return 1: Success 0: Error
511  */
512 int
513 RobotMemory::drop_collection(const std::string &collection)
514 {
515  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
516  MutexLocker lock(mutex_);
517  log_deb("Dropping collection " + collection);
518  return mongodb_client->dropCollection(collection);
519 }
520 
521 /**
522  * Remove the whole database of the robot memory and all documents inside
523  * @return 1: Success 0: Error
524  */
525 int
527 {
528  //lock (mongo_client not thread safe)
529  MutexLocker lock(mutex_);
530 
531  log_deb("Clearing whole robot memory");
532  mongodb_client_local_->dropDatabase(database_name_);
533  return 1;
534 }
535 
536 /**
537  * Restore a previously dumped collection from a directory
538  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
539  * @param directory Directory of the dump
540  * @return 1: Success 0: Error
541  */
542 int
543 RobotMemory::restore_collection(const std::string &collection, const std::string &directory)
544 {
545  std::string coll{std::move(collection)};
546  drop_collection(coll);
547 
548  //lock (mongo_client not thread safe)
549  MutexLocker lock(mutex_);
550 
551  //resolve path to restore
552  if (coll.find(".") == std::string::npos) {
553  log(std::string("Unable to restore collection" + coll), "error");
554  log(std::string("Specify collection like 'db.collection'"), "error");
555  return 0;
556  }
557  std::string path = StringConversions::resolve_path(directory) + "/"
558  + coll.replace(coll.find("."), 1, "/") + ".bson";
559  log_deb(std::string("Restore collection " + collection + " from " + path), "warn");
560 
561  //call mongorestore from folder with initial restores
562  std::string command = "/usr/bin/mongorestore --dir " + path + " --host=127.0.0.1 --quiet";
563  log_deb(std::string("Restore command: " + command), "warn");
564  FILE *bash_output = popen(command.c_str(), "r");
565 
566  //check if output is ok
567  if (!bash_output) {
568  log(std::string("Unable to restore collection" + coll), "error");
569  return 0;
570  }
571  std::string output_string = "";
572  char buffer[100];
573  while (!feof(bash_output)) {
574  if (fgets(buffer, 100, bash_output) == NULL) {
575  break;
576  }
577  output_string += buffer;
578  }
579  pclose(bash_output);
580  if (output_string.find("Failed") != std::string::npos) {
581  log(std::string("Unable to restore collection" + coll), "error");
582  log_deb(output_string, "error");
583  return 0;
584  }
585  return 1;
586 }
587 
588 /**
589  * Dump (= save) a collection to the filesystem to restore it later
590  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
591  * @param directory Directory to dump the collection to
592  * @return 1: Success 0: Error
593  */
594 int
595 RobotMemory::dump_collection(const std::string &collection, const std::string &directory)
596 {
597  //lock (mongo_client not thread safe)
598  MutexLocker lock(mutex_);
599 
600  //resolve path to dump to
601  if (collection.find(".") == std::string::npos) {
602  log(std::string("Unable to dump collection" + collection), "error");
603  log(std::string("Specify collection like 'db.collection'"), "error");
604  return 0;
605  }
606  std::string path = StringConversions::resolve_path(directory);
607  log_deb(std::string("Dump collection " + collection + " into " + path), "warn");
608 
609  //call mongorestore from folder with initial restores
610  std::vector<std::string> split = str_split(collection, '.');
611  std::string command = "/usr/bin/mongodump --out=" + path + " --db=" + split[0]
612  + " --collection=" + split[1] + " --host=127.0.0.1 --quiet";
613  log_deb(std::string("Dump command: " + command), "warn");
614  FILE *bash_output = popen(command.c_str(), "r");
615  //check if output is ok
616  if (!bash_output) {
617  log(std::string("Unable to dump collection" + collection), "error");
618  return 0;
619  }
620  std::string output_string = "";
621  char buffer[100];
622  while (!feof(bash_output)) {
623  if (fgets(buffer, 100, bash_output) == NULL) {
624  break;
625  }
626  output_string += buffer;
627  }
628  pclose(bash_output);
629  if (output_string.find("Failed") != std::string::npos) {
630  log(std::string("Unable to dump collection" + collection), "error");
631  log_deb(output_string, "error");
632  return 0;
633  }
634  return 1;
635 }
636 
637 void
638 RobotMemory::log(const std::string &what, const std::string &info)
639 {
640  if (!info.compare("error"))
641  logger_->log_error(name_, "%s", what.c_str());
642  else if (!info.compare("warn"))
643  logger_->log_warn(name_, "%s", what.c_str());
644  else if (!info.compare("debug"))
645  logger_->log_debug(name_, "%s", what.c_str());
646  else
647  logger_->log_info(name_, "%s", what.c_str());
648 }
649 
650 void
651 RobotMemory::log_deb(const std::string &what, const std::string &level)
652 {
653  if (debug_) {
654  log(what, level);
655  }
656 }
657 
658 void
659 RobotMemory::log_deb(const mongo::Query &query, const std::string &what, const std::string &level)
660 {
661  if (debug_) {
662  log(query, what, level);
663  }
664 }
665 
666 void
667 RobotMemory::log(const mongo::Query &query, const std::string &what, const std::string &level)
668 {
669  std::string output =
670  what + "\nFilter: " + query.getFilter().toString()
671  + "\nModifiers: " + query.getModifiers().toString() + "\nSort: " + query.getSort().toString()
672  + "\nHint: " + query.getHint().toString() + "\nReadPref: " + query.getReadPref().toString();
673  log(output, level);
674 }
675 
676 void
677 RobotMemory::log_deb(const mongo::BSONObj &obj, const std::string &what, const std::string &level)
678 {
679  log(obj, what, level);
680 }
681 
682 void
683 RobotMemory::log(const mongo::BSONObj &obj, const std::string &what, const std::string &level)
684 {
685  std::string output = what + "\nObject: " + obj.toString();
686  log(output, level);
687 }
688 
689 void
690 RobotMemory::set_fields(mongo::BSONObj &obj, const std::string &what)
691 {
692  BSONObjBuilder b;
693  b.appendElements(obj);
694  b.appendElements(fromjson(what));
695  //override
696  obj = b.obj();
697 }
698 
699 void
700 RobotMemory::set_fields(mongo::Query &q, const std::string &what)
701 {
702  BSONObjBuilder b;
703  b.appendElements(q.getFilter());
704  b.appendElements(fromjson(what));
705 
706  //the following is not yet kept in the query:
707  // + "\nFilter: " + query.getFilter().toString()
708  // + "\nModifiers: " + query.getModifiers().toString()
709  // + "\nSort: " + query.getSort().toString()
710  // + "\nHint: " + query.getHint().toString()
711  // + "\nReadPref: " + query.getReadPref().toString();
712 
713  //override
714  q = Query(b.obj());
715 }
716 
717 void
718 RobotMemory::remove_field(mongo::Query &q, const std::string &what)
719 {
720  BSONObjBuilder b;
721  b.appendElements(q.getFilter().removeField(what));
722 
723  //the following is not yet kept in the query:
724  // + "\nFilter: " + query.getFilter().toString()
725  // + "\nModifiers: " + query.getModifiers().toString()
726  // + "\nSort: " + query.getSort().toString()
727  // + "\nHint: " + query.getHint().toString()
728  // + "\nReadPref: " + query.getReadPref().toString();
729 
730  //override
731  q = Query(b.obj());
732 }
733 
734 /**
735  * Get the mongodb client associated with the collection (eighter the local or distributed one)
736  */
737 mongo::DBClientBase *
738 RobotMemory::get_mongodb_client(const std::string &collection)
739 {
740  if (!distributed_) {
741  return mongodb_client_local_;
742  }
743  //get db name of collection
744  size_t point_pos = collection.find(".");
745  if (point_pos == collection.npos) {
746  logger_->log_error(name_, "Collection %s needs to start with 'dbname.'", collection.c_str());
747  return mongodb_client_local_;
748  }
749  std::string db = collection.substr(0, point_pos);
750  if (std::find(distributed_dbs_.begin(), distributed_dbs_.end(), db) != distributed_dbs_.end()) {
751  return mongodb_client_distributed_;
752  }
753  return mongodb_client_local_;
754 }
755 
756 /**
757  * Remove a previously registered trigger
758  * @param trigger Pointer to the trigger to remove
759  */
760 void
762 {
763  trigger_manager_->remove_trigger(trigger);
764 }
765 
766 /**
767  * Remove previously registered computable
768  * @param computable The computable to remove
769  */
770 void
772 {
773  computables_manager_->remove_computable(computable);
774 }
775 
776 /** Explicitly create a mutex.
777  * This is an optional step, a mutex is also created automatically when trying
778  * to acquire the lock for the first time. Adding it explicitly may increase
779  * visibility, e.g., in the database. Use it for mutexes which are locked
780  * only very infrequently.
781  * @param name mutex name
782  * @return true if operation was successful, false on failure
783  */
784 bool
785 RobotMemory::mutex_create(const std::string &name)
786 {
787  mongo::DBClientInterface *client =
788  distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
789  mongo::BSONObjBuilder insert_doc;
790  insert_doc.append("$currentDate", BSON("lock-time" << true));
791  insert_doc.append("_id", name);
792  insert_doc.append("locked", false);
793  try {
794  MutexLocker lock(mutex_);
795  client->insert(cfg_coord_mutex_collection_,
796  insert_doc.obj(),
797  0,
798  &mongo::WriteConcern::majority);
799  return true;
800  } catch (mongo::DBException &e) {
801  logger_->log_info(name_, "Failed to create mutex %s: %s", name.c_str(), e.what());
802  return false;
803  }
804 }
805 
806 /** Destroy a mutex.
807  * The mutex is erased from the database. This is done disregarding it's current
808  * lock state.
809  * @param name mutex name
810  * @return true if operation was successful, false on failure
811  */
812 bool
813 RobotMemory::mutex_destroy(const std::string &name)
814 {
815  mongo::DBClientInterface *client =
816  distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
817  mongo::BSONObj destroy_doc{BSON("_id" << name)};
818  try {
819  MutexLocker lock(mutex_);
820  client->remove(cfg_coord_mutex_collection_, destroy_doc, true, &mongo::WriteConcern::majority);
821  return true;
822  } catch (mongo::DBException &e) {
823  logger_->log_info(name_, "Failed to destroy mutex %s: %s", name.c_str(), e.what());
824  return false;
825  }
826 }
827 
828 /** Try to acquire a lock for a mutex.
829  * This will access the database and atomically find and update (or
830  * insert) a mutex lock. If the mutex has not been created it is added
831  * automatically. If the lock cannot be acquired the function also
832  * returns immediately. There is no blocked waiting for the lock.
833  * @param name mutex name
834  * @param identity string to set as lock-holder
835  * @param force true to force acquisition of the lock, i.e., even if
836  * the lock has already been acquired take ownership (steal the lock).
837  * @return true if operation was successful, false on failure
838  */
839 bool
840 RobotMemory::mutex_try_lock(const std::string &name, const std::string &identity, bool force)
841 {
842  mongo::DBClientBase *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
843 
844  std::string locked_by{identity};
845  if (identity.empty()) {
846  HostInfo host_info;
847  locked_by = host_info.name();
848  }
849 
850  // here we can add an $or to implement lock timeouts
851  mongo::BSONObjBuilder filter_doc;
852  filter_doc.append("_id", name);
853  if (!force) {
854  filter_doc.append("locked", false);
855  }
856 
857  mongo::BSONObjBuilder update_doc;
858  update_doc.append("$currentDate", BSON("lock-time" << true));
859  mongo::BSONObjBuilder update_set;
860  update_set.append("locked", true);
861  update_set.append("locked-by", locked_by);
862  update_doc.append("$set", update_set.obj());
863 
864  try {
865  MutexLocker lock(mutex_);
866  BSONObj new_doc = client->findAndModify(cfg_coord_mutex_collection_,
867  filter_doc.obj(),
868  update_doc.obj(),
869  /* upsert */ true,
870  /* return new */ true,
871  /* sort */ BSONObj(),
872  /* fields */ BSONObj(),
873  &mongo::WriteConcern::majority);
874 
875  return (new_doc.getField("locked-by").String() == locked_by
876  && new_doc.getField("locked").Bool());
877 
878  } catch (mongo::OperationException &e) {
879  logger_->log_error(name_, "Mongo OperationException: %s", e.what());
880  try {
881  mongo::BSONObjBuilder check_doc;
882  check_doc.append("_id", name);
883  check_doc.append("locked", true);
884  check_doc.append("locked-by", locked_by);
885  MutexLocker lock(mutex_);
886  BSONObj res_doc = client->findOne(cfg_coord_mutex_collection_, check_doc.obj());
887  logger_->log_info(name_, "Checking whether mutex was acquired succeeded");
888  if (!res_doc.isEmpty()) {
889  logger_->log_warn(name_,
890  "Exception during try-lock for %s, "
891  "but mutex was still acquired",
892  name.c_str());
893  } else {
894  logger_->log_info(name_,
895  "Exception during try-lock for %s, "
896  "and mutex was not acquired",
897  name.c_str());
898  }
899  return !res_doc.isEmpty();
900  } catch (mongo::OperationException &e) {
901  logger_->log_error(name_,
902  "Mongo OperationException while handling "
903  "the first exception: %s",
904  e.what());
905  return false;
906  }
907  }
908 }
909 
910 /** Try to acquire a lock for a mutex.
911  * This will access the database and atomically find and update (or
912  * insert) a mutex lock. If the mutex has not been created it is added
913  * automatically. If the lock cannot be acquired the function also
914  * returns immediately. There is no blocked waiting for the lock.
915  * @param name mutex name
916  * @param force true to force acquisition of the lock, i.e., even if
917  * the lock has already been acquired take ownership (steal the lock).
918  * @return true if operation was successful, false on failure
919  */
920 bool
921 RobotMemory::mutex_try_lock(const std::string &name, bool force)
922 {
923  return mutex_try_lock(name, "", force);
924 }
925 
926 /** Release lock on mutex.
927  * @param name mutex name
928  * @param identity string to set as lock-holder
929  * @return true if operation was successful, false on failure
930  */
931 bool
932 RobotMemory::mutex_unlock(const std::string &name, const std::string &identity)
933 {
934  mongo::DBClientBase *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
935 
936  std::string locked_by{identity};
937  if (identity.empty()) {
938  HostInfo host_info;
939  locked_by = host_info.name();
940  }
941 
942  // here we can add an $or to implement lock timeouts
943  mongo::BSONObj filter_doc{BSON("_id" << name << "locked-by" << locked_by)};
944 
945  mongo::BSONObj update_doc{BSON("$set" << BSON("locked" << false) << "$unset"
946  << BSON("locked-by" << true << "lock-time" << true))};
947 
948  try {
949  MutexLocker lock(mutex_);
950  BSONObj new_doc = client->findAndModify(cfg_coord_mutex_collection_,
951  filter_doc,
952  update_doc,
953  /* upsert */ true,
954  /* return new */ true,
955  /* sort */ BSONObj(),
956  /* fields */ BSONObj(),
957  &mongo::WriteConcern::majority);
958 
959  return true;
960  } catch (mongo::OperationException &e) {
961  return false;
962  }
963 }
964 
965 /** Renew a mutex.
966  * Renewing means updating the lock timestamp to the current time to
967  * avoid expiration. Note that the lock must currently be held by
968  * the given identity.
969  * @param name mutex name
970  * @param identity string to set as lock-holder (defaults to hostname
971  * if empty)
972  * @return true if operation was successful, false on failure
973  */
974 bool
975 RobotMemory::mutex_renew_lock(const std::string &name, const std::string &identity)
976 {
977  mongo::DBClientBase *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
978 
979  std::string locked_by{identity};
980  if (identity.empty()) {
981  HostInfo host_info;
982  locked_by = host_info.name();
983  }
984 
985  // here we can add an $or to implement lock timeouts
986  mongo::BSONObj filter_doc{BSON("_id" << name << "locked" << true << "locked-by" << locked_by)};
987 
988  // we set all data, even the data which is not actually modified, to
989  // make it easier to process the update in triggers.
990  mongo::BSONObjBuilder update_doc;
991  update_doc.append("$currentDate", BSON("lock-time" << true));
992  mongo::BSONObjBuilder update_set;
993  update_set.append("locked", true);
994  update_set.append("locked-by", locked_by);
995  update_doc.append("$set", update_set.obj());
996 
997  try {
998  MutexLocker lock(mutex_);
999  BSONObj new_doc = client->findAndModify(cfg_coord_mutex_collection_,
1000  filter_doc,
1001  update_doc.obj(),
1002  /* upsert */ false,
1003  /* return new */ true,
1004  /* sort */ BSONObj(),
1005  /* fields */ BSONObj(),
1006  &mongo::WriteConcern::majority);
1007 
1008  return true;
1009  } catch (mongo::OperationException &e) {
1010  logger_->log_warn(name_, "Renewing lock on mutex %s failed: %s", name.c_str(), e.what());
1011  return false;
1012  }
1013 }
1014 
1015 /** Setup time-to-live index for mutexes.
1016  * Setting up a time-to-live index for mutexes enables automatic
1017  * expiration through the database. Note, however, that the documents
1018  * are expired only every 60 seconds. This has two consequences:
1019  * - max_age_sec lower than 60 seconds cannot be achieved
1020  * - locks may be held for up to just below 60 seconds longer than
1021  * configured, i.e., if the mutex had not yet expired when the
1022  * background tasks runs.
1023  * @param max_age_sec maximum age of locks in seconds
1024  * @return true if operation was successful, false on failure
1025  */
1026 bool
1028 {
1029  MutexLocker lock(mutex_);
1030 
1031  mongo::DBClientBase *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1032 
1033  BSONObj keys = BSON("lock-time" << true);
1034 
1035  try {
1036  client->createIndex(cfg_coord_mutex_collection_,
1037  mongo::IndexSpec().addKeys(keys).expireAfterSeconds(max_age_sec));
1038  } catch (DBException &e) {
1039  logger_->log_warn(name_, "Creating TTL index failed: %s", e.what());
1040  return false;
1041  }
1042  return true;
1043 }
1044 
1045 /** Expire old locks on mutexes.
1046  * This will update the database and set all mutexes to unlocked for
1047  * which the lock-time is older than the given maximum age.
1048  * @param max_age_sec maximum age of locks in seconds
1049  * @return true if operation was successful, false on failure
1050  */
1051 bool
1053 {
1054  mongo::DBClientBase *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1055 
1056  using std::chrono::high_resolution_clock;
1057  using std::chrono::milliseconds;
1058  using std::chrono::time_point;
1059  using std::chrono::time_point_cast;
1060 
1061  auto max_age_ms = milliseconds(static_cast<unsigned long int>(std::floor(max_age_sec * 1000)));
1062  time_point<high_resolution_clock, milliseconds> expire_before =
1063  time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1064  mongo::Date_t expire_before_mdb(expire_before.time_since_epoch().count());
1065 
1066  // here we can add an $or to implement lock timeouts
1067  mongo::BSONObj filter_doc{
1068  BSON("locked" << true << "lock-time" << mongo::LT << expire_before_mdb)};
1069 
1070  mongo::BSONObjBuilder update_doc;
1071  mongo::BSONObjBuilder update_set;
1072  update_set.append("locked", false);
1073  update_set.append("locked-by", "");
1074  update_doc.append("$set", update_set.obj());
1075 
1076  try {
1077  MutexLocker lock(mutex_);
1078  client->update(cfg_coord_mutex_collection_,
1079  filter_doc,
1080  update_doc.obj(),
1081  /* upsert */ false,
1082  /* multi */ true,
1083  &mongo::WriteConcern::majority);
1084 
1085  return true;
1086  } catch (mongo::OperationException &e) {
1087  return false;
1088  }
1089 }
QResCursor query(mongo::Query query, const std::string &collection="")
Query information from the robot memory.
Manager to realize triggers on events in the robot memory.
This class manages registering computables and can check if any computables are invoced by a query.
bool mutex_unlock(const std::string &name, const std::string &identity)
Release lock on mutex.
int create_index(mongo::BSONObj keys, const std::string &collection="", bool unique=false)
Create an index on a collection.
int remove(mongo::Query query, const std::string &collection="")
Remove documents from the robot memory.
Fawkes library namespace.
int update(mongo::Query query, mongo::BSONObj update, const std::string &collection="", bool upsert=false)
Updates documents in the robot memory.
mongo::BSONObj aggregate(const std::vector< mongo::BSONObj > &pipeline, const std::string &collection="")
Aggregation call on the robot memory.
Mutex locking helper.
Definition: mutex_locker.h:33
This is supposed to be the central clock in Fawkes.
Definition: clock.h:34
Class holding information for a single computable this class also enhances computed documents by addi...
Definition: computable.h:29
mongo::BSONObj find_one_and_update(const mongo::BSONObj &filter, const mongo::BSONObj &update, const std::string &collection, bool upsert=false, bool return_new=true)
Atomically update and retrieve document.
void remove_computable(Computable *computable)
Remove previously registered computable.
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
int restore_collection(const std::string &collection, const std::string &directory="@CONFDIR@/robot-memory")
Restore a previously dumped collection from a directory.
Class holding all information about an EventTrigger.
Definition: event_trigger.h:32
mongo::BSONObj mapreduce(mongo::Query query, const std::string &collection, const std::string &js_map_fun, const std::string &js_reduce_fun)
Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
int insert(mongo::BSONObj obj, const std::string &collection="")
Inserts a document into the robot memory.
int drop_collection(const std::string &collection)
Drop (= remove) a whole collection and all documents inside it.
Host information.
Definition: hostinfo.h:31
bool mutex_try_lock(const std::string &name, bool force=false)
Try to acquire a lock for a mutex.
Base class for exceptions in Fawkes.
Definition: exception.h:35
bool mutex_renew_lock(const std::string &name, const std::string &identity)
Renew a mutex.
const char * name()
Get full hostname.
Definition: hostinfo.cpp:100
RobotMemory(fawkes::Configuration *config, fawkes::Logger *logger, fawkes::Clock *clock, fawkes::MongoDBConnCreator *mongo_connection_manager, fawkes::BlackBoard *blackboard)
Robot Memory Constructor with objects of the thread.
bool mutex_setup_ttl(float max_age_sec)
Setup time-to-live index for mutexes.
Interface for a MongoDB connection creator.
bool mutex_create(const std::string &name)
Explicitly create a mutex.
bool mutex_destroy(const std::string &name)
Destroy a mutex.
int clear_memory()
Remove the whole database of the robot memory and all documents inside.
The BlackBoard abstract class.
Definition: blackboard.h:45
bool mutex_expire_locks(float max_age_sec)
Expire old locks on mutexes.
Interface for configuration handling.
Definition: config.h:64
int dump_collection(const std::string &collection, const std::string &directory="@CONFDIR@/robot-memory")
Dump (= save) a collection to the filesystem to restore it later.
Interface for logging.
Definition: logger.h:41