Fawkes API Fawkes Development Version
pcl_db_roscomm_thread.cpp
1
2/***************************************************************************
3 * pcl_db_merge_roscomm_thread.cpp - ROS communication for pcl-db-merge
4 *
5 * Created: Thu Dec 06 13:54:45 2012
6 * Copyright 2012-2018 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "pcl_db_roscomm_thread.h"
23
24#include <blackboard/utils/on_update_waker.h>
25#include <core/threading/wait_condition.h>
26#include <interfaces/PclDatabaseMergeInterface.h>
27#include <interfaces/PclDatabaseRetrieveInterface.h>
28#include <interfaces/PclDatabaseStoreInterface.h>
29#include <pcl/pcl_config.h>
30#include <ros/ros.h>
31#if PCL_VERSION_COMPARE(>=, 1, 7, 0)
32# include <pcl/PCLPointCloud2.h>
33# include <pcl_conversions/pcl_conversions.h>
34#endif
35
36using namespace fawkes;
37
38#define CFG_PREFIX "/perception/pcl-db-merge/ros/"
39
40/** @class PointCloudDBROSCommThread "pcl_db_roscomm_thread.h"
41 * Thread to merge point clouds from database on request.
42 * @author Tim Niemueller
43 */
44
45/** Constructor. */
47: Thread("PointCloudDBROSCommThread", Thread::OPMODE_WAITFORWAKEUP),
48 BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_WORLDSTATE)
49{
50}
51
52/** Destructor. */
54{
55}
56
57void
59{
60 cfg_store_pcl_id_ = config->get_string("/perception/pcl-db-roscomm/store-pcl-id");
61
62 merge_if_ = blackboard->open_for_reading<PclDatabaseMergeInterface>("PCL Database Merge");
63
64 merge_waitcond_ = new WaitCondition();
65 merge_update_waker_ = new BlackBoardOnUpdateWaker(blackboard, merge_if_, this);
66
67 retrieve_if_ =
68 blackboard->open_for_reading<PclDatabaseRetrieveInterface>("PCL Database Retrieve");
69
70 retrieve_waitcond_ = new WaitCondition();
71 retrieve_update_waker_ = new BlackBoardOnUpdateWaker(blackboard, retrieve_if_, this);
72
73 store_if_ = blackboard->open_for_reading<PclDatabaseStoreInterface>("PCL Database Store");
74
75 store_waitcond_ = new WaitCondition();
76 store_update_waker_ = new BlackBoardOnUpdateWaker(blackboard, store_if_, this);
77
78 srv_merge_ = new ros::ServiceServer();
79 srv_retrieve_ = new ros::ServiceServer();
80 srv_record_ = new ros::ServiceServer();
81 srv_store_ = new ros::ServiceServer();
82
83 *srv_merge_ =
84 rosnode->advertiseService("/pcl_db/merge", &PointCloudDBROSCommThread::merge_cb, this);
85
86 *srv_retrieve_ =
87 rosnode->advertiseService("/pcl_db/retrieve", &PointCloudDBROSCommThread::retrieve_cb, this);
88
89 *srv_store_ =
90 rosnode->advertiseService("/pcl_db/store", &PointCloudDBROSCommThread::store_cb, this);
91}
92
93void
95{
96 srv_merge_->shutdown();
97 srv_retrieve_->shutdown();
98 srv_store_->shutdown();
99 srv_record_->shutdown();
100 delete srv_merge_;
101 delete srv_retrieve_;
102 delete srv_store_;
103 delete srv_record_;
104
105 delete merge_update_waker_;
106 delete retrieve_update_waker_;
107 delete store_update_waker_;
108 delete merge_waitcond_;
109 delete retrieve_waitcond_;
110 delete store_waitcond_;
111
112 blackboard->close(merge_if_);
113 blackboard->close(retrieve_if_);
114 blackboard->close(store_if_);
115}
116
117void
119{
120 merge_if_->read();
121 if (merge_if_->changed()) {
122 logger->log_info(name(), "Merge interface has changed");
123
125 "%u vs. %u final: %s",
126 merge_if_->msgid(),
127 merge_msg_id_,
128 merge_if_->is_final() ? "yes" : "no");
129
130 if ((merge_if_->msgid() == merge_msg_id_) && merge_if_->is_final()) {
131 logger->log_info(name(), "Merge final");
132 merge_waitcond_->wake_all();
133 }
134 }
135
136 retrieve_if_->read();
137 if (retrieve_if_->changed()) {
138 logger->log_info(name(), "Retrieve interface has changed");
139
141 "%u vs. %u final: %s",
142 retrieve_if_->msgid(),
143 retrieve_msg_id_,
144 retrieve_if_->is_final() ? "yes" : "no");
145
146 if ((retrieve_if_->msgid() == retrieve_msg_id_) && retrieve_if_->is_final()) {
147 logger->log_info(name(), "Retrieve final");
148 retrieve_waitcond_->wake_all();
149 }
150 }
151
152 store_if_->read();
153 if (store_if_->changed()) {
154 logger->log_info(name(), "Store interface has changed");
155
157 "%u vs. %u final: %s",
158 store_if_->msgid(),
159 store_msg_id_,
160 store_if_->is_final() ? "yes" : "no");
161
162 if ((store_if_->msgid() == store_msg_id_) && store_if_->is_final()) {
163 logger->log_info(name(), "Store final");
164 store_waitcond_->wake_all();
165 }
166 }
167}
168
169bool
170PointCloudDBROSCommThread::merge_cb(fawkes_msgs::MergePointClouds::Request & req,
171 fawkes_msgs::MergePointClouds::Response &resp)
172{
173 PclDatabaseMergeInterface::MergeMessage *mm = new PclDatabaseMergeInterface::MergeMessage();
174
175 if (req.timestamps.size() > mm->maxlenof_timestamps()) {
177 "Number of requested timestamps (%zu) "
178 "exceeds maximum number allowed (%zu)",
179 req.timestamps.size(),
180 mm->maxlenof_timestamps());
181 resp.ok = false;
182 resp.error = "Number of requested timestamps exceeds maximum number allowed";
183 delete mm;
184 return true;
185 }
186 if (req.timestamps.empty()) {
187 logger->log_warn(name(), "No times given in request");
188 resp.ok = false;
189 resp.error = "No times given in request";
190 delete mm;
191 return true;
192 }
193
194 size_t num_timestamps = mm->maxlenof_timestamps();
195 std::vector<int64_t> timestamps(num_timestamps, 0);
196 for (size_t i = 0; i < req.timestamps.size(); ++i) {
197 timestamps[i] =
198 (int64_t)req.timestamps[i].data.sec * 1000L + (int64_t)req.timestamps[i].data.nsec / 1000000L;
199 }
200 sort(timestamps.begin(), timestamps.begin() + req.timestamps.size());
201 mm->set_timestamps(&timestamps[0]);
202 mm->set_database(req.database.c_str());
203 mm->set_collection(req.collection.c_str());
204
205 mm->ref();
206 merge_if_->msgq_enqueue(mm);
207 merge_msg_id_ = mm->id();
208 mm->unref();
209
210 // wait for result
211 merge_waitcond_->wait();
212
213 // Check result
214 merge_if_->read();
215 if (merge_if_->is_final() && (std::string("") == merge_if_->error())) {
216 resp.ok = true;
217 } else {
218 resp.ok = false;
219 resp.error = merge_if_->error();
220 }
221 return true;
222}
223
224bool
225PointCloudDBROSCommThread::retrieve_cb(fawkes_msgs::RetrievePointCloud::Request & req,
226 fawkes_msgs::RetrievePointCloud::Response &resp)
227{
228 PclDatabaseRetrieveInterface::RetrieveMessage *mm =
229 new PclDatabaseRetrieveInterface::RetrieveMessage();
230
231 int64_t timestamp = (int64_t)req.timestamp.sec * 1000L + (int64_t)req.timestamp.nsec / 1000000L;
232
234 name(), "Restoring %li from %s.%s", timestamp, req.database.c_str(), req.collection.c_str());
235
236 mm->set_timestamp(timestamp);
237 mm->set_database(req.database.c_str());
238 mm->set_collection(req.collection.c_str());
239 mm->set_target_frame(req.target_frame.c_str());
240 mm->set_original_timestamp(req.original_timestamp);
241
242 mm->ref();
243 retrieve_if_->msgq_enqueue(mm);
244 retrieve_msg_id_ = mm->id();
245 mm->unref();
246
247 // wait for result
248 retrieve_waitcond_->wait();
249
250 // Check result
251 retrieve_if_->read();
252 if (retrieve_if_->is_final() && (std::string("") == retrieve_if_->error())) {
253 resp.ok = true;
254 } else {
255 resp.ok = false;
256 resp.error = retrieve_if_->error();
257 }
258 return true;
259}
260
261bool
262PointCloudDBROSCommThread::store_cb(fawkes_msgs::StorePointCloud::Request & req,
263 fawkes_msgs::StorePointCloud::Response &resp)
264{
265#if PCL_VERSION_COMPARE(>=, 1, 7, 0)
266 PclDatabaseStoreInterface::StoreMessage *mm = new PclDatabaseStoreInterface::StoreMessage();
267
269 "Storing to %s.%s",
270 (req.database == "") ? "<default>" : req.database.c_str(),
271 (req.collection == "") ? "<default>" : req.collection.c_str());
272
273 pcl::PCLPointCloud2 pcl_in;
274 pcl_conversions::moveToPCL(req.pointcloud, pcl_in);
275
278
279 std::string fields_in = pcl::getFieldsList(pcl_in);
280 std::string fields_xyz = pcl::getFieldsList(**pcl_xyz);
281 std::string fields_xyzrgb = pcl::getFieldsList(**pcl_xyzrgb);
282
283 if (fields_in == fields_xyz) {
284 pcl::fromPCLPointCloud2(pcl_in, **pcl_xyz);
285 pcl_manager->add_pointcloud(cfg_store_pcl_id_.c_str(), pcl_xyz);
286
287 } else if (fields_in == fields_xyzrgb) {
288 pcl::fromPCLPointCloud2(pcl_in, **pcl_xyzrgb);
289 pcl_manager->add_pointcloud(cfg_store_pcl_id_.c_str(), pcl_xyzrgb);
290
291 } else {
292 resp.ok = false;
293 resp.error = "Unsupported point cloud type";
294 }
295
296 mm->set_pcl_id(cfg_store_pcl_id_.c_str());
297 mm->set_database(req.database.c_str());
298 mm->set_collection(req.collection.c_str());
299
300 mm->ref();
301 store_if_->msgq_enqueue(mm);
302 store_msg_id_ = mm->id();
303 mm->unref();
304
305 // wait for result
306 store_waitcond_->wait();
307
308 pcl_manager->remove_pointcloud(cfg_store_pcl_id_.c_str());
309
310 // Check result
311 store_if_->read();
312 if (store_if_->is_final() && (std::string("") == store_if_->error())) {
313 resp.ok = true;
314 } else {
315 resp.ok = false;
316 resp.error = store_if_->error();
317 }
318 return true;
319#else
320 logger->log_warn(name(), "Cannot store point clouds, PCL < 1.7.0");
321 resp.ok = false;
322 resp.error = "Storing only supported with PCL >= 1.7.0";
323 return true;
324#endif
325}
326
327bool
328PointCloudDBROSCommThread::record_cb(fawkes_msgs::RecordData::Request & req,
329 fawkes_msgs::RecordData::Response &resp)
330{
331 logger->log_info(name(), "Recording ordered for %f sec", req.range.toSec());
332 ros::Time begin = ros::Time::now();
333 ros::Time end = begin + req.range;
334 ros::Time::sleepUntil(end);
335 resp.begin = begin;
336 resp.end = end;
337 resp.ok = true;
338 logger->log_info(name(), "Recording done");
339 return true;
340}
virtual void loop()
Code to execute in the thread.
virtual void init()
Initialize the thread.
virtual ~PointCloudDBROSCommThread()
Destructor.
virtual void finalize()
Finalize the thread.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
Wake threads when a blackboard interface is updated.
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual void close(Interface *interface)=0
Close interface.
Thread aspect to use blocked timing.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:47
void remove_pointcloud(const char *id)
Remove the point cloud.
void add_pointcloud(const char *id, RefPtr< pcl::PointCloud< PointT > > cloud)
Add point cloud.
LockPtr< ros::NodeHandle > rosnode
Central ROS node handle.
Definition: ros.h:47
RefPtr<> is a reference-counting shared smartpointer.
Definition: refptr.h:50
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
Wait until a given condition holds.
void wait()
Wait for the condition forever.
void wake_all()
Wake up all waiting threads.
Fawkes library namespace.