22 #include "robot_memory.h"
24 #include <core/threading/mutex.h>
25 #include <core/threading/mutex_locker.h>
26 #include <interfaces/RobotMemoryInterface.h>
27 #include <plugins/mongodb/utils.h>
28 #include <utils/misc/string_conversions.h>
29 #include <utils/misc/string_split.h>
30 #include <utils/system/hostinfo.h>
31 #ifdef USE_TIMETRACKER
32 # include <utils/time/tracker.h>
34 #include <utils/time/tracker_macros.h>
36 #include <bsoncxx/builder/basic/document.hpp>
38 #include <mongocxx/client.hpp>
39 #include <mongocxx/exception/operation_exception.hpp>
40 #include <mongocxx/read_preference.hpp>
45 using namespace mongocxx;
46 using namespace bsoncxx;
76 mongo_connection_manager_ = mongo_connection_manager;
77 blackboard_ = blackboard;
78 mongodb_client_local_ =
nullptr;
79 mongodb_client_distributed_ =
nullptr;
83 RobotMemory::~RobotMemory()
85 mongo_connection_manager_->delete_client(mongodb_client_local_);
86 mongo_connection_manager_->delete_client(mongodb_client_distributed_);
87 delete trigger_manager_;
88 blackboard_->close(rm_if_);
89 #ifdef USE_TIMETRACKER
98 log(
"Started RobotMemory");
99 default_collection_ =
"robmem.test";
101 default_collection_ = config_->get_string(
"/plugins/robot-memory/default-collection");
105 debug_ = config_->get_bool(
"/plugins/robot-memory/more-debug-output");
108 database_name_ =
"robmem";
110 database_name_ = config_->get_string(
"/plugins/robot-memory/database");
113 distributed_dbs_ = config_->get_strings(
"/plugins/robot-memory/distributed-db-names");
114 cfg_startup_grace_period_ = 10;
116 cfg_startup_grace_period_ = config_->get_uint(
"/plugins/robot-memory/startup-grace-period");
120 cfg_coord_database_ = config_->get_string(
"/plugins/robot-memory/coordination/database");
121 cfg_coord_mutex_collection_ =
122 config_->get_string(
"/plugins/robot-memory/coordination/mutex-collection");
124 using namespace std::chrono_literals;
127 log(
"Connect to local mongod");
128 unsigned int startup_tries = 0;
129 for (; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
132 mongodb_client_local_ = mongo_connection_manager_->create_client(
"robot-memory-local");
135 logger_->log_info(name_,
"Waiting for local");
136 std::this_thread::sleep_for(500ms);
140 if (config_->exists(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")
141 && config_->get_bool(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
143 log(
"Connect to distributed mongod");
144 for (startup_tries = 0; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
147 mongodb_client_distributed_ =
148 mongo_connection_manager_->create_client(
"robot-memory-distributed");
151 logger_->log_info(name_,
"Waiting for distributed");
152 std::this_thread::sleep_for(500ms);
158 rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(
159 config_->get_string(
"/plugins/robot-memory/interface-name").c_str());
160 rm_if_->set_error(
"");
161 rm_if_->set_result(
"");
168 log_deb(
"Initialized RobotMemory");
170 #ifdef USE_TIMETRACKER
173 ttc_events_ = tt_->add_class(
"RobotMemory Events");
174 ttc_cleanup_ = tt_->add_class(
"RobotMemory Cleanup");
181 TIMETRACK_START(ttc_events_);
182 trigger_manager_->check_events();
183 TIMETRACK_END(ttc_events_);
184 TIMETRACK_START(ttc_cleanup_);
185 computables_manager_->cleanup_computed_docs();
186 TIMETRACK_END(ttc_cleanup_);
187 #ifdef USE_TIMETRACKER
188 if (++tt_loopcount_ % 5 == 0) {
189 tt_->print_to_stdout();
203 const std::string & collection_name,
204 mongocxx::options::find query_options)
206 collection collection = get_collection(collection_name);
207 log_deb(std::string(
"Executing Query " + to_json(
query) +
" on collection " + collection_name));
210 computables_manager_->check_and_compute(
query, collection_name);
217 return collection.find(
query, query_options);
218 }
catch (mongocxx::operation_exception &e) {
220 std::string(
"Error for query ") + to_json(
query) +
"\n Exception: " + e.what();
232 bsoncxx::document::value
234 const std::string & collection)
278 collection collection = get_collection(collection_name);
279 log_deb(std::string(
"Inserting " + to_json(doc) +
" into collection " + collection_name));
284 collection.insert_one(doc);
285 }
catch (mongocxx::operation_exception &e) {
286 std::string error =
"Error for insert " + to_json(doc) +
"\n Exception: " + e.what();
287 log_deb(error,
"error");
302 const std::string & collection_name,
305 collection collection = get_collection(collection_name);
307 log_deb(std::string(
"Creating index " + to_json(keys) +
" on collection " + collection_name));
314 using namespace bsoncxx::builder::basic;
315 collection.create_index(keys, make_document(kvp(
"unique", unique)));
316 }
catch (operation_exception &e) {
317 std::string error =
"Error when creating index " + to_json(keys) +
"\n Exception: " + e.what();
318 log_deb(error,
"error");
334 collection collection = get_collection(collection_name);
335 std::string insert_string =
"[";
336 for (
auto &&doc : docs) {
337 insert_string += to_json(doc) +
",\n";
339 insert_string +=
"]";
341 log_deb(std::string(
"Inserting vector of documents " + insert_string +
" into collection "
349 collection.insert_many(docs);
350 }
catch (operation_exception &e) {
351 std::string error =
"Error for insert " + insert_string +
"\n Exception: " + e.what();
352 log_deb(error,
"error");
368 return insert(from_json(obj_str), collection);
381 const bsoncxx::document::view &
update,
382 const std::string & collection_name,
385 collection collection = get_collection(collection_name);
386 log_deb(std::string(
"Executing Update " + to_json(
update) +
" for query " + to_json(
query)
387 +
" on collection " + collection_name));
394 collection.update_many(
query,
395 builder::basic::make_document(
396 builder::basic::kvp(
"$set", builder::concatenate(
update))),
397 options::update().upsert(upsert));
398 }
catch (operation_exception &e) {
399 log_deb(std::string(
"Error for update " + to_json(
update) +
" for query " + to_json(
query)
400 +
"\n Exception: " + e.what()),
418 const std::string & update_str,
419 const std::string & collection,
422 return update(
query, from_json(update_str), collection, upsert);
436 const document::view &
update,
437 const std::string & collection_name,
441 collection collection = get_collection(collection_name);
443 log_deb(std::string(
"Executing findOneAndUpdate " + to_json(
update) +
" for filter "
444 + to_json(filter) +
" on collection " + collection_name));
450 collection.find_one_and_update(filter,
452 options::find_one_and_update().upsert(upsert).return_document(
453 return_new ? options::return_document::k_after
454 : options::return_document::k_before));
458 std::string error =
"Error for update " + to_json(
update) +
" for query " + to_json(filter)
459 +
"FindOneAndUpdate unexpectedly did not return a document";
460 log_deb(error,
"warn");
461 return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp(
"error", error));
463 }
catch (operation_exception &e) {
464 std::string error =
"Error for update " + to_json(
update) +
" for query " + to_json(filter)
465 +
"\n Exception: " + e.what();
466 log_deb(error,
"error");
467 return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp(
"error", error));
482 collection collection = get_collection(collection_name);
483 log_deb(std::string(
"Executing Remove " + to_json(
query) +
" on collection " + collection_name));
486 collection.delete_many(
query);
487 }
catch (operation_exception &e) {
488 log_deb(std::string(
"Error for query " + to_json(
query) +
"\n Exception: " + e.what()),
504 bsoncxx::document::value
506 const std::string & collection,
507 const std::string & js_map_fun,
508 const std::string & js_reduce_fun)
558 collection collection = get_collection(collection_name);
559 log_deb(
"Dropping collection " + collection_name);
574 log_deb(
"Clearing whole robot memory");
575 mongodb_client_local_->database(database_name_).drop();
588 std::string coll{std::move(collection)};
595 if (coll.find(
".") == std::string::npos) {
596 log(std::string(
"Unable to restore collection" + coll),
"error");
597 log(std::string(
"Specify collection like 'db.collection'"),
"error");
601 + coll.replace(coll.find(
"."), 1,
"/") +
".bson";
602 log_deb(std::string(
"Restore collection " + collection +
" from " + path),
"warn");
605 std::string command =
"/usr/bin/mongorestore --dir " + path +
" --host=127.0.0.1 --quiet";
606 log_deb(std::string(
"Restore command: " + command),
"warn");
607 FILE *bash_output = popen(command.c_str(),
"r");
611 log(std::string(
"Unable to restore collection" + coll),
"error");
614 std::string output_string =
"";
616 while (!feof(bash_output)) {
617 if (fgets(buffer, 100, bash_output) == NULL) {
620 output_string += buffer;
623 if (output_string.find(
"Failed") != std::string::npos) {
624 log(std::string(
"Unable to restore collection" + coll),
"error");
625 log_deb(output_string,
"error");
644 if (collection.find(
".") == std::string::npos) {
645 log(std::string(
"Unable to dump collection" + collection),
"error");
646 log(std::string(
"Specify collection like 'db.collection'"),
"error");
650 log_deb(std::string(
"Dump collection " + collection +
" into " + path),
"warn");
653 std::vector<std::string> split =
str_split(collection,
'.');
654 std::string command =
"/usr/bin/mongodump --out=" + path +
" --db=" + split[0]
655 +
" --collection=" + split[1] +
" --host=127.0.0.1 --quiet";
656 log_deb(std::string(
"Dump command: " + command),
"warn");
657 FILE *bash_output = popen(command.c_str(),
"r");
660 log(std::string(
"Unable to dump collection" + collection),
"error");
663 std::string output_string =
"";
665 while (!feof(bash_output)) {
666 if (fgets(buffer, 100, bash_output) == NULL) {
669 output_string += buffer;
672 if (output_string.find(
"Failed") != std::string::npos) {
673 log(std::string(
"Unable to dump collection" + collection),
"error");
674 log_deb(output_string,
"error");
681 RobotMemory::log(
const std::string &what,
const std::string &info)
683 if (!info.compare(
"error"))
684 logger_->log_error(name_,
"%s", what.c_str());
685 else if (!info.compare(
"warn"))
686 logger_->log_warn(name_,
"%s", what.c_str());
687 else if (!info.compare(
"debug"))
688 logger_->log_debug(name_,
"%s", what.c_str());
690 logger_->log_info(name_,
"%s", what.c_str());
694 RobotMemory::log_deb(
const std::string &what,
const std::string &level)
702 RobotMemory::log_deb(
const bsoncxx::document::view &
query,
703 const std::string & what,
704 const std::string & level)
707 log(
query, what, level);
712 RobotMemory::log(
const bsoncxx::document::view &
query,
713 const std::string & what,
714 const std::string & level)
716 log(what +
" " + to_json(
query), level);
724 RobotMemory::is_distributed_database(
const std::string &dbcollection)
726 return std::find(distributed_dbs_.begin(),
727 distributed_dbs_.end(),
728 split_db_collection_string(dbcollection).first)
729 != distributed_dbs_.end();
738 RobotMemory::get_mongodb_client(
const std::string &collection)
741 return mongodb_client_local_;
743 if (is_distributed_database(collection)) {
744 return mongodb_client_distributed_;
746 return mongodb_client_local_;
757 RobotMemory::get_collection(
const std::string &dbcollection)
759 auto db_coll_pair = split_db_collection_string(dbcollection);
761 if (is_distributed_database(dbcollection)) {
762 client = mongodb_client_distributed_;
764 client = mongodb_client_local_;
766 return client->database(db_coll_pair.first)[db_coll_pair.second];
776 trigger_manager_->remove_trigger(trigger);
786 computables_manager_->remove_computable(computable);
800 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
801 using namespace bsoncxx::builder;
802 basic::document insert_doc{};
803 insert_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
804 subdoc.append(basic::kvp(
"lock-time",
true));
806 insert_doc.append(basic::kvp(
"_id", name));
807 insert_doc.append(basic::kvp(
"locked",
false));
810 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
811 auto write_concern = mongocxx::write_concern();
812 write_concern.majority(std::chrono::milliseconds(0));
813 collection.insert_one(insert_doc.view(), options::insert().write_concern(write_concern));
815 }
catch (operation_exception &e) {
816 logger_->log_info(name_,
"Failed to create mutex %s: %s", name.c_str(), e.what());
830 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
831 using namespace bsoncxx::builder;
832 basic::document destroy_doc;
833 destroy_doc.append(basic::kvp(
"_id", name));
836 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
837 auto write_concern = mongocxx::write_concern();
838 write_concern.majority(std::chrono::milliseconds(0));
839 collection.delete_one(destroy_doc.view(),
840 options::delete_options().write_concern(write_concern));
842 }
catch (operation_exception &e) {
843 logger_->log_info(name_,
"Failed to destroy mutex %s: %s", name.c_str(), e.what());
862 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
864 std::string locked_by{identity};
865 if (identity.empty()) {
867 locked_by = host_info.
name();
871 using namespace bsoncxx::builder;
872 basic::document filter_doc;
873 filter_doc.append(basic::kvp(
"_id", name));
875 filter_doc.append(basic::kvp(
"locked",
false));
878 basic::document update_doc;
879 update_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
880 subdoc.append(basic::kvp(
"lock-time",
true));
882 update_doc.append(basic::kvp(
"$set", [locked_by](basic::sub_document subdoc) {
883 subdoc.append(basic::kvp(
"locked",
true));
884 subdoc.append(basic::kvp(
"locked-by", locked_by));
888 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
889 auto write_concern = mongocxx::write_concern();
890 write_concern.majority(std::chrono::milliseconds(0));
892 collection.find_one_and_update(filter_doc.view(),
894 options::find_one_and_update()
896 .return_document(options::return_document::k_after)
897 .write_concern(write_concern));
902 auto new_view = new_doc->view();
903 return (new_view[
"locked-by"].get_utf8().value.to_string() == locked_by
904 && new_view[
"locked"].get_bool());
906 }
catch (operation_exception &e) {
907 logger_->log_error(name_,
"Mongo OperationException: %s", e.what());
910 basic::document check_doc;
911 check_doc.append(basic::kvp(
"_id", name));
912 check_doc.append(basic::kvp(
"locked",
true));
913 check_doc.append(basic::kvp(
"locked-by", locked_by));
915 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
916 auto res = collection.find_one(check_doc.view());
917 logger_->log_info(name_,
"Checking whether mutex was acquired succeeded");
919 logger_->log_warn(name_,
920 "Exception during try-lock for %s, "
921 "but mutex was still acquired",
924 logger_->log_info(name_,
925 "Exception during try-lock for %s, "
926 "and mutex was not acquired",
929 return static_cast<bool>(res);
930 }
catch (operation_exception &e) {
931 logger_->log_error(name_,
932 "Mongo OperationException while handling "
933 "the first exception: %s",
964 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
966 std::string locked_by{identity};
967 if (identity.empty()) {
969 locked_by = host_info.
name();
972 using namespace bsoncxx::builder;
974 basic::document filter_doc;
975 filter_doc.append(basic::kvp(
"_id", name));
976 filter_doc.append(basic::kvp(
"locked-by", locked_by));
978 basic::document update_doc;
979 update_doc.append(basic::kvp(
"$set", [](basic::sub_document subdoc) {
980 subdoc.append(basic::kvp(
"locked",
false));
982 update_doc.append(basic::kvp(
"$unset", [](basic::sub_document subdoc) {
983 subdoc.append(basic::kvp(
"locked-by",
true));
984 subdoc.append(basic::kvp(
"lock-time",
true));
989 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
990 auto write_concern = mongocxx::write_concern();
991 write_concern.majority(std::chrono::milliseconds(0));
993 collection.find_one_and_update(filter_doc.view(),
995 options::find_one_and_update()
997 .return_document(options::return_document::k_after)
998 .write_concern(write_concern));
1002 return new_doc->view()[
"locked"].get_bool();
1003 }
catch (operation_exception &e) {
1020 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1022 std::string locked_by{identity};
1023 if (identity.empty()) {
1025 locked_by = host_info.
name();
1028 using namespace bsoncxx::builder;
1030 basic::document filter_doc;
1031 filter_doc.append(basic::kvp(
"_id", name));
1032 filter_doc.append(basic::kvp(
"locked",
true));
1033 filter_doc.append(basic::kvp(
"locked-by", locked_by));
1037 basic::document update_doc;
1038 update_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
1039 subdoc.append(basic::kvp(
"lock-time",
true));
1041 update_doc.append(basic::kvp(
"$set", [locked_by](basic::sub_document subdoc) {
1042 subdoc.append(basic::kvp(
"locked",
true));
1043 subdoc.append(basic::kvp(
"locked-by", locked_by));
1048 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1049 auto write_concern = mongocxx::write_concern();
1050 write_concern.majority(std::chrono::milliseconds(0));
1052 collection.find_one_and_update(filter_doc.view(),
1054 options::find_one_and_update()
1056 .return_document(options::return_document::k_after)
1057 .write_concern(write_concern));
1058 return static_cast<bool>(new_doc);
1059 }
catch (operation_exception &e) {
1060 logger_->log_warn(name_,
"Renewing lock on mutex %s failed: %s", name.c_str(), e.what());
1081 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1083 auto keys = builder::basic::make_document(builder::basic::kvp(
"lock-time",
true));
1086 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1087 collection.create_index(keys.view(),
1088 builder::basic::make_document(
1089 builder::basic::kvp(
"expireAfterSeconds", max_age_sec)));
1090 }
catch (operation_exception &e) {
1091 logger_->log_warn(name_,
"Creating TTL index failed: %s", e.what());
1106 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1108 using std::chrono::high_resolution_clock;
1109 using std::chrono::milliseconds;
1110 using std::chrono::time_point;
1111 using std::chrono::time_point_cast;
1113 auto max_age_ms = milliseconds(
static_cast<unsigned long int>(std::floor(max_age_sec * 1000)));
1114 time_point<high_resolution_clock, milliseconds> expire_before =
1115 time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1116 types::b_date expire_before_mdb(expire_before);
1119 using namespace bsoncxx::builder;
1120 basic::document filter_doc;
1121 filter_doc.append(basic::kvp(
"locked",
true));
1122 filter_doc.append(basic::kvp(
"lock-time", [expire_before_mdb](basic::sub_document subdoc) {
1123 subdoc.append(basic::kvp(
"$lt", expire_before_mdb));
1126 basic::document update_doc;
1127 update_doc.append(basic::kvp(
"$set", [](basic::sub_document subdoc) {
1128 subdoc.append(basic::kvp(
"locked",
false));
1130 update_doc.append(basic::kvp(
"$unset", [](basic::sub_document subdoc) {
1131 subdoc.append(basic::kvp(
"locked-by",
true));
1132 subdoc.append(basic::kvp(
"lock-time",
true));
1137 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1138 auto write_concern = mongocxx::write_concern();
1139 write_concern.majority(std::chrono::milliseconds(0));
1140 collection.update_many(filter_doc.view(),
1142 options::update().write_concern(write_concern));
1144 }
catch (operation_exception &e) {
1145 log(std::string(
"Failed to expire locks: " + std::string(e.what())),
"error");