22#include "pcl_db_roscomm_thread.h"
24#include <blackboard/utils/on_update_waker.h>
25#include <core/threading/wait_condition.h>
26#include <interfaces/PclDatabaseMergeInterface.h>
27#include <interfaces/PclDatabaseRetrieveInterface.h>
28#include <interfaces/PclDatabaseStoreInterface.h>
29#include <pcl/pcl_config.h>
31#if PCL_VERSION_COMPARE(>=, 1, 7, 0)
32# include <pcl/PCLPointCloud2.h>
33# include <pcl_conversions/pcl_conversions.h>
38#define CFG_PREFIX "/perception/pcl-db-merge/ros/"
47:
Thread(
"PointCloudDBROSCommThread",
Thread::OPMODE_WAITFORWAKEUP),
60 cfg_store_pcl_id_ =
config->
get_string(
"/perception/pcl-db-roscomm/store-pcl-id");
78 srv_merge_ =
new ros::ServiceServer();
79 srv_retrieve_ =
new ros::ServiceServer();
80 srv_record_ =
new ros::ServiceServer();
81 srv_store_ =
new ros::ServiceServer();
84 rosnode->advertiseService(
"/pcl_db/merge", &PointCloudDBROSCommThread::merge_cb,
this);
87 rosnode->advertiseService(
"/pcl_db/retrieve", &PointCloudDBROSCommThread::retrieve_cb,
this);
90 rosnode->advertiseService(
"/pcl_db/store", &PointCloudDBROSCommThread::store_cb,
this);
96 srv_merge_->shutdown();
97 srv_retrieve_->shutdown();
98 srv_store_->shutdown();
99 srv_record_->shutdown();
101 delete srv_retrieve_;
105 delete merge_update_waker_;
106 delete retrieve_update_waker_;
107 delete store_update_waker_;
108 delete merge_waitcond_;
109 delete retrieve_waitcond_;
110 delete store_waitcond_;
121 if (merge_if_->changed()) {
125 "%u vs. %u final: %s",
128 merge_if_->is_final() ?
"yes" :
"no");
130 if ((merge_if_->msgid() == merge_msg_id_) && merge_if_->is_final()) {
136 retrieve_if_->read();
137 if (retrieve_if_->changed()) {
141 "%u vs. %u final: %s",
142 retrieve_if_->msgid(),
144 retrieve_if_->is_final() ?
"yes" :
"no");
146 if ((retrieve_if_->msgid() == retrieve_msg_id_) && retrieve_if_->is_final()) {
153 if (store_if_->changed()) {
157 "%u vs. %u final: %s",
160 store_if_->is_final() ?
"yes" :
"no");
162 if ((store_if_->msgid() == store_msg_id_) && store_if_->is_final()) {
170PointCloudDBROSCommThread::merge_cb(fawkes_msgs::MergePointClouds::Request & req,
171 fawkes_msgs::MergePointClouds::Response &resp)
173 PclDatabaseMergeInterface::MergeMessage *mm =
new PclDatabaseMergeInterface::MergeMessage();
175 if (req.timestamps.size() > mm->maxlenof_timestamps()) {
177 "Number of requested timestamps (%zu) "
178 "exceeds maximum number allowed (%zu)",
179 req.timestamps.size(),
180 mm->maxlenof_timestamps());
182 resp.error =
"Number of requested timestamps exceeds maximum number allowed";
186 if (req.timestamps.empty()) {
189 resp.error =
"No times given in request";
194 size_t num_timestamps = mm->maxlenof_timestamps();
195 std::vector<int64_t> timestamps(num_timestamps, 0);
196 for (
size_t i = 0; i < req.timestamps.size(); ++i) {
198 (int64_t)req.timestamps[i].data.sec * 1000L + (int64_t)req.timestamps[i].data.nsec / 1000000L;
200 sort(timestamps.begin(), timestamps.begin() + req.timestamps.size());
201 mm->set_timestamps(×tamps[0]);
202 mm->set_database(req.database.c_str());
203 mm->set_collection(req.collection.c_str());
206 merge_if_->msgq_enqueue(mm);
207 merge_msg_id_ = mm->id();
211 merge_waitcond_->
wait();
215 if (merge_if_->is_final() && (std::string(
"") == merge_if_->error())) {
219 resp.error = merge_if_->error();
225PointCloudDBROSCommThread::retrieve_cb(fawkes_msgs::RetrievePointCloud::Request & req,
226 fawkes_msgs::RetrievePointCloud::Response &resp)
228 PclDatabaseRetrieveInterface::RetrieveMessage *mm =
229 new PclDatabaseRetrieveInterface::RetrieveMessage();
231 int64_t timestamp = (int64_t)req.timestamp.sec * 1000L + (int64_t)req.timestamp.nsec / 1000000L;
234 name(),
"Restoring %li from %s.%s", timestamp, req.database.c_str(), req.collection.c_str());
236 mm->set_timestamp(timestamp);
237 mm->set_database(req.database.c_str());
238 mm->set_collection(req.collection.c_str());
239 mm->set_target_frame(req.target_frame.c_str());
240 mm->set_original_timestamp(req.original_timestamp);
243 retrieve_if_->msgq_enqueue(mm);
244 retrieve_msg_id_ = mm->id();
248 retrieve_waitcond_->
wait();
251 retrieve_if_->read();
252 if (retrieve_if_->is_final() && (std::string(
"") == retrieve_if_->error())) {
256 resp.error = retrieve_if_->error();
262PointCloudDBROSCommThread::store_cb(fawkes_msgs::StorePointCloud::Request & req,
263 fawkes_msgs::StorePointCloud::Response &resp)
265#if PCL_VERSION_COMPARE(>=, 1, 7, 0)
266 PclDatabaseStoreInterface::StoreMessage *mm =
new PclDatabaseStoreInterface::StoreMessage();
270 (req.database ==
"") ?
"<default>" : req.database.c_str(),
271 (req.collection ==
"") ?
"<default>" : req.collection.c_str());
273 pcl::PCLPointCloud2 pcl_in;
274 pcl_conversions::moveToPCL(req.pointcloud, pcl_in);
279 std::string fields_in = pcl::getFieldsList(pcl_in);
280 std::string fields_xyz = pcl::getFieldsList(**pcl_xyz);
281 std::string fields_xyzrgb = pcl::getFieldsList(**pcl_xyzrgb);
283 if (fields_in == fields_xyz) {
284 pcl::fromPCLPointCloud2(pcl_in, **pcl_xyz);
287 }
else if (fields_in == fields_xyzrgb) {
288 pcl::fromPCLPointCloud2(pcl_in, **pcl_xyzrgb);
293 resp.error =
"Unsupported point cloud type";
296 mm->set_pcl_id(cfg_store_pcl_id_.c_str());
297 mm->set_database(req.database.c_str());
298 mm->set_collection(req.collection.c_str());
301 store_if_->msgq_enqueue(mm);
302 store_msg_id_ = mm->id();
306 store_waitcond_->
wait();
312 if (store_if_->is_final() && (std::string(
"") == store_if_->error())) {
316 resp.error = store_if_->error();
322 resp.error =
"Storing only supported with PCL >= 1.7.0";
328PointCloudDBROSCommThread::record_cb(fawkes_msgs::RecordData::Request & req,
329 fawkes_msgs::RecordData::Response &resp)
332 ros::Time begin = ros::Time::now();
333 ros::Time end = begin + req.range;
334 ros::Time::sleepUntil(end);
virtual void loop()
Code to execute in the thread.
virtual void init()
Initialize the thread.
virtual ~PointCloudDBROSCommThread()
Destructor.
virtual void finalize()
Finalize the thread.
PointCloudDBROSCommThread()
Constructor.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Wake threads when a blackboard interface is updated.
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual void close(Interface *interface)=0
Close interface.
Thread aspect to use blocked timing.
Configuration * config
This is the Configuration member used to access the configuration.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
void remove_pointcloud(const char *id)
Remove the point cloud.
void add_pointcloud(const char *id, RefPtr< pcl::PointCloud< PointT > > cloud)
Add point cloud.
LockPtr< ros::NodeHandle > rosnode
Central ROS node handle.
RefPtr<> is a reference-counting shared smartpointer.
Thread class encapsulation of pthreads.
const char * name() const
Get name of thread.
Wait until a given condition holds.
void wait()
Wait for the condition forever.
void wake_all()
Wake up all waiting threads.
Fawkes library namespace.