23 #include "mongodb_log_image_thread.h" 25 #include <core/threading/mutex_locker.h> 26 #include <fvutils/color/colorspaces.h> 27 #include <fvutils/ipc/shm_image.h> 28 #include <utils/time/wait.h> 31 #include <mongo/client/dbclient.h> 32 #include <mongo/client/gridfs.h> 37 using namespace firevision;
38 using namespace mongo;
68 cfg_storage_interval_ =
config->
get_float(
"/plugins/mongodb-log/images/storage-interval");
70 cfg_chunk_size_ = 2097152;
72 cfg_chunk_size_ =
config->
get_uint(
"/plugins/mongodb-log/images/chunk-size");
87 gridfs_ =
new GridFS(*mongodb_, database_);
91 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
107 std::map<std::string, ImageInfo>::iterator p;
108 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
109 delete p->second.img;
125 unsigned int num_stored = 0;
128 if (*now_ - last_update_ >= 5.0) {
129 *last_update_ = now_;
133 std::map<std::string, ImageInfo>::iterator p;
134 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
139 if ((imginfo.last_sent != cap_time)) {
140 BSONObjBuilder document;
141 imginfo.last_sent = cap_time;
142 document.append(
"timestamp", (
long long)cap_time.
in_msec());
144 BSONObjBuilder subb(document.subobjStart(
"image"));
145 subb.append(
"image_id", imginfo.img->image_id());
146 subb.append(
"width", imginfo.img->
width());
147 subb.append(
"height", imginfo.img->
height());
148 subb.append(
"colorspace", colorspace_to_string(imginfo.img->
colorspace()));
150 std::stringstream
name;
151 name << imginfo.topic_name <<
"_" << cap_time.
in_msec();
153 gridfs_->storeFile((
char *)imginfo.img->buffer(),
154 imginfo.img->data_size(),
158 collection_ = database_ +
"." + imginfo.topic_name;
160 mongodb_->insert(collection_, document.obj());
162 }
catch (mongo::DBException &e) {
164 "Failed to insert image %s into %s: %s",
165 imginfo.img->image_id(),
175 "Stored %u of %zu images in %.1f ms",
178 (loop_end - &loop_start) * 1000.);
183 MongoLogImagesThread::update_images()
185 std::set<std::string> missing_images;
186 std::set<std::string> unbacked_images;
187 get_sets(missing_images, unbacked_images);
189 if (!unbacked_images.empty()) {
190 std::set<std::string>::iterator i;
191 for (i = unbacked_images.begin(); i != unbacked_images.end(); ++i) {
193 "Shutting down MongoLog for no longer available image %s",
201 if (!missing_images.empty()) {
202 std::set<std::string>::iterator i;
203 for (i = missing_images.begin(); i != missing_images.end(); ++i) {
204 std::vector<std::string>::iterator f;
205 bool include = includes_.empty();
207 for (f = includes_.begin(); f != includes_.end(); ++f) {
208 if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
215 for (f = excludes_.begin(); f != excludes_.end(); ++f) {
216 if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
229 std::string topic_name = std::string(
"Images.") + *i;
231 while ((pos = topic_name.find_first_of(
" -", pos)) != std::string::npos) {
232 topic_name.replace(pos, 1,
"_");
237 imginfo.topic_name = topic_name;
245 MongoLogImagesThread::get_sets(std::set<std::string> &missing_images,
246 std::set<std::string> &unbacked_images)
248 std::set<std::string> published_images;
249 std::map<std::string, ImageInfo>::iterator p;
250 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
251 if (p->second.img->num_attached() > 1) {
252 published_images.insert(p->first);
256 std::set<std::string> image_buffers;
263 dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
265 image_buffers.insert(ih->
image_id());
271 missing_images.clear();
272 unbacked_images.clear();
274 std::set_difference(image_buffers.begin(),
276 published_images.begin(),
277 published_images.end(),
278 std::inserter(missing_images, missing_images.end()));
280 std::set_difference(published_images.begin(),
281 published_images.end(),
282 image_buffers.begin(),
284 std::inserter(unbacked_images, unbacked_images.end()));
void wait()
Wait until minimum loop time has been reached.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
std::optional< int64_t > height() const
Get height value.
MongoLogImagesThread()
Constructor.
Fawkes library namespace.
void unlock()
Unlock the mutex.
std::optional< int64_t > width() const
Get width value.
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
virtual ~MongoLogImagesThread()
Destructor.
A class for handling time.
Thread class encapsulation of pthreads.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
virtual void finalize()
Finalize the thread.
Logger * logger
This is the Logger member used to access the logger.
Clock * clock
By means of this member access to the clock is given.
long in_msec() const
Convert the stored time into milli-seconds.
virtual void init()
Initialize the thread.
Base class for exceptions in Fawkes.
Thread aspect to access MongoDB.
virtual void loop()
Code to execute in the thread.
Shared memory image buffer.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
const char * name() const
Get name of thread.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void mark_start()
Mark start of loop.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
std::optional< std::string > colorspace() const
Get colorspace value.
void lock()
Lock this mutex.
Time & stamp()
Set this time to the current time.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Mutex mutual exclusion lock.
ImageInfo representation for JSON transfer.
Configuration * config
This is the Configuration member used to access the configuration.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.