22#include "pcl_db_merge_thread.h"
24#include <blackboard/utils/on_message_waker.h>
25#include <interfaces/PclDatabaseMergeInterface.h>
58 foutput_->is_dense =
false;
60 output_ = pcl_utils::cloudptr_from_refptr(foutput_);
95 std::vector<long> times;
97 std::string collection;
100 if (merge_if_->msgq_empty())
103 if (PclDatabaseMergeInterface::MergeMessage *msg = merge_if_->msgq_first_safe(msg)) {
104 merge_if_->set_final(
false);
105 merge_if_->set_msgid(msg->id());
106 merge_if_->set_error(
"");
109 int64_t *timestamps = msg->timestamps();
110 for (
size_t i = 0; i < msg->maxlenof_timestamps(); ++i) {
111 if (timestamps[i] > 0) {
112 times.push_back(timestamps[i]);
115 database = (strcmp(msg->database(),
"") != 0) ? msg->database() : cfg_database_;
116 collection = msg->collection();
119 merge_if_->msgq_pop();
130 logger->
log_warn(
name(),
"Called for merge from %s, but no times given", collection.c_str());
131 merge_if_->set_final(
true);
132 merge_if_->set_error(
"Called for merge, but no non-zero times given");
137 logger->
log_info(
name(),
"Restoring from '%s' for the following times", collection.c_str());
138 for (
size_t i = 0; i < times.size(); ++i) {
142 ApplicabilityStatus st_xyz, st_xyzrgb;
144 std::vector<long long> ll_times(times.begin(), times.end());
145 pl_xyz_->
applicable(ll_times, database, collection);
146 if ((st_xyz = pl_xyz_->
applicable(ll_times, database, collection)) == APPLICABLE) {
147 pl_xyz_->
merge(times, database, collection);
149 pcl_utils::set_time(foutput_, now);
150 }
else if ((st_xyzrgb = pl_xyzrgb_->
applicable(ll_times, database, collection)) == APPLICABLE) {
151 pl_xyzrgb_->
merge(times, database, collection);
153 pcl_utils::set_time(foutput_, now);
158 merge_if_->set_error(
"Merge failed, see pcl-db-merge log");
161 merge_if_->set_final(
true);
void merge(std::vector< long > ×, std::string &database, std::string &collection)
Merge point clouds.
PointCloudDBMergeThread()
Constructor.
virtual ~PointCloudDBMergeThread()
Destructor.
virtual void loop()
Code to execute in the thread.
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
ApplicabilityStatus applicable(std::vector< long long > ×, std::string &database, std::string &collection)
Check if this pipeline instance is suitable for the given times.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Wake threads on receiving a blackboard message.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
By means of this member access to the clock is given.
Configuration * config
This is the Configuration member used to access the configuration.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Thread aspect to access MongoDB.
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
void remove_pointcloud(const char *id)
Remove the point cloud.
void add_pointcloud(const char *id, RefPtr< pcl::PointCloud< PointT > > cloud)
Add point cloud.
void reset()
Reset pointer.
Thread class encapsulation of pthreads.
const char * name() const
Get name of thread.
A class for handling time.
Fawkes library namespace.