Fawkes API Fawkes Development Version
mongodb_log_pcl_thread.cpp
1
2/***************************************************************************
3 * mongodb_log_pcl_thread.cpp - Thread to log point clouds to MongoDB
4 *
5 * Created: Mon Nov 07 02:58:40 2011
6 * Copyright 2011-2017 Tim Niemueller [www.niemueller.de]
7 * 2012 Bastian Klingen
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library General Public License for more details.
19 *
20 * Read the full text in the LICENSE.GPL file in the doc directory.
21 */
22
23#include "mongodb_log_pcl_thread.h"
24
25// Fawkes
26#include <core/threading/mutex_locker.h>
27#include <utils/time/wait.h>
28
29// from MongoDB
30#include <fnmatch.h>
31#include <mongocxx/client.hpp>
32#include <mongocxx/exception/operation_exception.hpp>
33#include <mongocxx/gridfs/uploader.hpp>
34#include <unistd.h>
35
36using namespace fawkes;
37using namespace mongocxx;
38
39/** @class MongoLogPointCloudThread "mongodb_log_pcl_thread.h"
40 * Thread to store point clouds to MongoDB.
41 * @author Tim Niemueller
42 * @author Bastian Klingen
43 */
44
45/** Constructor. */
47: Thread("MongoLogPointCloudThread", Thread::OPMODE_CONTINUOUS), MongoDBAspect("default")
48{
50}
51
52/** Destructor. */
54{
55}
56
57void
59{
60 database_ = "fflog";
61 try {
62 database_ = config->get_string("/plugins/mongodb-log/database");
63 } catch (Exception &e) {
64 logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
65 }
66
67 cfg_storage_interval_ = config->get_float("/plugins/mongodb-log/pointclouds/storage-interval");
68
69 cfg_chunk_size_ = 2097152; // 2 MB
70 try {
71 cfg_chunk_size_ = config->get_uint("/plugins/mongodb-log/pointclouds/chunk-size");
72 } catch (Exception &e) {
73 } // ignored, use default
74 logger->log_info(name(), "Chunk size: %u", cfg_chunk_size_);
75
76 cfg_flush_after_write_ = false;
77 try {
78 cfg_flush_after_write_ = config->get_uint("/plugins/mongodb-log/pointclouds/flush-after-write");
79 } catch (Exception &e) {
80 } // ignored, use default
81
82 std::vector<std::string> includes;
83 try {
84 includes = config->get_strings("/plugins/mongodb-log/pointclouds/includes");
85 } catch (Exception &e) {
86 } // ignored, no include rules
87 std::vector<std::string> excludes;
88
89 try {
90 excludes = config->get_strings("/plugins/mongodb-log/pointclouds/excludes");
91 } catch (Exception &e) {
92 } // ignored, no include rules
93
94 mongodb_ = mongodb_client;
95 gridfs_ = mongodb_->database(database_).gridfs_bucket();
96 //gridfs_->setChunkSize(cfg_chunk_size_);
97
98 adapter_ = new PointCloudAdapter(pcl_manager, logger);
99
100 std::vector<std::string> pcls = pcl_manager->get_pointcloud_list();
101
102 std::vector<std::string>::iterator p;
103 std::vector<std::string>::iterator f;
104 for (p = pcls.begin(); p != pcls.end(); ++p) {
105 bool include = includes.empty();
106 if (!include) {
107 for (f = includes.begin(); f != includes.end(); ++f) {
108 if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
109 logger->log_debug(name(), "Include match %s to %s", f->c_str(), p->c_str());
110 include = true;
111 break;
112 }
113 }
114 }
115 if (include) {
116 for (f = excludes.begin(); f != excludes.end(); ++f) {
117 if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
118 logger->log_debug(name(), "Exclude match %s to %s", f->c_str(), p->c_str());
119 include = false;
120 break;
121 }
122 }
123 }
124 if (!include) {
125 logger->log_info(name(), "Excluding point cloud %s", p->c_str());
126 continue;
127 }
128
129 PointCloudInfo pi;
130
131 std::string topic_name = std::string("PointClouds.") + *p;
132 size_t pos = 0;
133 while ((pos = topic_name.find_first_of(" -", pos)) != std::string::npos) {
134 topic_name.replace(pos, 1, "_");
135 pos = pos + 1;
136 }
137
138 pi.topic_name = topic_name;
139
140 logger->log_info(name(), "MongoLog of point cloud %s", p->c_str());
141
142 std::string frame_id;
143 unsigned int width, height;
144 bool is_dense;
146 adapter_->get_info(*p, width, height, frame_id, is_dense, fieldinfo);
147 pi.msg.header.frame_id = frame_id;
148 pi.msg.width = width;
149 pi.msg.height = height;
150 pi.msg.is_dense = is_dense;
151 pi.msg.fields.clear();
152 pi.msg.fields.resize(fieldinfo.size());
153 for (unsigned int i = 0; i < fieldinfo.size(); ++i) {
154 pi.msg.fields[i].name = fieldinfo[i].name;
155 pi.msg.fields[i].offset = fieldinfo[i].offset;
156 pi.msg.fields[i].datatype = fieldinfo[i].datatype;
157 pi.msg.fields[i].count = fieldinfo[i].count;
158 }
159
160 pcls_[*p] = pi;
161 }
162
163 wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
164 mutex_ = new Mutex();
165}
166
167bool
169{
170 mutex_->lock();
171 return true;
172}
173
174void
176{
177 delete adapter_;
178 delete wait_;
179 delete mutex_;
180}
181
182void
184{
185 MutexLocker lock(mutex_);
186 fawkes::Time loop_start(clock);
187 wait_->mark_start();
188 std::map<std::string, PointCloudInfo>::iterator p;
189 unsigned int num_stored = 0;
190 for (p = pcls_.begin(); p != pcls_.end(); ++p) {
191 PointCloudInfo &pi = p->second;
192 std::string frame_id;
193 unsigned int width, height;
194 void * point_data;
195 size_t point_size, num_points;
196 fawkes::Time time;
197 adapter_->get_data(
198 p->first, frame_id, width, height, time, &point_data, point_size, num_points);
199 size_t data_size = point_size / sizeof(uint8_t) * num_points;
200
201 if (pi.last_sent != time) {
202 pi.last_sent = time;
203
205
206 using namespace bsoncxx::builder;
207 basic::document document;
208 std::stringstream name;
209 name << pi.topic_name << "_" << time.in_msec();
210 auto uploader = gridfs_.open_upload_stream(name.str());
211 uploader.write((uint8_t *)point_data, data_size);
212 auto result = uploader.close();
213 document.append(basic::kvp("timestamp", static_cast<int64_t>(time.in_msec())));
214 document.append(basic::kvp("pointcloud", [&](basic::sub_document subdoc) {
215 subdoc.append(basic::kvp("frame_id", pi.msg.header.frame_id));
216 subdoc.append(basic::kvp("is_dense", pi.msg.is_dense));
217 subdoc.append(basic::kvp("width", static_cast<int32_t>(width)));
218 subdoc.append(basic::kvp("height", static_cast<int32_t>(height)));
219 subdoc.append(basic::kvp("point_size", static_cast<int32_t>(point_size)));
220 subdoc.append(basic::kvp("num_points", static_cast<int32_t>(num_points)));
221 // TODO: We store the ID, is this correct?
222 subdoc.append(basic::kvp("data", result.id()));
223 subdoc.append(basic::kvp("field_info", [pi](basic::sub_array array) {
224 for (uint i = 0; i < pi.msg.fields.size(); i++) {
225 basic::document field_info_doc;
226 field_info_doc.append(basic::kvp("name", pi.msg.fields[i].name));
227 field_info_doc.append(
228 basic::kvp("offset", static_cast<int32_t>(pi.msg.fields[i].offset)));
229 field_info_doc.append(
230 basic::kvp("datatype", static_cast<int32_t>(pi.msg.fields[i].datatype)));
231 field_info_doc.append(
232 basic::kvp("count", static_cast<int32_t>(pi.msg.fields[i].count)));
233 array.append(field_info_doc);
234 }
235 }));
236 }));
237
238 try {
239 mongodb_->database(database_)[pi.topic_name].insert_one(document.view());
240 ++num_stored;
241 } catch (operation_exception &e) {
242 logger->log_warn(this->name(),
243 "Failed to insert into %s: %s",
244 collection_.c_str(),
245 e.what());
246 }
247
248 fawkes::Time end(clock);
249 float diff = (end - &start) * 1000.;
250 logger->log_debug(this->name(),
251 "Stored point cloud %s (time %li) in %.1f ms",
252 p->first.c_str(),
253 time.in_msec(),
254 diff);
255
256 } else {
257 logger->log_debug(this->name(), "Point cloud %s did not change", p->first.c_str());
258 //adapter_->close(p->first);
259 }
260 }
261 mutex_->unlock();
262 // -1 to subtract "NO PARENT" pseudo cache
263 fawkes::Time loop_end(clock);
264 logger->log_debug(name(),
265 "Stored %u of %zu point clouds in %.1f ms",
266 num_stored,
267 pcls_.size(),
268 (loop_end - &loop_start) * 1000.);
269
270 if (cfg_flush_after_write_) {
271 // flush database
272 using namespace bsoncxx::builder;
273 basic::document flush_cmd;
274 flush_cmd.append(basic::kvp("fsync", 1));
275 flush_cmd.append(basic::kvp("async", 1));
276 auto reply = mongodb_client->database("admin").run_command(flush_cmd.view());
277 if (reply.view()["ok"].get_double() != 1) {
278 logger->log_warn(name(),
279 "fsync error: %s",
280 reply.view()["errmsg"].get_utf8().value.to_string().c_str());
281 }
282 }
283 wait_->wait();
284}
virtual void init()
Initialize the thread.
virtual ~MongoLogPointCloudThread()
Destructor.
virtual void loop()
Code to execute in the thread.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
virtual void finalize()
Finalize the thread.
Point cloud adapter class.
Definition: pcl_adapter.h:39
void get_data(const std::string &id, std::string &frame_id, unsigned int &width, unsigned int &height, fawkes::Time &time, void **data_ptr, size_t &point_size, size_t &num_points)
Get current data of point cloud.
void get_info(const std::string &id, unsigned int &width, unsigned int &height, std::string &frame_id, bool &is_dense, V_PointFieldInfo &pfi)
Get info about point cloud.
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
Definition: pcl_adapter.h:66
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Thread aspect to access MongoDB.
Definition: mongodb.h:39
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:54
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: multi.cpp:216
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
Definition: multi.cpp:174
Mutex locking helper.
Definition: mutex_locker.h:34
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:47
std::vector< std::string > get_pointcloud_list() const
Get list of point cloud IDs.
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
const char * name() const
Get name of thread.
Definition: thread.h:100
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:499
Time wait utility.
Definition: wait.h:33
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
A class for handling time.
Definition: time.h:93
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:228
Fawkes library namespace.