22#include "pcl_db_store_thread.h"
24#include <blackboard/utils/on_message_waker.h>
25#include <interfaces/PclDatabaseStoreInterface.h>
26#include <pcl_utils/pcl_adapter.h>
29#include <bsoncxx/builder/basic/document.hpp>
30#include <mongocxx/client.hpp>
31#include <mongocxx/exception/operation_exception.hpp>
32#include <mongocxx/gridfs/bucket.hpp>
33#include <mongocxx/gridfs/uploader.hpp>
35#define CFG_PREFIX "/perception/pcl-db/"
38using namespace mongocxx;
88 if (store_if_->msgq_empty())
91 if (PclDatabaseStoreInterface::StoreMessage *msg = store_if_->msgq_first_safe(msg)) {
92 store_if_->set_final(
false);
93 store_if_->set_msgid(msg->id());
94 store_if_->set_error(
"");
97 std::string msg_database = msg->database();
98 std::string msg_collection = msg->collection();
102 std::string database = (msg_database !=
"") ? msg_database : cfg_database_;
103 std::string collection = database +
".";
104 if (msg_collection ==
"") {
105 collection +=
"pcls";
106 }
else if (msg_collection.compare(0, 3,
"fs.") == 0) {
107 errmsg =
"Passed in collection uses GridFS namespace";
110 collection += msg->collection();
114 store_pointcloud(msg->pcl_id(), database, collection, errmsg);
116 store_if_->set_error(errmsg.c_str());
117 store_if_->set_final(
true);
122 store_if_->msgq_pop();
126PointCloudDBStoreThread::store_pointcloud(std::string pcl_id,
127 std::string database,
128 std::string collection,
132 errmsg =
"PointCloud does not exist";
136 std::string frame_id;
137 unsigned int width, height;
140 size_t point_size, num_points;
155 size_t data_size = point_size * num_points;
159 std::stringstream
name;
161 auto uploader = gridfs.open_upload_stream(
name.str());
162 uploader.write(
static_cast<uint8_t *
>(point_data), data_size);
163 auto result = uploader.close();
164 using namespace bsoncxx::builder;
165 basic::document document;
166 document.append(basic::kvp(
"timestamp",
static_cast<int64_t
>(time.
in_msec())));
167 document.append(basic::kvp(
"pointcloud", [&](basic::sub_document subdoc) {
168 subdoc.append(basic::kvp(
"frame_id", frame_id));
169 subdoc.append(basic::kvp(
"is_dense", is_dense));
170 subdoc.append(basic::kvp(
"width",
static_cast<int64_t
>(width)));
171 subdoc.append(basic::kvp(
"height",
static_cast<int64_t
>(height)));
172 subdoc.append(basic::kvp(
"point_size",
static_cast<int64_t
>(point_size)));
173 subdoc.append(basic::kvp(
"num_points",
static_cast<int64_t
>(num_points)));
174 subdoc.append(basic::kvp(
"data", [&](basic::sub_document datadoc) {
175 datadoc.append(basic::kvp(
"id", result.id()));
176 datadoc.append(basic::kvp(
"filename",
name.str()));
178 subdoc.append(basic::kvp(
"field_info", [fieldinfo](basic::sub_array fi_array) {
179 for (
auto fi : fieldinfo) {
180 basic::document fi_doc;
181 fi_doc.append(basic::kvp(
"name", fi.name));
182 fi_doc.append(basic::kvp(
"offset",
static_cast<int64_t
>(fi.offset)));
183 fi_doc.append(basic::kvp(
"datatype", fi.datatype));
184 fi_doc.append(basic::kvp(
"count",
static_cast<int64_t
>(fi.count)));
190 mongodb_client->database(database)[collection].insert_one(document.view());
191 }
catch (mongocxx::operation_exception &e) {
192 logger->
log_warn(this->
name(),
"Failed to insert into %s: %s", collection.c_str(), e.what());
Point cloud adapter class.
void get_data_and_info(const std::string &id, std::string &frame_id, bool &is_dense, unsigned int &width, unsigned int &height, fawkes::Time &time, V_PointFieldInfo &pfi, void **data_ptr, size_t &point_size, size_t &num_points)
Get data and info of point cloud.
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
PointCloudDBStoreThread()
Constructor.
virtual void loop()
Code to execute in the thread.
virtual ~PointCloudDBStoreThread()
Destructor.
virtual void init()
Initialize the thread.
virtual void finalize()
Finalize the thread.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Wake threads on receiving a blackboard message.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void close(Interface *interface)=0
Close interface.
Configuration * config
This is the Configuration member used to access the configuration.
virtual std::string get_string(const char *path)=0
Get the string-typed value stored at the given configuration path.
Base class for exceptions in Fawkes.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Logger * logger
This is the Logger member used to access the logger.
Thread aspect to access MongoDB.
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
bool exists_pointcloud(const char *id)
Check if point cloud exists.
Thread class encapsulation of pthreads.
const char * name() const
Get name of thread.
A class for handling time.
long in_msec() const
Convert the stored time into milliseconds.
Fawkes library namespace.