22 #include "mongodb_log_tf_thread.h" 24 #include <core/threading/mutex_locker.h> 25 #include <plugins/mongodb/aspect/mongodb_conncreator.h> 26 #include <tf/time_cache.h> 27 #include <utils/time/wait.h> 30 #include <mongo/client/dbclient.h> 34 using namespace mongo;
47 :
Thread(
"MongoLogTransformsThread",
Thread::OPMODE_CONTINUOUS),
71 collection_ =
config->
get_string(
"/plugins/mongodb-log/transforms/collection");
73 logger->
log_info(
name(),
"No transforms collection configured, using %s", collection_.c_str());
75 collection_ = database_ +
"." + collection_;
77 cfg_storage_interval_ =
config->
get_float(
"/plugins/mongodb-log/transforms/storage-interval");
79 if (cfg_storage_interval_ <= 0.) {
84 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
108 std::vector<fawkes::Time> tf_range_start;
109 std::vector<fawkes::Time> tf_range_end;
114 std::vector<tf::TimeCacheInterfacePtr> copies(caches.size(), tf::TimeCacheInterfacePtr());
116 const size_t n_caches = caches.size();
119 if (last_tf_range_end_.size() != n_caches) {
120 last_tf_range_end_.resize(n_caches,
fawkes::Time(0, 0));
123 unsigned int num_transforms = 0;
124 unsigned int num_upd_caches = 0;
126 for (
size_t i = 0; i < n_caches; ++i) {
128 tf_range_end[i] = caches[i]->get_latest_timestamp();
129 if (last_tf_range_end_[i] != tf_range_end[i]) {
131 if (!tf_range_end[i].is_zero()) {
132 tf_range_start[i] = tf_range_end[i] - cfg_storage_interval_;
133 if (last_tf_range_end_[i] > tf_range_start[i]) {
134 tf_range_start[i] = last_tf_range_end_[i];
137 copies[i] = caches[i]->clone(tf_range_start[i]);
138 last_tf_range_end_[i] = tf_range_end[i];
140 num_transforms += copies[i]->get_list_length();
146 store(copies, tf_range_start, tf_range_end);
152 "%u transforms for %u updated frames stored in %.1f ms",
155 (loop_end - &loop_start) * 1000.);
160 MongoLogTransformsThread::store(std::vector<tf::TimeCacheInterfacePtr> &caches,
161 std::vector<fawkes::Time> & from,
162 std::vector<fawkes::Time> & to)
166 for (
size_t i = 0; i < caches.size(); ++i) {
167 tf::TimeCacheInterfacePtr tc = caches[i];
171 BSONObjBuilder document;
172 document.append(
"timestamp", (
long long)from[i].in_msec());
173 document.append(
"timestamp_from", (
long long)from[i].in_msec());
174 document.append(
"timestamp_to", (
long long)to[i].in_msec());
175 const tf::TimeCache::L_TransformStorage &storage = tc->get_storage();
177 if (storage.empty()) {
187 document.append(
"frame", frame_map[storage.front().frame_id]);
188 document.append(
"child_frame", frame_map[storage.front().child_frame_id]);
190 BSONArrayBuilder tfl_array(document.subarrayStart(
"transforms"));
196 tf::TimeCache::L_TransformStorage::const_iterator s;
197 for (s = storage.begin(); s != storage.end(); ++s) {
198 BSONObjBuilder tf_doc(tfl_array.subobjStart());
216 tf_doc.append(
"timestamp", (
long long)s->stamp.in_msec());
217 tf_doc.append(
"frame", frame_map[s->frame_id]);
218 tf_doc.append(
"child_frame", frame_map[s->child_frame_id]);
220 BSONArrayBuilder rot_arr(tf_doc.subarrayStart(
"rotation"));
221 rot_arr.append(s->rotation.x());
222 rot_arr.append(s->rotation.y());
223 rot_arr.append(s->rotation.z());
224 rot_arr.append(s->rotation.w());
227 BSONArrayBuilder trans_arr(tf_doc.subarrayStart(
"translation"));
228 trans_arr.append(s->translation.x());
229 trans_arr.append(s->translation.y());
230 trans_arr.append(s->translation.z());
231 trans_arr.doneFast();
236 tfl_array.doneFast();
240 }
catch (mongo::DBException &e) {
243 }
catch (std::exception &e) {
void wait()
Wait until minimum loop time has been reached.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Fawkes library namespace.
void unlock()
Unlock the mutex.
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
A class for handling time.
Thread class encapsulation of pthreads.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Logger * logger
This is the Logger member used to access the logger.
Clock * clock
This is the Clock member used to access the clock.
Base class for exceptions in Fawkes.
Thread aspect to access MongoDB.
const char * name() const
Get name of thread.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void mark_start()
Mark start of loop.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
void lock()
Lock this mutex.
Mutex mutual exclusion lock.
Configuration * config
This is the Configuration member used to access the configuration.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.