Fawkes API  Fawkes Development Version
mongodb_log_pcl_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_pcl_thread.cpp - Thread to log point clouds to MongoDB
4  *
5  * Created: Mon Nov 07 02:58:40 2011
6  * Copyright 2011-2017 Tim Niemueller [www.niemueller.de]
7  * 2012 Bastian Klingen
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "mongodb_log_pcl_thread.h"
24 
25 // Fawkes
26 #include <core/threading/mutex_locker.h>
27 #include <utils/time/wait.h>
28 
29 // from MongoDB
30 #include <mongo/client/dbclient.h>
31 #include <mongo/client/gridfs.h>
32 
33 #include <fnmatch.h>
34 #include <unistd.h>
35 
36 using namespace fawkes;
37 using namespace mongo;
38 
39 /** @class MongoLogPointCloudThread "mongodb_log_pcl_thread.h"
40  * Thread to store point clouds to MongoDB.
41  * @author Tim Niemueller
42  * @author Bastian Klingen
43  */
44 
45 /** Constructor. */
47 : Thread("MongoLogPointCloudThread", Thread::OPMODE_CONTINUOUS), MongoDBAspect("default")
48 {
50 }
51 
52 /** Destructor. */
54 {
55 }
56 
57 void
59 {
60  database_ = "fflog";
61  try {
62  database_ = config->get_string("/plugins/mongodb-log/database");
63  } catch (Exception &e) {
64  logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
65  }
66 
67  cfg_storage_interval_ = config->get_float("/plugins/mongodb-log/pointclouds/storage-interval");
68 
69  cfg_chunk_size_ = 2097152; // 2 MB
70  try {
71  cfg_chunk_size_ = config->get_uint("/plugins/mongodb-log/pointclouds/chunk-size");
72  } catch (Exception &e) {
73  } // ignored, use default
74  logger->log_info(name(), "Chunk size: %u", cfg_chunk_size_);
75 
76  cfg_flush_after_write_ = false;
77  try {
78  cfg_flush_after_write_ = config->get_uint("/plugins/mongodb-log/pointclouds/flush-after-write");
79  } catch (Exception &e) {
80  } // ignored, use default
81 
82  std::vector<std::string> includes;
83  try {
84  includes = config->get_strings("/plugins/mongodb-log/pointclouds/includes");
85  } catch (Exception &e) {
86  } // ignored, no include rules
87  std::vector<std::string> excludes;
88 
89  try {
90  excludes = config->get_strings("/plugins/mongodb-log/pointclouds/excludes");
91  } catch (Exception &e) {
92  } // ignored, no include rules
93 
94  mongodb_ = mongodb_client;
95  gridfs_ = new GridFS(*mongodb_, database_);
96  //gridfs_->setChunkSize(cfg_chunk_size_);
97 
98  adapter_ = new PointCloudAdapter(pcl_manager, logger);
99 
100  std::vector<std::string> pcls = pcl_manager->get_pointcloud_list();
101 
102  std::vector<std::string>::iterator p;
103  std::vector<std::string>::iterator f;
104  for (p = pcls.begin(); p != pcls.end(); ++p) {
105  bool include = includes.empty();
106  if (!include) {
107  for (f = includes.begin(); f != includes.end(); ++f) {
108  if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
109  logger->log_debug(name(), "Include match %s to %s", f->c_str(), p->c_str());
110  include = true;
111  break;
112  }
113  }
114  }
115  if (include) {
116  for (f = excludes.begin(); f != excludes.end(); ++f) {
117  if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
118  logger->log_debug(name(), "Exclude match %s to %s", f->c_str(), p->c_str());
119  include = false;
120  break;
121  }
122  }
123  }
124  if (!include) {
125  logger->log_info(name(), "Excluding point cloud %s", p->c_str());
126  continue;
127  }
128 
129  PointCloudInfo pi;
130 
131  std::string topic_name = std::string("PointClouds.") + *p;
132  size_t pos = 0;
133  while ((pos = topic_name.find_first_of(" -", pos)) != std::string::npos) {
134  topic_name.replace(pos, 1, "_");
135  pos = pos + 1;
136  }
137 
138  pi.topic_name = topic_name;
139 
140  logger->log_info(name(), "MongoLog of point cloud %s", p->c_str());
141 
142  std::string frame_id;
143  unsigned int width, height;
144  bool is_dense;
146  adapter_->get_info(*p, width, height, frame_id, is_dense, fieldinfo);
147  pi.msg.header.frame_id = frame_id;
148  pi.msg.width = width;
149  pi.msg.height = height;
150  pi.msg.is_dense = is_dense;
151  pi.msg.fields.clear();
152  pi.msg.fields.resize(fieldinfo.size());
153  for (unsigned int i = 0; i < fieldinfo.size(); ++i) {
154  pi.msg.fields[i].name = fieldinfo[i].name;
155  pi.msg.fields[i].offset = fieldinfo[i].offset;
156  pi.msg.fields[i].datatype = fieldinfo[i].datatype;
157  pi.msg.fields[i].count = fieldinfo[i].count;
158  }
159 
160  pcls_[*p] = pi;
161  }
162 
163  wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
164  mutex_ = new Mutex();
165 }
166 
167 bool
169 {
170  mutex_->lock();
171  return true;
172 }
173 
174 void
176 {
177  delete adapter_;
178  delete gridfs_;
179  delete wait_;
180  delete mutex_;
181 }
182 
183 void
185 {
186  MutexLocker lock(mutex_);
187  fawkes::Time loop_start(clock);
188  wait_->mark_start();
189  std::map<std::string, PointCloudInfo>::iterator p;
190  unsigned int num_stored = 0;
191  for (p = pcls_.begin(); p != pcls_.end(); ++p) {
192  PointCloudInfo &pi = p->second;
193  std::string frame_id;
194  unsigned int width, height;
195  void * point_data;
196  size_t point_size, num_points;
197  fawkes::Time time;
198  adapter_->get_data(
199  p->first, frame_id, width, height, time, &point_data, point_size, num_points);
200  size_t data_size = point_size * num_points;
201 
202  if (pi.last_sent != time) {
203  pi.last_sent = time;
204 
206 
207  BSONObjBuilder document;
208  document.append("timestamp", (long long)time.in_msec());
209  BSONObjBuilder subb(document.subobjStart("pointcloud"));
210  subb.append("frame_id", pi.msg.header.frame_id);
211  subb.append("is_dense", pi.msg.is_dense);
212  subb.append("width", width);
213  subb.append("height", height);
214  subb.append("point_size", (unsigned int)point_size);
215  subb.append("num_points", (unsigned int)num_points);
216 
217  std::stringstream name;
218  name << pi.topic_name << "_" << time.in_msec();
219  subb.append("data", gridfs_->storeFile((char *)point_data, data_size, name.str()));
220 
221  BSONArrayBuilder subb2(subb.subarrayStart("field_info"));
222  for (unsigned int i = 0; i < pi.msg.fields.size(); i++) {
223  BSONObjBuilder fi(subb2.subobjStart());
224  fi.append("name", pi.msg.fields[i].name);
225  fi.append("offset", pi.msg.fields[i].offset);
226  fi.append("datatype", pi.msg.fields[i].datatype);
227  fi.append("count", pi.msg.fields[i].count);
228  fi.doneFast();
229  }
230  subb2.doneFast();
231  subb.doneFast();
232  collection_ = database_ + "." + pi.topic_name;
233  try {
234  mongodb_->insert(collection_, document.obj());
235  ++num_stored;
236  } catch (mongo::DBException &e) {
237  logger->log_warn(this->name(),
238  "Failed to insert into %s: %s",
239  collection_.c_str(),
240  e.what());
241  }
242 
243  fawkes::Time end(clock);
244  float diff = (end - &start) * 1000.;
245  logger->log_debug(this->name(),
246  "Stored point cloud %s (time %li) in %.1f ms",
247  p->first.c_str(),
248  time.in_msec(),
249  diff);
250 
251  } else {
252  logger->log_debug(this->name(), "Point cloud %s did not change", p->first.c_str());
253  //adapter_->close(p->first);
254  }
255  }
256  mutex_->unlock();
257  // -1 to subtract "NO PARENT" pseudo cache
258  fawkes::Time loop_end(clock);
259  logger->log_debug(name(),
260  "Stored %u of %zu point clouds in %.1f ms",
261  num_stored,
262  pcls_.size(),
263  (loop_end - &loop_start) * 1000.);
264 
265  if (cfg_flush_after_write_) {
266  // flush database
267  BSONObjBuilder flush_cmd;
268  BSONObj reply;
269  flush_cmd.append("fsync", 1);
270  flush_cmd.append("async", 1);
271  mongodb_client->runCommand("admin", flush_cmd.obj(), reply);
272  if (reply.hasField("ok")) {
273  if (!reply["ok"].trueValue()) {
274  logger->log_warn(name(), "fsync error: %s", reply["errmsg"].String().c_str());
275  }
276  } else {
277  logger->log_warn(name(), "fsync reply has no ok field");
278  }
279  }
280  wait_->wait();
281 }
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
Definition: pcl_adapter.h:66
void wait()
Wait until minimum loop time has been reached.
Definition: wait.cpp:78
std::vector< std::string > get_pointcloud_list() const
Get list of point cloud IDs.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
Mutex locking helper.
Definition: mutex_locker.h:33
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:55
A class for handling time.
Definition: time.h:92
Thread class encapsulation of pthreads.
Definition: thread.h:45
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:228
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:47
Base class for exceptions in Fawkes.
Definition: exception.h:35
Thread aspect to access MongoDB.
Definition: mongodb.h:39
const char * name() const
Get name of thread.
Definition: thread.h:100
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void get_info(const std::string &id, unsigned int &width, unsigned int &height, std::string &frame_id, bool &is_dense, V_PointFieldInfo &pfi)
Get info about point cloud.
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
Point cloud adapter class.
Definition: pcl_adapter.h:38
virtual ~MongoLogPointCloudThread()
Destructor.
void lock()
Lock this mutex.
Definition: mutex.cpp:87
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Mutex mutual exclusion lock.
Definition: mutex.h:32
virtual void finalize()
Finalize the thread.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
Time wait utility.
Definition: wait.h:32
virtual void loop()
Code to execute in the thread.
void get_data(const std::string &id, std::string &frame_id, unsigned int &width, unsigned int &height, fawkes::Time &time, void **data_ptr, size_t &point_size, size_t &num_points)
Get current data of point cloud.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:499
virtual void init()
Initialize the thread.