Fawkes API Fawkes Development Version
robot_memory.cpp
1/***************************************************************************
2 * robot_memory.cpp - Class for storing and querying information in the RobotMemory
3 *
4 * Created: Aug 23, 2016 1:34:32 PM 2016
5 * Copyright 2016 Frederik Zwilling
6 * 2017 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "robot_memory.h"
23
24#include <core/threading/mutex.h>
25#include <core/threading/mutex_locker.h>
26#include <interfaces/RobotMemoryInterface.h>
27#include <plugins/mongodb/utils.h>
28#include <utils/misc/string_conversions.h>
29#include <utils/misc/string_split.h>
30#include <utils/system/hostinfo.h>
31#ifdef USE_TIMETRACKER
32# include <utils/time/tracker.h>
33#endif
34#include <utils/time/tracker_macros.h>
35
36#include <bsoncxx/builder/basic/document.hpp>
37#include <chrono>
38#include <mongocxx/client.hpp>
39#include <mongocxx/exception/operation_exception.hpp>
40#include <mongocxx/read_preference.hpp>
41#include <string>
42#include <thread>
43
44using namespace fawkes;
45using namespace mongocxx;
46using namespace bsoncxx;
47
48/** @class RobotMemory "robot_memory.h"
49 * Access to the robot memory based on mongodb.
50 * Using this class, you can query/insert/remove/update information in
51 * the robot memory. Furthermore, you can register trigger to get
52 * notified when something was changed in the robot memory matching
53 * your query and you can access computables, which are on demand
54 * computed information, by registering the computables and then
55 * querying as if the information would already be in the database.
56 * @author Frederik Zwilling
57 */
58
59/**
60 * Robot Memory Constructor with objects of the thread
61 * @param config Fawkes config
62 * @param logger Fawkes logger
63 * @param clock Fawkes clock
64 * @param mongo_connection_manager MongoDBConnCreator to create client connections to the shared and local db
65 * @param blackboard Fawkes blackboard
66 */
68 fawkes::Logger * logger,
69 fawkes::Clock * clock,
70 fawkes::MongoDBConnCreator *mongo_connection_manager,
71 fawkes::BlackBoard * blackboard)
72{
73 config_ = config;
74 logger_ = logger;
75 clock_ = clock;
76 mongo_connection_manager_ = mongo_connection_manager;
77 blackboard_ = blackboard;
78 mongodb_client_local_ = nullptr;
79 mongodb_client_distributed_ = nullptr;
80 debug_ = false;
81}
82
83RobotMemory::~RobotMemory()
84{
85 mongo_connection_manager_->delete_client(mongodb_client_local_);
86 mongo_connection_manager_->delete_client(mongodb_client_distributed_);
87 delete trigger_manager_;
88 blackboard_->close(rm_if_);
89#ifdef USE_TIMETRACKER
90 delete tt_;
91#endif
92}
93
94void
95RobotMemory::init()
96{
97 //load config values
98 log("Started RobotMemory");
99 default_collection_ = "robmem.test";
100 try {
101 default_collection_ = config_->get_string("/plugins/robot-memory/default-collection");
102 } catch (Exception &) {
103 }
104 try {
105 debug_ = config_->get_bool("/plugins/robot-memory/more-debug-output");
106 } catch (Exception &) {
107 }
108 database_name_ = "robmem";
109 try {
110 database_name_ = config_->get_string("/plugins/robot-memory/database");
111 } catch (Exception &) {
112 }
113 distributed_dbs_ = config_->get_strings("/plugins/robot-memory/distributed-db-names");
114
115 cfg_coord_database_ = config_->get_string("/plugins/robot-memory/coordination/database");
116 cfg_coord_mutex_collection_ =
117 config_->get_string("/plugins/robot-memory/coordination/mutex-collection");
118
119 using namespace std::chrono_literals;
120
121 //initiate mongodb connections:
122 log("Connect to local mongod");
123 mongodb_client_local_ = mongo_connection_manager_->create_client("robot-memory-local");
124
125 if (config_->exists("/plugins/mongodb/clients/robot-memory-distributed/enabled")
126 && config_->get_bool("/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
127 distributed_ = true;
128 log("Connect to distributed mongod");
129 mongodb_client_distributed_ =
130 mongo_connection_manager_->create_client("robot-memory-distributed");
131 }
132
133 //init blackboard interface
134 rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(
135 config_->get_string("/plugins/robot-memory/interface-name").c_str());
136 rm_if_->set_error("");
137 rm_if_->set_result("");
138 rm_if_->write();
139
140 //Setup event trigger and computables manager
141 trigger_manager_ = new EventTriggerManager(logger_, config_, mongo_connection_manager_);
142 computables_manager_ = new ComputablesManager(config_, this);
143
144 log_deb("Initialized RobotMemory");
145
146#ifdef USE_TIMETRACKER
147 tt_ = new TimeTracker();
148 tt_loopcount_ = 0;
149 ttc_events_ = tt_->add_class("RobotMemory Events");
150 ttc_cleanup_ = tt_->add_class("RobotMemory Cleanup");
151#endif
152}
153
154void
155RobotMemory::loop()
156{
157 TIMETRACK_START(ttc_events_);
158 trigger_manager_->check_events();
159 TIMETRACK_END(ttc_events_);
160 TIMETRACK_START(ttc_cleanup_);
161 computables_manager_->cleanup_computed_docs();
162 TIMETRACK_END(ttc_cleanup_);
163#ifdef USE_TIMETRACKER
164 if (++tt_loopcount_ % 5 == 0) {
165 tt_->print_to_stdout();
166 }
167#endif
168}
169
170/**
171 * Query information from the robot memory.
172 * @param query The query returned documents have to match (essentially a BSONObj)
173 * @param collection_name The database and collection to query as string (e.g. robmem.worldmodel)
174 * @param query_options Optional options to use to query the database
175 * @return Cursor to get the documents from, NULL for invalid query
176 */
177cursor
178RobotMemory::query(document::view query,
179 const std::string & collection_name,
180 mongocxx::options::find query_options)
181{
182 collection collection = get_collection(collection_name);
183 log_deb(std::string("Executing Query " + to_json(query) + " on collection " + collection_name));
184
185 //check if computation on demand is necessary and execute Computables
186 computables_manager_->check_and_compute(query, collection_name);
187
188 //lock (mongo_client not thread safe)
189 MutexLocker lock(mutex_);
190
191 //actually execute query
192 try {
193 return collection.find(query, query_options);
194 } catch (mongocxx::operation_exception &e) {
195 std::string error =
196 std::string("Error for query ") + to_json(query) + "\n Exception: " + e.what();
197 log(error, "error");
198 throw;
199 }
200}
201
202/**
203 * Inserts a document into the robot memory
204 * @param doc A view of the document to insert
205 * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
206 * @return 1: Success 0: Error
207 */
208int
209RobotMemory::insert(bsoncxx::document::view doc, const std::string &collection_name)
210{
211 collection collection = get_collection(collection_name);
212 log_deb(std::string("Inserting " + to_json(doc) + " into collection " + collection_name));
213 //lock (mongo_client not thread safe)
214 MutexLocker lock(mutex_);
215 //actually execute insert
216 try {
217 collection.insert_one(doc);
218 } catch (mongocxx::operation_exception &e) {
219 std::string error = "Error for insert " + to_json(doc) + "\n Exception: " + e.what();
220 log_deb(error, "error");
221 return 0;
222 }
223 //return success
224 return 1;
225}
226
227/** Create an index on a collection.
228 * @param keys The keys document
229 * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
230 * @param unique true to create unique index
231 * @return 1: Success 0: Error
232 */
233int
234RobotMemory::create_index(bsoncxx::document::view keys,
235 const std::string & collection_name,
236 bool unique)
237{
238 collection collection = get_collection(collection_name);
239
240 log_deb(std::string("Creating index " + to_json(keys) + " on collection " + collection_name));
241
242 //lock (mongo_client not thread safe)
243 MutexLocker lock(mutex_);
244
245 //actually execute insert
246 try {
247 using namespace bsoncxx::builder::basic;
248 collection.create_index(keys, make_document(kvp("unique", unique)));
249 } catch (operation_exception &e) {
250 std::string error = "Error when creating index " + to_json(keys) + "\n Exception: " + e.what();
251 log_deb(error, "error");
252 return 0;
253 }
254 //return success
255 return 1;
256}
257
258/**
259 * Inserts all document of a vector into the robot memory
260 * @param docs The vector of BSON documents as views
261 * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
262 * @return 1: Success 0: Error
263 */
264int
265RobotMemory::insert(std::vector<bsoncxx::document::view> docs, const std::string &collection_name)
266{
267 collection collection = get_collection(collection_name);
268 std::string insert_string = "[";
269 for (auto &&doc : docs) {
270 insert_string += to_json(doc) + ",\n";
271 }
272 insert_string += "]";
273
274 log_deb(std::string("Inserting vector of documents " + insert_string + " into collection "
275 + collection_name));
276
277 //lock (mongo_client not thread safe)
278 MutexLocker lock(mutex_);
279
280 //actually execute insert
281 try {
282 collection.insert_many(docs);
283 } catch (operation_exception &e) {
284 std::string error = "Error for insert " + insert_string + "\n Exception: " + e.what();
285 log_deb(error, "error");
286 return 0;
287 }
288 //return success
289 return 1;
290}
291
292/**
293 * Inserts a document into the robot memory
294 * @param obj_str The document as json string
295 * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
296 * @return 1: Success 0: Error
297 */
298int
299RobotMemory::insert(const std::string &obj_str, const std::string &collection)
300{
301 return insert(from_json(obj_str), collection);
302}
303
304/**
305 * Updates documents in the robot memory
306 * @param query The query defining which documents to update
307 * @param update What to change in these documents
308 * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
309 * @param upsert Should the update document be inserted if the query returns no documents?
310 * @return 1: Success 0: Error
311 */
312int
313RobotMemory::update(const bsoncxx::document::view &query,
314 const bsoncxx::document::view &update,
315 const std::string & collection_name,
316 bool upsert)
317{
318 collection collection = get_collection(collection_name);
319 log_deb(std::string("Executing Update " + to_json(update) + " for query " + to_json(query)
320 + " on collection " + collection_name));
321
322 //lock (mongo_client not thread safe)
323 MutexLocker lock(mutex_);
324
325 //actually execute update
326 try {
327 collection.update_many(query,
328 builder::basic::make_document(
329 builder::basic::kvp("$set", builder::concatenate(update))),
330 options::update().upsert(upsert));
331 } catch (operation_exception &e) {
332 log_deb(std::string("Error for update " + to_json(update) + " for query " + to_json(query)
333 + "\n Exception: " + e.what()),
334 "error");
335 return 0;
336 }
337 //return success
338 return 1;
339}
340
341/**
342 * Updates documents in the robot memory
343 * @param query The query defining which documents to update
344 * @param update_str What to change in these documents as json string
345 * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
346 * @param upsert Should the update document be inserted if the query returns no documents?
347 * @return 1: Success 0: Error
348 */
349int
350RobotMemory::update(const bsoncxx::document::view &query,
351 const std::string & update_str,
352 const std::string & collection,
353 bool upsert)
354{
355 return update(query, from_json(update_str), collection, upsert);
356}
357
358/** Atomically update and retrieve document.
359 * @param filter The filter defining the document to update.
360 * If multiple match takes the first one.
361 * @param update Update statement. May only contain update operators!
362 * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
363 * @param upsert Should the update document be inserted if the query returns no documents?
364 * @param return_new return the document before (false) or after (true) the update has been applied?
365 * @return document, depending on @p return_new either before or after the udpate has been applied.
366 */
367document::value
368RobotMemory::find_one_and_update(const document::view &filter,
369 const document::view &update,
370 const std::string & collection_name,
371 bool upsert,
372 bool return_new)
373{
374 collection collection = get_collection(collection_name);
375
376 log_deb(std::string("Executing findOneAndUpdate " + to_json(update) + " for filter "
377 + to_json(filter) + " on collection " + collection_name));
378
379 MutexLocker lock(mutex_);
380
381 try {
382 auto res =
383 collection.find_one_and_update(filter,
384 update,
385 options::find_one_and_update().upsert(upsert).return_document(
386 return_new ? options::return_document::k_after
387 : options::return_document::k_before));
388 if (res) {
389 return *res;
390 } else {
391 std::string error = "Error for update " + to_json(update) + " for query " + to_json(filter)
392 + "FindOneAndUpdate unexpectedly did not return a document";
393 log_deb(error, "warn");
394 return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("error", error));
395 }
396 } catch (operation_exception &e) {
397 std::string error = "Error for update " + to_json(update) + " for query " + to_json(filter)
398 + "\n Exception: " + e.what();
399 log_deb(error, "error");
400 return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("error", error));
401 }
402}
403
404/**
405 * Remove documents from the robot memory
406 * @param query Which documents to remove
407 * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
408 * @return 1: Success 0: Error
409 */
410int
411RobotMemory::remove(const bsoncxx::document::view &query, const std::string &collection_name)
412{
413 //lock (mongo_client not thread safe)
414 MutexLocker lock(mutex_);
415 collection collection = get_collection(collection_name);
416 log_deb(std::string("Executing Remove " + to_json(query) + " on collection " + collection_name));
417 //actually execute remove
418 try {
419 collection.delete_many(query);
420 } catch (operation_exception &e) {
421 log_deb(std::string("Error for query " + to_json(query) + "\n Exception: " + e.what()),
422 "error");
423 return 0;
424 }
425 //return success
426 return 1;
427}
428
429/**
430 * Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
431 * @param query Which documents to use for the map step
432 * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
433 * @param js_map_fun Map function in JavaScript as string
434 * @param js_reduce_fun Reduce function in JavaScript as string
435 * @return BSONObj containing the result
436 */
437bsoncxx::document::value
438RobotMemory::mapreduce(const bsoncxx::document::view &query,
439 const std::string & collection,
440 const std::string & js_map_fun,
441 const std::string & js_reduce_fun)
442{
443 throw Exception("Not implemented");
444 /*
445 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
446 MutexLocker lock(mutex_);
447 log_deb(std::string("Executing MapReduce " + query.toString() + " on collection " + collection
448 + " map: " + js_map_fun + " reduce: " + js_reduce_fun));
449 return mongodb_client->mapreduce(collection, js_map_fun, js_reduce_fun, query);
450 */
451}
452
453/**
454 * Performs an aggregation operation on the robot memory (https://docs.mongodb.com/v3.2/reference/method/db.collection.aggregate/)
455 * @param pipeline A sequence of data aggregation operations or stages. See the https://docs.mongodb.com/v3.2/reference/operator/aggregation-pipeline/ for details
456 * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
457 * @return Cursor to get the documents from, NULL for invalid pipeline
458 */
459cursor
460RobotMemory::aggregate(mongocxx::pipeline &pipeline, const std::string &collection_name)
461{
462 collection collection = get_collection(collection_name);
463 log_deb(std::string("Aggregating in collection " + collection_name));
464 //lock (mongo_client not thread safe)
465 MutexLocker lock(mutex_);
466 //actually execute aggregate
467 try {
468 return collection.aggregate(pipeline, mongocxx::options::aggregate{});
469 } catch (operation_exception &e) {
470 std::string error =
471 std::string("Error when aggregating " + to_json(pipeline.view_array()) + "\n Exception: ")
472 + e.what();
473 log_deb(error, "error");
474 throw;
475 }
476}
477
478/**
479 * Drop (= remove) a whole collection and all documents inside it
480 * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
481 * @return 1: Success 0: Error
482 */
483int
484RobotMemory::drop_collection(const std::string &collection_name)
485{
486 MutexLocker lock(mutex_);
487 collection collection = get_collection(collection_name);
488 log_deb("Dropping collection " + collection_name);
489 collection.drop();
490 return 1;
491}
492
493/**
494 * Remove the whole database of the robot memory and all documents inside
495 * @return 1: Success 0: Error
496 */
497int
499{
500 //lock (mongo_client not thread safe)
501 MutexLocker lock(mutex_);
502
503 log_deb("Clearing whole robot memory");
504 mongodb_client_local_->database(database_name_).drop();
505 return 1;
506}
507
508/**
509 * Restore a previously dumped collection from a directory
510 * @param dbcollection The database and collection to use as string (e.g.
511 * robmem.worldmodel)
512 * @param directory Directory of the dump
513 * @param target_dbcollection Optional different database and collection where
514 * the dump is restored to. If not set, the dump will be restored in the
515 * previous place
516 * @return 1: Success 0: Error
517 */
518int
519RobotMemory::restore_collection(const std::string &dbcollection,
520 const std::string &directory,
521 std::string target_dbcollection)
522{
523 if (target_dbcollection == "") {
524 target_dbcollection = dbcollection;
525 }
526
527 drop_collection(target_dbcollection);
528
529 //lock (mongo_client not thread safe)
530 MutexLocker lock(mutex_);
531
532 auto [db, collection] = split_db_collection_string(dbcollection);
533 std::string path =
534 StringConversions::resolve_path(directory) + "/" + db + "/" + collection + ".bson";
535 log_deb(std::string("Restore collection " + collection + " from " + path), "warn");
536
537 auto [target_db, target_collection] = split_db_collection_string(target_dbcollection);
538
539 //call mongorestore from folder with initial restores
540 std::string command = "/usr/bin/mongorestore --dir " + path + " -d " + target_db + " -c "
541 + target_collection + " --host=" + get_hostport(dbcollection);
542 log_deb(std::string("Restore command: " + command), "warn");
543 FILE *bash_output = popen(command.c_str(), "r");
544
545 //check if output is ok
546 if (!bash_output) {
547 log(std::string("Unable to restore collection" + collection), "error");
548 return 0;
549 }
550 std::string output_string = "";
551 char buffer[100];
552 while (!feof(bash_output)) {
553 if (fgets(buffer, 100, bash_output) == NULL) {
554 break;
555 }
556 output_string += buffer;
557 }
558 pclose(bash_output);
559 if (output_string.find("Failed") != std::string::npos) {
560 log(std::string("Unable to restore collection" + collection), "error");
561 log_deb(output_string, "error");
562 return 0;
563 }
564 return 1;
565}
566
567/**
568 * Dump (= save) a collection to the filesystem to restore it later
569 * @param dbcollection The database and collection to use as string (e.g. robmem.worldmodel)
570 * @param directory Directory to dump the collection to
571 * @return 1: Success 0: Error
572 */
573int
574RobotMemory::dump_collection(const std::string &dbcollection, const std::string &directory)
575{
576 //lock (mongo_client not thread safe)
577 MutexLocker lock(mutex_);
578
579 std::string path = StringConversions::resolve_path(directory);
580 log_deb(std::string("Dump collection " + dbcollection + " into " + path), "warn");
581
582 auto [db, collection] = split_db_collection_string(dbcollection);
583
584 std::string command = "/usr/bin/mongodump --out=" + path + " --db=" + db
585 + " --collection=" + collection + " --forceTableScan"
586 + " --host=" + get_hostport(dbcollection);
587 log(std::string("Dump command: " + command), "info");
588 FILE *bash_output = popen(command.c_str(), "r");
589 //check if output is ok
590 if (!bash_output) {
591 log(std::string("Unable to dump collection" + collection), "error");
592 return 0;
593 }
594 std::string output_string = "";
595 char buffer[100];
596 while (!feof(bash_output)) {
597 if (fgets(buffer, 100, bash_output) == NULL) {
598 break;
599 }
600 output_string += buffer;
601 }
602 pclose(bash_output);
603 if (output_string.find("Failed") != std::string::npos) {
604 log(std::string("Unable to dump collection" + collection), "error");
605 log_deb(output_string, "error");
606 return 0;
607 }
608 return 1;
609}
610
611void
612RobotMemory::log(const std::string &what, const std::string &info)
613{
614 if (!info.compare("error"))
615 logger_->log_error(name_, "%s", what.c_str());
616 else if (!info.compare("warn"))
617 logger_->log_warn(name_, "%s", what.c_str());
618 else if (!info.compare("debug"))
619 logger_->log_debug(name_, "%s", what.c_str());
620 else
621 logger_->log_info(name_, "%s", what.c_str());
622}
623
624void
625RobotMemory::log_deb(const std::string &what, const std::string &level)
626{
627 if (debug_) {
628 log(what, level);
629 }
630}
631
632void
633RobotMemory::log_deb(const bsoncxx::document::view &query,
634 const std::string & what,
635 const std::string & level)
636{
637 if (debug_) {
638 log(query, what, level);
639 }
640}
641
642void
643RobotMemory::log(const bsoncxx::document::view &query,
644 const std::string & what,
645 const std::string & level)
646{
647 log(what + " " + to_json(query), level);
648}
649
650/** Check if the given database is a distributed database
651 * @param dbcollection A database collection name pair of the form <dbname>.<collname>
652 * @return true iff the database is distributed database
653 */
654bool
655RobotMemory::is_distributed_database(const std::string &dbcollection)
656{
657 return std::find(distributed_dbs_.begin(),
658 distributed_dbs_.end(),
659 split_db_collection_string(dbcollection).first)
660 != distributed_dbs_.end();
661}
662
663std::string
664RobotMemory::get_hostport(const std::string &dbcollection)
665{
666 if (distributed_ && is_distributed_database(dbcollection)) {
667 return config_->get_string("/plugins/mongodb/clients/robot-memory-distributed-direct/hostport");
668 } else {
669 return config_->get_string("/plugins/mongodb/clients/robot-memory-local-direct/hostport");
670 }
671}
672
673/**
674 * Get the mongodb client associated with the collection (eighter the local or distributed one)
675 * @param collection The collection name in the form "<dbname>.<collname>"
676 * @return A pointer to the client for the database with name <dbname>
677 */
678client *
679RobotMemory::get_mongodb_client(const std::string &collection)
680{
681 if (!distributed_) {
682 return mongodb_client_local_;
683 }
684 if (is_distributed_database(collection)) {
685 return mongodb_client_distributed_;
686 } else {
687 return mongodb_client_local_;
688 }
689}
690
691/**
692 * Get the collection object referred to by the given string.
693 * @param dbcollection The name of the collection in the form <dbname>.<collname>
694 * @return The respective collection object
695 */
696
697collection
698RobotMemory::get_collection(const std::string &dbcollection)
699{
700 auto db_coll_pair = split_db_collection_string(dbcollection);
701 client *client;
702 if (is_distributed_database(dbcollection)) {
703 client = mongodb_client_distributed_;
704 } else {
705 client = mongodb_client_local_;
706 }
707 return client->database(db_coll_pair.first)[db_coll_pair.second];
708}
709
710/**
711 * Remove a previously registered trigger
712 * @param trigger Pointer to the trigger to remove
713 */
714void
716{
717 trigger_manager_->remove_trigger(trigger);
718}
719
720/**
721 * Remove previously registered computable
722 * @param computable The computable to remove
723 */
724void
726{
727 computables_manager_->remove_computable(computable);
728}
729
730/** Explicitly create a mutex.
731 * This is an optional step, a mutex is also created automatically when trying
732 * to acquire the lock for the first time. Adding it explicitly may increase
733 * visibility, e.g., in the database. Use it for mutexes which are locked
734 * only very infrequently.
735 * @param name mutex name
736 * @return true if operation was successful, false on failure
737 */
738bool
739RobotMemory::mutex_create(const std::string &name)
740{
741 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
742 using namespace bsoncxx::builder;
743 basic::document insert_doc{};
744 insert_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
745 subdoc.append(basic::kvp("lock-time", true));
746 }));
747 insert_doc.append(basic::kvp("_id", name));
748 insert_doc.append(basic::kvp("locked", false));
749 try {
750 MutexLocker lock(mutex_);
751 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
752 auto write_concern = mongocxx::write_concern();
753 write_concern.majority(std::chrono::milliseconds(0));
754 collection.insert_one(insert_doc.view(), options::insert().write_concern(write_concern));
755 return true;
756 } catch (operation_exception &e) {
757 logger_->log_info(name_, "Failed to create mutex %s: %s", name.c_str(), e.what());
758 return false;
759 }
760}
761
762/** Destroy a mutex.
763 * The mutex is erased from the database. This is done disregarding it's current
764 * lock state.
765 * @param name mutex name
766 * @return true if operation was successful, false on failure
767 */
768bool
769RobotMemory::mutex_destroy(const std::string &name)
770{
771 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
772 using namespace bsoncxx::builder;
773 basic::document destroy_doc;
774 destroy_doc.append(basic::kvp("_id", name));
775 try {
776 MutexLocker lock(mutex_);
777 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
778 auto write_concern = mongocxx::write_concern();
779 write_concern.majority(std::chrono::milliseconds(0));
780 collection.delete_one(destroy_doc.view(),
781 options::delete_options().write_concern(write_concern));
782 return true;
783 } catch (operation_exception &e) {
784 logger_->log_info(name_, "Failed to destroy mutex %s: %s", name.c_str(), e.what());
785 return false;
786 }
787}
788
789/** Try to acquire a lock for a mutex.
790 * This will access the database and atomically find and update (or
791 * insert) a mutex lock. If the mutex has not been created it is added
792 * automatically. If the lock cannot be acquired the function also
793 * returns immediately. There is no blocked waiting for the lock.
794 * @param name mutex name
795 * @param identity string to set as lock-holder
796 * @param force true to force acquisition of the lock, i.e., even if
797 * the lock has already been acquired take ownership (steal the lock).
798 * @return true if operation was successful, false on failure
799 */
800bool
801RobotMemory::mutex_try_lock(const std::string &name, const std::string &identity, bool force)
802{
803 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
804
805 std::string locked_by{identity};
806 if (identity.empty()) {
807 HostInfo host_info;
808 locked_by = host_info.name();
809 }
810
811 // here we can add an $or to implement lock timeouts
812 using namespace bsoncxx::builder;
813 basic::document filter_doc;
814 filter_doc.append(basic::kvp("_id", name));
815 if (!force) {
816 filter_doc.append(basic::kvp("locked", false));
817 }
818
819 basic::document update_doc;
820 update_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
821 subdoc.append(basic::kvp("lock-time", true));
822 }));
823 update_doc.append(basic::kvp("$set", [locked_by](basic::sub_document subdoc) {
824 subdoc.append(basic::kvp("locked", true));
825 subdoc.append(basic::kvp("locked-by", locked_by));
826 }));
827 try {
828 MutexLocker lock(mutex_);
829 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
830 auto write_concern = mongocxx::write_concern();
831 write_concern.majority(std::chrono::milliseconds(0));
832 auto new_doc =
833 collection.find_one_and_update(filter_doc.view(),
834 update_doc.view(),
835 options::find_one_and_update()
836 .upsert(true)
837 .return_document(options::return_document::k_after)
838 .write_concern(write_concern));
839
840 if (!new_doc) {
841 return false;
842 }
843 auto new_view = new_doc->view();
844 return (new_view["locked-by"].get_utf8().value.to_string() == locked_by
845 && new_view["locked"].get_bool());
846
847 } catch (operation_exception &e) {
848 logger_->log_error(name_, "Mongo OperationException: %s", e.what());
849 try {
850 // TODO is this extrac check still needed?
851 basic::document check_doc;
852 check_doc.append(basic::kvp("_id", name));
853 check_doc.append(basic::kvp("locked", true));
854 check_doc.append(basic::kvp("locked-by", locked_by));
855 MutexLocker lock(mutex_);
856 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
857 auto res = collection.find_one(check_doc.view());
858 logger_->log_info(name_, "Checking whether mutex was acquired succeeded");
859 if (res) {
860 logger_->log_warn(name_,
861 "Exception during try-lock for %s, "
862 "but mutex was still acquired",
863 name.c_str());
864 } else {
865 logger_->log_info(name_,
866 "Exception during try-lock for %s, "
867 "and mutex was not acquired",
868 name.c_str());
869 }
870 return static_cast<bool>(res);
871 } catch (operation_exception &e) {
872 logger_->log_error(name_,
873 "Mongo OperationException while handling "
874 "the first exception: %s",
875 e.what());
876 return false;
877 }
878 }
879}
880
881/** Try to acquire a lock for a mutex.
882 * This will access the database and atomically find and update (or
883 * insert) a mutex lock. If the mutex has not been created it is added
884 * automatically. If the lock cannot be acquired the function also
885 * returns immediately. There is no blocked waiting for the lock.
886 * @param name mutex name
887 * @param force true to force acquisition of the lock, i.e., even if
888 * the lock has already been acquired take ownership (steal the lock).
889 * @return true if operation was successful, false on failure
890 */
891bool
892RobotMemory::mutex_try_lock(const std::string &name, bool force)
893{
894 return mutex_try_lock(name, "", force);
895}
896
897/** Release lock on mutex.
898 * @param name mutex name
899 * @param identity string to set as lock-holder
900 * @return true if operation was successful, false on failure
901 */
902bool
903RobotMemory::mutex_unlock(const std::string &name, const std::string &identity)
904{
905 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
906
907 std::string locked_by{identity};
908 if (identity.empty()) {
909 HostInfo host_info;
910 locked_by = host_info.name();
911 }
912
913 using namespace bsoncxx::builder;
914 // here we can add an $or to implement lock timeouts
915 basic::document filter_doc;
916 filter_doc.append(basic::kvp("_id", name));
917 filter_doc.append(basic::kvp("locked-by", locked_by));
918
919 basic::document update_doc;
920 update_doc.append(basic::kvp("$set", [](basic::sub_document subdoc) {
921 subdoc.append(basic::kvp("locked", false));
922 }));
923 update_doc.append(basic::kvp("$unset", [](basic::sub_document subdoc) {
924 subdoc.append(basic::kvp("locked-by", true));
925 subdoc.append(basic::kvp("lock-time", true));
926 }));
927
928 try {
929 MutexLocker lock(mutex_);
930 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
931 auto write_concern = mongocxx::write_concern();
932 write_concern.majority(std::chrono::milliseconds(0));
933 auto new_doc =
934 collection.find_one_and_update(filter_doc.view(),
935 update_doc.view(),
936 options::find_one_and_update()
937 .upsert(true)
938 .return_document(options::return_document::k_after)
939 .write_concern(write_concern));
940 if (!new_doc) {
941 return false;
942 }
943 return new_doc->view()["locked"].get_bool();
944 } catch (operation_exception &e) {
945 return false;
946 }
947}
948
949/** Renew a mutex.
950 * Renewing means updating the lock timestamp to the current time to
951 * avoid expiration. Note that the lock must currently be held by
952 * the given identity.
953 * @param name mutex name
954 * @param identity string to set as lock-holder (defaults to hostname
955 * if empty)
956 * @return true if operation was successful, false on failure
957 */
958bool
959RobotMemory::mutex_renew_lock(const std::string &name, const std::string &identity)
960{
961 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
962
963 std::string locked_by{identity};
964 if (identity.empty()) {
965 HostInfo host_info;
966 locked_by = host_info.name();
967 }
968
969 using namespace bsoncxx::builder;
970 // here we can add an $or to implement lock timeouts
971 basic::document filter_doc;
972 filter_doc.append(basic::kvp("_id", name));
973 filter_doc.append(basic::kvp("locked", true));
974 filter_doc.append(basic::kvp("locked-by", locked_by));
975
976 // we set all data, even the data which is not actually modified, to
977 // make it easier to process the update in triggers.
978 basic::document update_doc;
979 update_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
980 subdoc.append(basic::kvp("lock-time", true));
981 }));
982 update_doc.append(basic::kvp("$set", [locked_by](basic::sub_document subdoc) {
983 subdoc.append(basic::kvp("locked", true));
984 subdoc.append(basic::kvp("locked-by", locked_by));
985 }));
986
987 try {
988 MutexLocker lock(mutex_);
989 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
990 auto write_concern = mongocxx::write_concern();
991 write_concern.majority(std::chrono::milliseconds(0));
992 auto new_doc =
993 collection.find_one_and_update(filter_doc.view(),
994 update_doc.view(),
995 options::find_one_and_update()
996 .upsert(false)
997 .return_document(options::return_document::k_after)
998 .write_concern(write_concern));
999 return static_cast<bool>(new_doc);
1000 } catch (operation_exception &e) {
1001 logger_->log_warn(name_, "Renewing lock on mutex %s failed: %s", name.c_str(), e.what());
1002 return false;
1003 }
1004}
1005
1006/** Setup time-to-live index for mutexes.
1007 * Setting up a time-to-live index for mutexes enables automatic
1008 * expiration through the database. Note, however, that the documents
1009 * are expired only every 60 seconds. This has two consequences:
1010 * - max_age_sec lower than 60 seconds cannot be achieved
1011 * - locks may be held for up to just below 60 seconds longer than
1012 * configured, i.e., if the mutex had not yet expired when the
1013 * background tasks runs.
1014 * @param max_age_sec maximum age of locks in seconds
1015 * @return true if operation was successful, false on failure
1016 */
1017bool
1019{
1020 MutexLocker lock(mutex_);
1021
1022 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1023
1024 auto keys = builder::basic::make_document(builder::basic::kvp("lock-time", true));
1025
1026 try {
1027 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1028 collection.create_index(keys.view(),
1029 builder::basic::make_document(
1030 builder::basic::kvp("expireAfterSeconds", max_age_sec)));
1031 } catch (operation_exception &e) {
1032 logger_->log_warn(name_, "Creating TTL index failed: %s", e.what());
1033 return false;
1034 }
1035 return true;
1036}
1037
1038/** Expire old locks on mutexes.
1039 * This will update the database and set all mutexes to unlocked for
1040 * which the lock-time is older than the given maximum age.
1041 * @param max_age_sec maximum age of locks in seconds
1042 * @return true if operation was successful, false on failure
1043 */
1044bool
1046{
1047 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1048
1049 using std::chrono::high_resolution_clock;
1050 using std::chrono::milliseconds;
1051 using std::chrono::time_point;
1052 using std::chrono::time_point_cast;
1053
1054 auto max_age_ms = milliseconds(static_cast<unsigned long int>(std::floor(max_age_sec * 1000)));
1055 time_point<high_resolution_clock, milliseconds> expire_before =
1056 time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1057 types::b_date expire_before_mdb(expire_before);
1058
1059 // here we can add an $or to implement lock timeouts
1060 using namespace bsoncxx::builder;
1061 basic::document filter_doc;
1062 filter_doc.append(basic::kvp("locked", true));
1063 filter_doc.append(basic::kvp("lock-time", [expire_before_mdb](basic::sub_document subdoc) {
1064 subdoc.append(basic::kvp("$lt", expire_before_mdb));
1065 }));
1066
1067 basic::document update_doc;
1068 update_doc.append(basic::kvp("$set", [](basic::sub_document subdoc) {
1069 subdoc.append(basic::kvp("locked", false));
1070 }));
1071 update_doc.append(basic::kvp("$unset", [](basic::sub_document subdoc) {
1072 subdoc.append(basic::kvp("locked-by", true));
1073 subdoc.append(basic::kvp("lock-time", true));
1074 }));
1075
1076 try {
1077 MutexLocker lock(mutex_);
1078 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1079 auto write_concern = mongocxx::write_concern();
1080 write_concern.majority(std::chrono::milliseconds(0));
1081 collection.update_many(filter_doc.view(),
1082 update_doc.view(),
1083 options::update().write_concern(write_concern));
1084 return true;
1085 } catch (operation_exception &e) {
1086 log(std::string("Failed to expire locks: " + std::string(e.what())), "error");
1087 return false;
1088 }
1089}
Class holding information for a single computable this class also enhances computed documents by addi...
Definition: computable.h:32
This class manages registering computables and can check if any computables are invoked by a query.
void cleanup_computed_docs()
Clean up all collections containing documents computed on demand.
void remove_computable(Computable *computable)
Remove previously registered computable.
bool check_and_compute(const bsoncxx::document::view &query, std::string collection)
Checks if computable knowledge is queried and calls the compute functions in this case.
Manager to realize triggers on events in the robot memory.
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
Class holding all information about an EventTrigger.
Definition: event_trigger.h:32
bool mutex_create(const std::string &name)
Explicitly create a mutex.
int dump_collection(const std::string &dbcollection, const std::string &directory="@CONFDIR@/robot-memory")
Dump (= save) a collection to the filesystem to restore it later.
bool mutex_destroy(const std::string &name)
Destroy a mutex.
bool mutex_renew_lock(const std::string &name, const std::string &identity)
Renew a mutex.
bool mutex_unlock(const std::string &name, const std::string &identity)
Release lock on mutex.
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
mongocxx::cursor query(bsoncxx::document::view query, const std::string &collection_name="", mongocxx::options::find query_options=mongocxx::options::find())
Query information from the robot memory.
bsoncxx::document::value find_one_and_update(const bsoncxx::document::view &filter, const bsoncxx::document::view &update, const std::string &collection, bool upsert=false, bool return_new=true)
Atomically update and retrieve document.
bool mutex_setup_ttl(float max_age_sec)
Setup time-to-live index for mutexes.
bsoncxx::document::value mapreduce(const bsoncxx::document::view &query, const std::string &collection, const std::string &js_map_fun, const std::string &js_reduce_fun)
Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
int remove(const bsoncxx::document::view &query, const std::string &collection="")
Remove documents from the robot memory.
int insert(bsoncxx::document::view, const std::string &collection="")
Inserts a document into the robot memory.
mongocxx::cursor aggregate(mongocxx::pipeline &pipeline, const std::string &collection="")
Performs an aggregation operation on the robot memory (https://docs.mongodb.com/v3....
void remove_computable(Computable *computable)
Remove previously registered computable.
int drop_collection(const std::string &collection)
Drop (= remove) a whole collection and all documents inside it.
int update(const bsoncxx::document::view &query, const bsoncxx::document::view &update, const std::string &collection="", bool upsert=false)
Updates documents in the robot memory.
bool mutex_try_lock(const std::string &name, bool force=false)
Try to acquire a lock for a mutex.
int create_index(bsoncxx::document::view keys, const std::string &collection="", bool unique=false)
Create an index on a collection.
int clear_memory()
Remove the whole database of the robot memory and all documents inside.
RobotMemory(fawkes::Configuration *config, fawkes::Logger *logger, fawkes::Clock *clock, fawkes::MongoDBConnCreator *mongo_connection_manager, fawkes::BlackBoard *blackboard)
Robot Memory Constructor with objects of the thread.
int restore_collection(const std::string &dbcollection, const std::string &directory="@CONFDIR@/robot-memory", std::string target_dbcollection="")
Restore a previously dumped collection from a directory.
bool mutex_expire_locks(float max_age_sec)
Expire old locks on mutexes.
The BlackBoard abstract class.
Definition: blackboard.h:46
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void close(Interface *interface)=0
Close interface.
This is supposed to be the central clock in Fawkes.
Definition: clock.h:35
Interface for configuration handling.
Definition: config.h:68
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual bool exists(const char *path)=0
Check if a given value exists.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
Host information.
Definition: hostinfo.h:32
const char * name()
Get full hostname.
Definition: hostinfo.cpp:100
Interface for logging.
Definition: logger.h:42
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Interface for a MongoDB connection creator.
virtual mongocxx::client * create_client(const std::string &config_name="")=0
Create a new MongoDB client.
virtual void delete_client(mongocxx::client *client)=0
Delete a client.
Mutex locking helper.
Definition: mutex_locker.h:34
Time tracking utility.
Definition: tracker.h:37
Fawkes library namespace.