Fawkes API  Fawkes Development Version
mongodb_log_image_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_image_thread.cpp - Thread to log images to MongoDB
4  *
5  * Created: Tue Apr 10 22:12:38 2012
6  * Copyright 2011-2017 Tim Niemueller [www.niemueller.de]
7  * 2012 Bastian Klingen
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
#include "mongodb_log_image_thread.h"

#include <core/threading/mutex_locker.h>
#include <fvutils/color/colorspaces.h>
#include <fvutils/ipc/shm_image.h>
#include <utils/time/wait.h>

#include <fnmatch.h>

// from MongoDB
#include <bsoncxx/builder/basic/document.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/exception/exception.hpp>
#include <mongocxx/exception/operation_exception.hpp>
#include <mongocxx/gridfs/uploader.hpp>
#include <mongocxx/options/gridfs/bucket.hpp>
36 
37 using namespace fawkes;
38 using namespace firevision;
39 using namespace mongocxx;
40 
41 /** @class MongoLogImagesThread "mongodb_log_image_thread.h"
42  * Thread to export Fawkes images to MongoDB.
43  * @author Tim Niemueller
44  * @author Bastian Klingen
45  */
46 
47 /** Constructor. */
49 : Thread("MongoLogImagesThread", Thread::OPMODE_CONTINUOUS), MongoDBAspect("default")
50 {
52 }
53 
54 /** Destructor. */
56 {
57 }
58 
59 void
61 {
62  database_ = "fflog";
63  try {
64  database_ = config->get_string("/plugins/mongodb-log/database");
65  } catch (Exception &e) {
66  logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
67  }
68 
69  cfg_storage_interval_ = config->get_float("/plugins/mongodb-log/images/storage-interval");
70 
71  cfg_chunk_size_ = 2097152; // 2 MB
72  try {
73  cfg_chunk_size_ = config->get_uint("/plugins/mongodb-log/images/chunk-size");
74  } catch (Exception &e) {
75  } // ignored, use default
76  logger->log_info(name(), "Chunk size: %u", cfg_chunk_size_);
77 
78  try {
79  includes_ = config->get_strings("/plugins/mongodb-log/images/includes");
80  } catch (Exception &e) {
81  } // ignored, no include rules
82  try {
83  excludes_ = config->get_strings("/plugins/mongodb-log/images/excludes");
84  } catch (Exception &e) {
85  } // ignored, no include rules
86 
87  mongodb_ = mongodb_client;
88  gridfs_ = mongodb_->database(database_).gridfs_bucket();
89 
90  last_update_ = new Time(clock);
91  now_ = new Time(clock);
92  wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
93  mutex_ = new Mutex();
94  update_images();
95 }
96 
97 bool
99 {
100  mutex_->lock();
101  return true;
102 }
103 
104 void
106 {
107  logger->log_debug(name(), "Finalizing MongoLogImagesThread");
108  std::map<std::string, ImageInfo>::iterator p;
109  for (p = imgs_.begin(); p != imgs_.end(); ++p) {
110  delete p->second.img;
111  }
112  imgs_.clear();
113  delete wait_;
114  delete mutex_;
115  delete now_;
116  delete last_update_;
117 }
118 
119 void
121 {
122  MutexLocker lock(mutex_);
123  fawkes::Time loop_start(clock);
124  wait_->mark_start();
125  unsigned int num_stored = 0;
126 
127  now_->stamp();
128  if (*now_ - last_update_ >= 5.0) {
129  *last_update_ = now_;
130  update_images();
131  }
132 
133  std::map<std::string, ImageInfo>::iterator p;
134  for (p = imgs_.begin(); p != imgs_.end(); ++p) {
135  ImageInfo &imginfo = p->second;
136 
137  fawkes::Time cap_time = imginfo.img->capture_time();
138 
139  if ((imginfo.last_sent != cap_time)) {
140  using namespace bsoncxx::builder;
141  basic::document document;
142  imginfo.last_sent = cap_time;
143  document.append(basic::kvp("timestamp", static_cast<int64_t>(cap_time.in_msec())));
144 
145  document.append(basic::kvp("image", [&](basic::sub_document subdoc) {
146  subdoc.append(basic::kvp("image_id", imginfo.img->image_id()));
147  subdoc.append(basic::kvp("width", static_cast<int32_t>(imginfo.img->width())));
148  subdoc.append(basic::kvp("height", static_cast<int32_t>(imginfo.img->height())));
149  subdoc.append(basic::kvp("colorspace", colorspace_to_string(imginfo.img->colorspace())));
150 
151  std::stringstream name;
152  name << imginfo.topic_name << "_" << cap_time.in_msec();
153  auto uploader = gridfs_.open_upload_stream(name.str());
154  uploader.write((uint8_t *)imginfo.img->buffer(), imginfo.img->data_size());
155  auto result = uploader.close();
156  subdoc.append(basic::kvp("data", [&](basic::sub_document subdoc) {
157  subdoc.append(basic::kvp("id", result.id()));
158  subdoc.append(basic::kvp("filename", name.str()));
159  }));
160  }));
161 
162  try {
163  mongodb_->database(database_)[imginfo.topic_name].insert_one(document.view());
164  ++num_stored;
165  } catch (operation_exception &e) {
166  logger->log_warn(this->name(),
167  "Failed to insert image %s into %s.%s: %s",
168  imginfo.img->image_id(),
169  database_.c_str(),
170  imginfo.topic_name.c_str(),
171  e.what());
172  }
173  }
174  }
175 
176  mutex_->unlock();
177  fawkes::Time loop_end(clock);
178  logger->log_debug(name(),
179  "Stored %u of %zu images in %.1f ms",
180  num_stored,
181  imgs_.size(),
182  (loop_end - &loop_start) * 1000.);
183  wait_->wait();
184 }
185 
186 void
187 MongoLogImagesThread::update_images()
188 {
189  std::set<std::string> missing_images;
190  std::set<std::string> unbacked_images;
191  get_sets(missing_images, unbacked_images);
192 
193  if (!unbacked_images.empty()) {
194  std::set<std::string>::iterator i;
195  for (i = unbacked_images.begin(); i != unbacked_images.end(); ++i) {
196  logger->log_info(name(),
197  "Shutting down MongoLog for no longer available image %s",
198  i->c_str());
199  ImageInfo &imginfo = imgs_[*i];
200  delete imginfo.img;
201  imgs_.erase(*i);
202  }
203  }
204 
205  if (!missing_images.empty()) {
206  std::set<std::string>::iterator i;
207  for (i = missing_images.begin(); i != missing_images.end(); ++i) {
208  std::vector<std::string>::iterator f;
209  bool include = includes_.empty();
210  if (!include) {
211  for (f = includes_.begin(); f != includes_.end(); ++f) {
212  if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
213  include = true;
214  break;
215  }
216  }
217  }
218  if (include) {
219  for (f = excludes_.begin(); f != excludes_.end(); ++f) {
220  if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
221  include = false;
222  break;
223  }
224  }
225  }
226  if (!include) {
227  //logger->log_info(name(), "Excluding image %s", i->c_str());
228  continue;
229  }
230 
231  logger->log_info(name(), "Starting to log image %s", i->c_str());
232 
233  std::string topic_name = std::string("Images.") + *i;
234  size_t pos = 0;
235  while ((pos = topic_name.find_first_of(" -", pos)) != std::string::npos) {
236  topic_name.replace(pos, 1, "_");
237  pos = pos + 1;
238  }
239 
240  ImageInfo imginfo;
241  imginfo.topic_name = topic_name;
242  imginfo.img = new SharedMemoryImageBuffer(i->c_str());
243  imgs_[*i] = imginfo;
244  }
245  }
246 }
247 
248 void
249 MongoLogImagesThread::get_sets(std::set<std::string> &missing_images,
250  std::set<std::string> &unbacked_images)
251 {
252  std::set<std::string> published_images;
253  std::map<std::string, ImageInfo>::iterator p;
254  for (p = imgs_.begin(); p != imgs_.end(); ++p) {
255  if (p->second.img->num_attached() > 1) {
256  published_images.insert(p->first);
257  }
258  }
259 
260  std::set<std::string> image_buffers;
262  SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
263  SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
264 
265  while (i != endi) {
267  dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
268  if (ih) {
269  image_buffers.insert(ih->image_id());
270  }
271  ++i;
272  }
273  delete h;
274 
275  missing_images.clear();
276  unbacked_images.clear();
277 
278  std::set_difference(image_buffers.begin(),
279  image_buffers.end(),
280  published_images.begin(),
281  published_images.end(),
282  std::inserter(missing_images, missing_images.end()));
283 
284  std::set_difference(published_images.begin(),
285  published_images.end(),
286  image_buffers.begin(),
287  image_buffers.end(),
288  std::inserter(unbacked_images, unbacked_images.end()));
289 }
fawkes::Mutex::lock
void lock()
Lock this mutex.
Definition: mutex.cpp:93
fawkes::Time::in_msec
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:235
fawkes::MongoDBAspect
Definition: mongodb.h:43
fawkes::Thread::set_prepfin_conc_loop
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:722
ImageInfo::width
std::optional< int64_t > width() const
Get width value.
Definition: ImageInfo.h:177
fawkes::Mutex
Definition: mutex.h:38
firevision::SharedMemoryImageBufferHeader::image_id
const char * image_id() const
Get image ID.
Definition: shm_image.cpp:838
MongoLogImagesThread::loop
virtual void loop()
Code to execute in the thread.
Definition: mongodb_log_image_thread.cpp:120
fawkes::MongoDBAspect::mongodb_client
mongocxx::client * mongodb_client
Definition: mongodb.h:59
fawkes::Logger::log_info
virtual void log_info(const char *component, const char *format,...)=0
fawkes::MutexLocker
Definition: mutex_locker.h:39
firevision::SharedMemoryImageBuffer
Definition: shm_image.h:183
fawkes::Thread::name
const char * name() const
Definition: thread.h:100
fawkes::ClockAspect::clock
Clock * clock
Definition: clock.h:56
fawkes::Mutex::unlock
void unlock()
Unlock the mutex.
Definition: mutex.cpp:137
MongoLogImagesThread::finalize
virtual void finalize()
Finalize the thread.
Definition: mongodb_log_image_thread.cpp:105
fawkes::LoggingAspect::logger
Logger * logger
Definition: logging.h:53
fawkes::TimeWait::wait
void wait()
Wait until minimum loop time has been reached.
Definition: wait.cpp:84
fawkes
MongoLogImagesThread::~MongoLogImagesThread
virtual ~MongoLogImagesThread()
Destructor.
Definition: mongodb_log_image_thread.cpp:55
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
fawkes::Configuration::get_strings
virtual std::vector< std::string > get_strings(const char *path)=0
fawkes::SharedMemory::SharedMemoryIterator
Definition: shm.h:124
fawkes::TimeWait::mark_start
void mark_start()
Mark start of loop.
Definition: wait.cpp:74
fawkes::ConfigurableAspect::config
Configuration * config
Definition: configurable.h:53
MongoLogImagesThread::prepare_finalize_user
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
Definition: mongodb_log_image_thread.cpp:98
firevision::SharedMemoryImageBufferHeader
Definition: shm_image.h:66
fawkes::Time
Definition: time.h:98
MongoLogImagesThread::MongoLogImagesThread
MongoLogImagesThread()
Constructor.
Definition: mongodb_log_image_thread.cpp:48
fawkes::Configuration::get_float
virtual float get_float(const char *path)=0
fawkes::Thread
Definition: thread.h:45
fawkes::TimeWait
Definition: wait.h:38
ImageInfo::height
std::optional< int64_t > height() const
Get height value.
Definition: ImageInfo.h:194
fawkes::Configuration::get_uint
virtual unsigned int get_uint(const char *path)=0
fawkes::Configuration::get_string
virtual std::string get_string(const char *path)=0
fawkes::Time::stamp
Time & stamp()
Set this time to the current time.
Definition: time.cpp:711
fawkes::Logger::log_debug
virtual void log_debug(const char *component, const char *format,...)=0
ImageInfo::colorspace
std::optional< std::string > colorspace() const
Get colorspace value.
Definition: ImageInfo.h:143
ImageInfo
ImageInfo representation for JSON transfer.
Definition: ImageInfo.h:26
MongoLogImagesThread::init
virtual void init()
Initialize the thread.
Definition: mongodb_log_image_thread.cpp:60
fawkes::Exception
Definition: exception.h:41