Fawkes API  Fawkes Development Version
pcl_db_merge_thread.cpp
1 
2 /***************************************************************************
3  * pcl_db_merge_thread.cpp - Restore and merge point clouds from MongoDB
4  *
5  * Created: Wed Nov 28 10:56:10 2012 (Freiburg)
6  * Copyright 2012 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "pcl_db_merge_thread.h"
23 
24 #include <blackboard/utils/on_message_waker.h>
25 #include <interfaces/PclDatabaseMergeInterface.h>
26 
27 using namespace fawkes;
28 
29 /** @class PointCloudDBMergeThread "pcl_db_merge_thread.h"
30  * Thread to merge point clouds from database on request.
31  * @author Tim Niemueller
32  */
33 
34 /** Constructor. */
36 : Thread("PointCloudDBMergeThread", Thread::OPMODE_WAITFORWAKEUP), MongoDBAspect("default")
37 {
38 }
39 
40 /** Destructor. */
42 {
43 }
44 
45 void
47 {
48  pl_xyz_ = NULL;
49  pl_xyzrgb_ = NULL;
50  merge_if_ = NULL;
51  msg_waker_ = NULL;
52 
53  cfg_database_ = config->get_string(CFG_PREFIX "database-name");
54  cfg_output_id_ = config->get_string(CFG_PREFIX_MERGE "output-pcl-id");
55 
56  foutput_ = new pcl::PointCloud<pcl::PointXYZRGB>();
57  //foutput_->header.frame_id = finput_->header.frame_id;
58  foutput_->is_dense = false;
59  pcl_manager->add_pointcloud<pcl::PointXYZRGB>(cfg_output_id_.c_str(), foutput_);
60  output_ = pcl_utils::cloudptr_from_refptr(foutput_);
61 
64 
67 
68  try {
69  merge_if_ = blackboard->open_for_writing<PclDatabaseMergeInterface>("PCL Database Merge");
70 
71  msg_waker_ = new BlackBoardOnMessageWaker(blackboard, merge_if_, this);
72  } catch (Exception &e) {
73  finalize();
74  throw;
75  }
76 }
77 
78 void
80 {
81  delete msg_waker_;
82  blackboard->close(merge_if_);
83 
84  delete pl_xyz_;
85  delete pl_xyzrgb_;
86 
87  output_.reset();
88  pcl_manager->remove_pointcloud(cfg_output_id_.c_str());
89  foutput_.reset();
90 }
91 
92 void
94 {
95  std::vector<long long> times;
96  std::string database;
97  std::string collection;
98  //= "PointClouds.openni_pointcloud_xyz";
99 
100  if (merge_if_->msgq_empty())
101  return;
102 
103  if (PclDatabaseMergeInterface::MergeMessage *msg = merge_if_->msgq_first_safe(msg)) {
104  merge_if_->set_final(false);
105  merge_if_->set_msgid(msg->id());
106  merge_if_->set_error("");
107  merge_if_->write();
108 
109  int64_t *timestamps = msg->timestamps();
110  for (size_t i = 0; i < msg->maxlenof_timestamps(); ++i) {
111  if (timestamps[i] > 0) {
112  times.push_back(timestamps[i]);
113  }
114  }
115  database = (strcmp(msg->database(), "") != 0) ? msg->database() : cfg_database_;
116  collection = msg->collection();
117  }
118 
119  merge_if_->msgq_pop();
120 
121  /* For testing
122  collection = "PointClouds.openni_pointcloud_xyz";
123  times.clear();
124  times.push_back(1354200347715);
125  times.push_back(1354200406578);
126  times.push_back(1354200473345);
127  */
128 
129  if (times.empty()) {
130  logger->log_warn(name(), "Called for merge from %s, but no times given", collection.c_str());
131  merge_if_->set_final(true);
132  merge_if_->set_error("Called for merge, but no non-zero times given");
133  merge_if_->write();
134  return;
135  }
136 
137  logger->log_info(name(), "Restoring from '%s' for the following times", collection.c_str());
138  for (size_t i = 0; i < times.size(); ++i) {
139  logger->log_info(name(), " %lli", times[i]);
140  }
141 
142  ApplicabilityStatus st_xyz, st_xyzrgb;
143 
144  pl_xyz_->applicable(times, database, collection);
145  if ((st_xyz = pl_xyz_->applicable(times, database, collection)) == APPLICABLE) {
146  pl_xyz_->merge(times, database, collection);
147  Time now(clock);
148  pcl_utils::set_time(foutput_, now);
149  } else if ((st_xyzrgb = pl_xyzrgb_->applicable(times, database, collection)) == APPLICABLE) {
150  pl_xyzrgb_->merge(times, database, collection);
151  Time now(clock);
152  pcl_utils::set_time(foutput_, now);
153  } else {
154  logger->log_warn(name(), "No applicable merging pipeline known:");
155  logger->log_warn(name(), " XYZ: %s", to_string(st_xyz));
156  logger->log_warn(name(), " XYZ/RGB: %s", to_string(st_xyzrgb));
157  merge_if_->set_error("Merge failed, see pcl-db-merge log");
158  }
159 
160  merge_if_->set_final(true);
161  merge_if_->write();
162 }
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
void remove_pointcloud(const char *id)
Remove the point cloud.
void merge(std::vector< long long > &times, std::string &database, std::string &collection)
Merge point clouds.
virtual void loop()
Code to execute in the thread.
Fawkes library namespace.
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:55
Wake threads on receiving a blackboard message.
A class for handling time.
Definition: time.h:92
Thread class encapsulation of pthreads.
Definition: thread.h:45
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
virtual void finalize()
Finalize the thread.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
void reset()
Reset pointer.
Definition: refptr.h:453
virtual ~PointCloudDBMergeThread()
Destructor.
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:47
Base class for exceptions in Fawkes.
Definition: exception.h:35
Thread aspect to access MongoDB.
Definition: mongodb.h:39
const char * name() const
Get name of thread.
Definition: thread.h:100
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void add_pointcloud(const char *id, RefPtr< pcl::PointCloud< PointT >> cloud)
Add point cloud.
tf::Transformer * tf_listener
This is the transform listener which saves transforms published by other threads in the system.
Definition: tf.h:67
virtual void init()
Initialize the thread.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
ApplicabilityStatus applicable(std::vector< long long > &times, std::string &database, std::string &collection)
Check if this pipeline instance is suitable for the given times.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
virtual void close(Interface *interface)=0
Close interface.