Fawkes API  Fawkes Development Version
pcl_db_store_thread.cpp
1 
2 /***************************************************************************
3  * pcl_db_store_thread.cpp - Store point clouds to MongoDB
4  *
5  * Created: Mon May 05 14:26:22 2014
6  * Copyright 2012-2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "pcl_db_store_thread.h"
23 
24 #include <blackboard/utils/on_message_waker.h>
25 #include <interfaces/PclDatabaseStoreInterface.h>
26 #include <pcl_utils/pcl_adapter.h>
27 
28 // from MongoDB
29 #include <mongo/client/dbclient.h>
30 #include <mongo/client/gridfs.h>
31 
32 #define CFG_PREFIX "/perception/pcl-db/"
33 
34 using namespace fawkes;
35 using namespace mongo;
36 
37 /** @class PointCloudDBStoreThread "pcl_db_store_thread.h"
38  * Thread to store point clouds to the database on request.
39  * @author Tim Niemueller
40  */
41 
42 /** Constructor. */
44 : Thread("PointCloudDBStoreThread", Thread::OPMODE_WAITFORWAKEUP), MongoDBAspect("default")
45 {
46 }
47 
48 /** Destructor. */
50 {
51 }
52 
53 void
55 {
56  store_if_ = NULL;
57  adapter_ = NULL;
58  msg_waker_ = NULL;
59 
60  cfg_database_ = config->get_string(CFG_PREFIX "database-name");
61 
62  adapter_ = new PointCloudAdapter(pcl_manager, logger);
63 
64  try {
65  store_if_ = blackboard->open_for_writing<PclDatabaseStoreInterface>("PCL Database Store");
66 
67  msg_waker_ = new BlackBoardOnMessageWaker(blackboard, store_if_, this);
68  } catch (Exception &e) {
69  finalize();
70  throw;
71  }
72 }
73 
74 void
76 {
77  delete msg_waker_;
78  blackboard->close(store_if_);
79  delete adapter_;
80 }
81 
82 void
84 {
85  if (store_if_->msgq_empty())
86  return;
87 
88  if (PclDatabaseStoreInterface::StoreMessage *msg = store_if_->msgq_first_safe(msg)) {
89  store_if_->set_final(false);
90  store_if_->set_msgid(msg->id());
91  store_if_->set_error("");
92  store_if_->write();
93 
94  std::string msg_database = msg->database();
95  std::string msg_collection = msg->collection();
96 
97  bool store = true;
98  std::string errmsg;
99  std::string database = (msg_database != "") ? msg_database : cfg_database_;
100  std::string collection = database + ".";
101  if (msg_collection == "") {
102  collection += "pcls";
103  } else if (msg_collection.compare(0, 3, "fs.") == 0) {
104  errmsg = "Passed in collection uses GridFS namespace";
105  store = false;
106  } else {
107  collection += msg->collection();
108  }
109 
110  if (store)
111  store_pointcloud(msg->pcl_id(), database, collection, errmsg);
112 
113  store_if_->set_error(errmsg.c_str());
114  store_if_->set_final(true);
115  store_if_->write();
116  } else {
117  logger->log_warn(name(), "Unhandled message received");
118  }
119  store_if_->msgq_pop();
120 }
121 
122 bool
123 PointCloudDBStoreThread::store_pointcloud(std::string pcl_id,
124  std::string database,
125  std::string collection,
126  std::string &errmsg)
127 {
128  if (!pcl_manager->exists_pointcloud(pcl_id.c_str())) {
129  errmsg = "PointCloud does not exist";
130  return false;
131  }
132 
133  std::string frame_id;
134  unsigned int width, height;
135  bool is_dense;
136  void * point_data;
137  size_t point_size, num_points;
138  fawkes::Time time;
140 
141  adapter_->get_data_and_info(pcl_id,
142  frame_id,
143  is_dense,
144  width,
145  height,
146  time,
147  fieldinfo,
148  &point_data,
149  point_size,
150  num_points);
151 
152  size_t data_size = point_size * num_points;
153 
154  BSONObjBuilder document;
155  document.append("timestamp", (long long)time.in_msec());
156  BSONObjBuilder subb(document.subobjStart("pointcloud"));
157  subb.append("frame_id", frame_id);
158  subb.append("is_dense", is_dense);
159  subb.append("width", width);
160  subb.append("height", height);
161  subb.append("point_size", (unsigned int)point_size);
162  subb.append("num_points", (unsigned int)num_points);
163 
164  GridFS gridfs(*mongodb_client, database);
165 
166  std::stringstream name;
167  name << "pcl_" << time.in_msec();
168  subb.append("data", gridfs.storeFile((char *)point_data, data_size, name.str()));
169 
170  BSONArrayBuilder subb2(subb.subarrayStart("field_info"));
171  for (unsigned int i = 0; i < fieldinfo.size(); i++) {
172  BSONObjBuilder fi(subb2.subobjStart());
173  fi.append("name", fieldinfo[i].name);
174  fi.append("offset", fieldinfo[i].offset);
175  fi.append("datatype", fieldinfo[i].datatype);
176  fi.append("count", fieldinfo[i].count);
177  fi.doneFast();
178  }
179  subb2.doneFast();
180  subb.doneFast();
181  try {
182  mongodb_client->insert(collection, document.obj());
183  } catch (mongo::DBException &e) {
184  logger->log_warn(this->name(), "Failed to insert into %s: %s", collection.c_str(), e.what());
185  errmsg = e.what();
186  return false;
187  }
188 
189  return true;
190 }
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
Definition: pcl_adapter.h:66
virtual void loop()
Code to execute in the thread.
Fawkes library namespace.
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:55
Wake threads on receiving a blackboard message.
A class for handling time.
Definition: time.h:92
Thread class encapsulation of pthreads.
Definition: thread.h:45
virtual void init()
Initialize the thread.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
void get_data_and_info(const std::string &id, std::string &frame_id, bool &is_dense, unsigned int &width, unsigned int &height, fawkes::Time &time, V_PointFieldInfo &pfi, void **data_ptr, size_t &point_size, size_t &num_points)
Get data and info of point cloud.
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:228
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:47
Base class for exceptions in Fawkes.
Definition: exception.h:35
Thread aspect to access MongoDB.
Definition: mongodb.h:39
const char * name() const
Get name of thread.
Definition: thread.h:100
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Point cloud adapter class.
Definition: pcl_adapter.h:38
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual ~PointCloudDBStoreThread()
Destructor.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
virtual void finalize()
Finalize the thread.
bool exists_pointcloud(const char *id)
Check if point cloud exists.
virtual void close(Interface *interface)=0
Close interface.