23 #include "mongodb_log_image_thread.h"
25 #include <core/threading/mutex_locker.h>
26 #include <fvutils/color/colorspaces.h>
27 #include <fvutils/ipc/shm_image.h>
28 #include <utils/time/wait.h>
31 #include <bsoncxx/builder/basic/document.hpp>
33 #include <mongocxx/client.hpp>
34 #include <mongocxx/exception/operation_exception.hpp>
35 #include <mongocxx/gridfs/uploader.hpp>
38 using namespace firevision;
39 using namespace mongocxx;
69 cfg_storage_interval_ =
config->
get_float(
"/plugins/mongodb-log/images/storage-interval");
71 cfg_chunk_size_ = 2097152;
73 cfg_chunk_size_ =
config->
get_uint(
"/plugins/mongodb-log/images/chunk-size");
88 gridfs_ = mongodb_->database(database_).gridfs_bucket();
92 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
108 std::map<std::string, ImageInfo>::iterator p;
109 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
110 delete p->second.img;
125 unsigned int num_stored = 0;
128 if (*now_ - last_update_ >= 5.0) {
129 *last_update_ = now_;
133 std::map<std::string, ImageInfo>::iterator p;
134 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
139 if ((imginfo.last_sent != cap_time)) {
140 using namespace bsoncxx::builder;
141 basic::document document;
142 imginfo.last_sent = cap_time;
143 document.append(basic::kvp(
"timestamp",
static_cast<int64_t
>(cap_time.
in_msec())));
145 document.append(basic::kvp(
"image", [&](basic::sub_document subdoc) {
146 subdoc.append(basic::kvp(
"image_id", imginfo.img->image_id()));
147 subdoc.append(basic::kvp(
"width",
static_cast<int32_t
>(imginfo.img->
width())));
148 subdoc.append(basic::kvp(
"height",
static_cast<int32_t
>(imginfo.img->
height())));
149 subdoc.append(basic::kvp(
"colorspace", colorspace_to_string(imginfo.img->
colorspace())));
151 std::stringstream
name;
152 name << imginfo.topic_name <<
"_" << cap_time.
in_msec();
153 auto uploader = gridfs_.open_upload_stream(
name.str());
154 uploader.write((uint8_t *)imginfo.img->buffer(), imginfo.img->data_size());
155 auto result = uploader.close();
156 subdoc.append(basic::kvp(
"data", [&](basic::sub_document subdoc) {
157 subdoc.append(basic::kvp(
"id", result.id()));
158 subdoc.append(basic::kvp(
"filename", name.str()));
163 mongodb_->database(database_)[imginfo.topic_name].insert_one(document.view());
165 }
catch (operation_exception &e) {
167 "Failed to insert image %s into %s.%s: %s",
168 imginfo.img->image_id(),
170 imginfo.topic_name.c_str(),
179 "Stored %u of %zu images in %.1f ms",
182 (loop_end - &loop_start) * 1000.);
187 MongoLogImagesThread::update_images()
189 std::set<std::string> missing_images;
190 std::set<std::string> unbacked_images;
191 get_sets(missing_images, unbacked_images);
193 if (!unbacked_images.empty()) {
194 std::set<std::string>::iterator i;
195 for (i = unbacked_images.begin(); i != unbacked_images.end(); ++i) {
197 "Shutting down MongoLog for no longer available image %s",
205 if (!missing_images.empty()) {
206 std::set<std::string>::iterator i;
207 for (i = missing_images.begin(); i != missing_images.end(); ++i) {
208 std::vector<std::string>::iterator f;
209 bool include = includes_.empty();
211 for (f = includes_.begin(); f != includes_.end(); ++f) {
212 if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
219 for (f = excludes_.begin(); f != excludes_.end(); ++f) {
220 if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
233 std::string topic_name = std::string(
"Images.") + *i;
235 while ((pos = topic_name.find_first_of(
" -", pos)) != std::string::npos) {
236 topic_name.replace(pos, 1,
"_");
241 imginfo.topic_name = topic_name;
249 MongoLogImagesThread::get_sets(std::set<std::string> &missing_images,
250 std::set<std::string> &unbacked_images)
252 std::set<std::string> published_images;
253 std::map<std::string, ImageInfo>::iterator p;
254 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
255 if (p->second.img->num_attached() > 1) {
256 published_images.insert(p->first);
260 std::set<std::string> image_buffers;
269 image_buffers.insert(ih->
image_id());
275 missing_images.clear();
276 unbacked_images.clear();
278 std::set_difference(image_buffers.begin(),
280 published_images.begin(),
281 published_images.end(),
282 std::inserter(missing_images, missing_images.end()));
284 std::set_difference(published_images.begin(),
285 published_images.end(),
286 image_buffers.begin(),
288 std::inserter(unbacked_images, unbacked_images.end()));