23#include "mongodb_log_image_thread.h"
25#include <core/threading/mutex_locker.h>
26#include <fvutils/color/colorspaces.h>
27#include <fvutils/ipc/shm_image.h>
28#include <utils/time/wait.h>
31#include <bsoncxx/builder/basic/document.hpp>
33#include <mongocxx/client.hpp>
34#include <mongocxx/exception/operation_exception.hpp>
35#include <mongocxx/gridfs/uploader.hpp>
38using namespace firevision;
39using namespace mongocxx;
69 cfg_storage_interval_ =
config->
get_float(
"/plugins/mongodb-log/images/storage-interval");
71 cfg_chunk_size_ = 2097152;
73 cfg_chunk_size_ =
config->
get_uint(
"/plugins/mongodb-log/images/chunk-size");
88 gridfs_ = mongodb_->database(database_).gridfs_bucket();
92 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
108 std::map<std::string, ImageInfo>::iterator p;
109 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
110 delete p->second.img;
125 unsigned int num_stored = 0;
128 if (*now_ - last_update_ >= 5.0) {
129 *last_update_ = now_;
133 std::map<std::string, ImageInfo>::iterator p;
134 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
139 if ((imginfo.last_sent != cap_time)) {
140 using namespace bsoncxx::builder;
141 basic::document document;
142 imginfo.last_sent = cap_time;
143 document.append(basic::kvp(
"timestamp",
static_cast<int64_t
>(cap_time.
in_msec())));
145 document.append(basic::kvp(
"image", [&](basic::sub_document subdoc) {
146 subdoc.append(basic::kvp(
"image_id", imginfo.img->image_id()));
147 subdoc.append(basic::kvp(
"width",
static_cast<int32_t
>(imginfo.img->
width())));
148 subdoc.append(basic::kvp(
"height",
static_cast<int32_t
>(imginfo.img->
height())));
149 subdoc.append(basic::kvp(
"colorspace", colorspace_to_string(imginfo.img->
colorspace())));
151 std::stringstream
name;
152 name << imginfo.topic_name <<
"_" << cap_time.
in_msec();
153 auto uploader = gridfs_.open_upload_stream(
name.str());
154 uploader.write((uint8_t *)imginfo.img->buffer(), imginfo.img->data_size());
155 auto result = uploader.close();
156 subdoc.append(basic::kvp(
"data", [&](basic::sub_document subdoc) {
157 subdoc.append(basic::kvp(
"id", result.id()));
158 subdoc.append(basic::kvp(
"filename", name.str()));
163 mongodb_->database(database_)[imginfo.topic_name].insert_one(document.view());
165 }
catch (operation_exception &e) {
167 "Failed to insert image %s into %s.%s: %s",
168 imginfo.img->image_id(),
170 imginfo.topic_name.c_str(),
179 "Stored %u of %zu images in %.1f ms",
182 (loop_end - &loop_start) * 1000.);
187MongoLogImagesThread::update_images()
189 std::set<std::string> missing_images;
190 std::set<std::string> unbacked_images;
191 get_sets(missing_images, unbacked_images);
193 if (!unbacked_images.empty()) {
194 std::set<std::string>::iterator i;
195 for (i = unbacked_images.begin(); i != unbacked_images.end(); ++i) {
197 "Shutting down MongoLog for no longer available image %s",
205 if (!missing_images.empty()) {
206 std::set<std::string>::iterator i;
207 for (i = missing_images.begin(); i != missing_images.end(); ++i) {
208 std::vector<std::string>::iterator f;
209 bool include = includes_.empty();
211 for (f = includes_.begin(); f != includes_.end(); ++f) {
212 if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
219 for (f = excludes_.begin(); f != excludes_.end(); ++f) {
220 if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
233 std::string topic_name = std::string(
"Images.") + *i;
235 while ((pos = topic_name.find_first_of(
" -", pos)) != std::string::npos) {
236 topic_name.replace(pos, 1,
"_");
241 imginfo.topic_name = topic_name;
249MongoLogImagesThread::get_sets(std::set<std::string> &missing_images,
250 std::set<std::string> &unbacked_images)
252 std::set<std::string> published_images;
253 std::map<std::string, ImageInfo>::iterator p;
254 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
255 if (p->second.img->num_attached() > 1) {
256 published_images.insert(p->first);
260 std::set<std::string> image_buffers;
269 image_buffers.insert(ih->
image_id());
275 missing_images.clear();
276 unbacked_images.clear();
278 std::set_difference(image_buffers.begin(),
280 published_images.begin(),
281 published_images.end(),
282 std::inserter(missing_images, missing_images.end()));
284 std::set_difference(published_images.begin(),
285 published_images.end(),
286 image_buffers.begin(),
288 std::inserter(unbacked_images, unbacked_images.end()));
ImageInfo representation for JSON transfer.
std::optional< std::string > colorspace() const
Get colorspace value.
std::optional< int64_t > width() const
Get width value.
std::optional< int64_t > height() const
Get height value.
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
MongoLogImagesThread()
Constructor.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
virtual ~MongoLogImagesThread()
Destructor.
virtual void loop()
Code to execute in the thread.
Clock * clock
By means of this member access to the clock is given.
Configuration * config
This is the Configuration member used to access the configuration.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Thread aspect to access MongoDB.
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
Mutex mutual exclusion lock.
void lock()
Lock this mutex.
void unlock()
Unlock the mutex.
Thread class encapsulation of pthreads.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
const char * name() const
Get name of thread.
void mark_start()
Mark start of loop.
void wait()
Wait until minimum loop time has been reached.
A class for handling time.
Time & stamp()
Set this time to the current time.
long in_msec() const
Convert the stored time into milli-seconds.
Shared memory image buffer.
Fawkes library namespace.