Fawkes API  Fawkes Development Version
mongodb_log_bb_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_bb_thread.cpp - MongoDB blackboard logging Thread
4  *
5  * Created: Wed Dec 08 23:09:29 2010
6  * Copyright 2010-2017 Tim Niemueller [www.niemueller.de]
7  * 2012 Bastian Klingen
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "mongodb_log_bb_thread.h"
24 
25 #include <core/threading/mutex_locker.h>
26 #include <plugins/mongodb/aspect/mongodb_conncreator.h>
27 
28 #include <cstdlib>
29 #include <fnmatch.h>
30 #include <mongocxx/client.hpp>
31 #include <mongocxx/exception/operation_exception.hpp>
32 
33 using namespace mongocxx;
34 using namespace fawkes;
35 
36 /** @class MongoLogBlackboardThread "mongodb_log_bb_thread.h"
37  * MongoDB Logging Thread.
38  * This thread registers to interfaces specified with patterns in the
39  * configuration and logs any changes to MongoDB.
40  *
41  * @author Tim Niemueller
42  */
43 
44 /** Constructor. */
46 : Thread("MongoLogBlackboardThread", Thread::OPMODE_WAITFORWAKEUP), MongoDBAspect("default")
47 {
48 }
49 
50 /** Destructor. */
52 {
53 }
54 
55 void
57 {
58  now_ = new Time(clock);
59  database_ = "fflog";
60  try {
61  database_ = config->get_string("/plugins/mongodb-log/database");
62  } catch (Exception &e) {
63  logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
64  }
65 
66  std::vector<std::string> includes;
67  try {
68  includes = config->get_strings("/plugins/mongodb-log/blackboard/includes");
69  } catch (Exception &e) {
70  } // ignored, no include rules
71  try {
72  excludes_ = config->get_strings("/plugins/mongodb-log/blackboard/excludes");
73  } catch (Exception &e) {
74  } // ignored, no include rules
75 
76  if (includes.empty()) {
77  includes.push_back("*");
78  }
79 
80  std::vector<std::string>::iterator i;
81  std::vector<std::string>::iterator e;
82  for (i = includes.begin(); i != includes.end(); ++i) {
83  bbio_add_observed_create("*", i->c_str());
84 
85  std::list<Interface *> current_interfaces =
86  blackboard->open_multiple_for_reading("*", i->c_str());
87 
88  std::list<Interface *>::iterator i;
89  for (i = current_interfaces.begin(); i != current_interfaces.end(); ++i) {
90  bool exclude = false;
91  for (e = excludes_.begin(); e != excludes_.end(); ++e) {
92  if (fnmatch(e->c_str(), (*i)->id(), 0) != FNM_NOMATCH) {
93  logger->log_debug(name(), "Excluding '%s' by config rule", (*i)->uid());
94  blackboard->close(*i);
95  exclude = true;
96  break;
97  }
98  }
99  if (exclude)
100  continue;
101 
102  logger->log_debug(name(), "Adding %s", (*i)->uid());
103  client *mc = mongodb_connmgr->create_client();
104  listeners_[(*i)->uid()] =
105  new InterfaceListener(blackboard, *i, mc, database_, collections_, logger, now_);
106  }
107  }
108 
110 }
111 
112 void
114 {
116 
117  std::map<std::string, InterfaceListener *>::iterator i;
118  for (i = listeners_.begin(); i != listeners_.end(); ++i) {
119  client *mc = i->second->mongodb_client();
120  delete i->second;
122  }
123  listeners_.clear();
124 }
125 
126 void
128 {
129 }
130 
131 // for BlackBoardInterfaceObserver
132 void
133 MongoLogBlackboardThread::bb_interface_created(const char *type, const char *id) throw()
134 {
135  MutexLocker lock(listeners_.mutex());
136 
137  std::vector<std::string>::iterator e;
138  for (e = excludes_.begin(); e != excludes_.end(); ++e) {
139  if (fnmatch(e->c_str(), id, 0) != FNM_NOMATCH) {
140  logger->log_debug(name(), "Ignoring excluded interface '%s::%s'", type, id);
141  return;
142  }
143  }
144 
145  try {
146  Interface *interface = blackboard->open_for_reading(type, id);
147  if (listeners_.find(interface->uid()) == listeners_.end()) {
148  logger->log_debug(name(), "Opening new %s", interface->uid());
149  client *mc = mongodb_connmgr->create_client();
150  listeners_[interface->uid()] =
151  new InterfaceListener(blackboard, interface, mc, database_, collections_, logger, now_);
152  } else {
153  logger->log_warn(name(), "Interface %s already opened", interface->uid());
154  blackboard->close(interface);
155  }
156  } catch (Exception &e) {
157  logger->log_warn(name(), "Failed to open interface %s::%s, exception follows", type, id);
158  logger->log_warn(name(), e);
159  }
160 }
161 
162 /** Constructor.
163  * @param blackboard blackboard
164  * @param interface interface to listen for
165  * @param mongodb MongoDB client to write to
166  * @param database name of database to write to
167  * @param colls collections
168  * @param logger logger
169  * @param now Time
170  */
171 MongoLogBlackboardThread::InterfaceListener::InterfaceListener(BlackBoard * blackboard,
172  Interface * interface,
173  client * mongodb,
174  std::string & database,
175  LockSet<std::string> &colls,
176  Logger * logger,
177  Time * now)
178 : BlackBoardInterfaceListener("MongoLogListener-%s", interface->uid()),
179  database_(database),
180  collections_(colls)
181 {
182  blackboard_ = blackboard;
183  interface_ = interface;
184  mongodb_ = mongodb;
185  logger_ = logger;
186  now_ = now;
187 
188  // sanitize interface ID to be suitable for MongoDB
189  std::string id = interface->id();
190  size_t pos = 0;
191  while ((pos = id.find_first_of(" -", pos)) != std::string::npos) {
192  id.replace(pos, 1, "_");
193  pos = pos + 1;
194  }
195  collection_ = std::string(interface->type()) + "." + id;
196  if (collections_.find(collection_) != collections_.end()) {
197  throw Exception("Collection named %s already used, cannot log %s",
198  collection_.c_str(),
199  interface->uid());
200  }
201 
202  bbil_add_data_interface(interface);
203  blackboard_->register_listener(this, BlackBoard::BBIL_FLAG_DATA);
204 }
205 
206 /** Destructor. */
207 MongoLogBlackboardThread::InterfaceListener::~InterfaceListener()
208 {
209  blackboard_->unregister_listener(this);
210 }
211 
212 void
213 MongoLogBlackboardThread::InterfaceListener::bb_interface_data_changed(Interface *interface) throw()
214 {
215  now_->stamp();
216  interface->read();
217 
218  try {
219  // write interface data
220  using namespace bsoncxx::builder;
221  basic::document document;
222  document.append(basic::kvp("timestamp", static_cast<int64_t>(now_->in_msec())));
224  for (i = interface->fields(); i != interface->fields_end(); ++i) {
225  size_t length = i.get_length();
226  bool is_array = (length > 1);
227 
228  std::string key{i.get_name()};
229  switch (i.get_type()) {
230  case IFT_BOOL:
231  if (is_array) {
232  bool *bools = i.get_bools();
233  document.append(basic::kvp(key, [bools, length](basic::sub_array subarray) {
234  for (size_t l = 0; l < length; ++l) {
235  subarray.append(bools[l]);
236  }
237  }));
238  } else {
239  document.append(basic::kvp(key, i.get_bool()));
240  }
241  break;
242 
243  case IFT_INT8:
244  if (is_array) {
245  int8_t *ints = i.get_int8s();
246  document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
247  for (size_t l = 0; l < length; ++l) {
248  subarray.append(ints[l]);
249  }
250  }));
251  } else {
252  document.append(basic::kvp(key, i.get_int8()));
253  }
254  break;
255 
256  case IFT_UINT8:
257  if (is_array) {
258  uint8_t *ints = i.get_uint8s();
259  document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
260  for (size_t l = 0; l < length; ++l) {
261  subarray.append(ints[l]);
262  }
263  }));
264  } else {
265  document.append(basic::kvp(key, i.get_uint8()));
266  }
267  break;
268 
269  case IFT_INT16:
270  if (is_array) {
271  int16_t *ints = i.get_int16s();
272  document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
273  for (size_t l = 0; l < length; ++l) {
274  subarray.append(ints[l]);
275  }
276  }));
277  } else {
278  document.append(basic::kvp(key, i.get_int16()));
279  }
280  break;
281 
282  case IFT_UINT16:
283  if (is_array) {
284  uint16_t *ints = i.get_uint16s();
285  document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
286  for (size_t l = 0; l < length; ++l) {
287  subarray.append(ints[l]);
288  }
289  }));
290  } else {
291  document.append(basic::kvp(key, i.get_uint16()));
292  }
293  break;
294 
295  case IFT_INT32:
296  if (is_array) {
297  int32_t *ints = i.get_int32s();
298  document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
299  for (size_t l = 0; l < length; ++l) {
300  subarray.append(ints[l]);
301  }
302  }));
303  } else {
304  document.append(basic::kvp(key, i.get_int32()));
305  }
306  break;
307 
308  case IFT_UINT32:
309  if (is_array) {
310  uint32_t *ints = i.get_uint32s();
311  document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
312  for (size_t l = 0; l < length; ++l) {
313  subarray.append(static_cast<int64_t>(ints[l]));
314  }
315  }));
316  } else {
317  document.append(basic::kvp(key, static_cast<int64_t>(i.get_uint32())));
318  }
319  break;
320 
321  case IFT_INT64:
322  if (is_array) {
323  int64_t *ints = i.get_int64s();
324  document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
325  for (size_t l = 0; l < length; ++l) {
326  subarray.append(ints[l]);
327  }
328  }));
329  } else {
330  document.append(basic::kvp(key, i.get_int64()));
331  }
332  break;
333 
334  case IFT_UINT64:
335  if (is_array) {
336  uint64_t *ints = i.get_uint64s();
337  document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
338  for (size_t l = 0; l < length; ++l) {
339  subarray.append(static_cast<int64_t>(ints[l]));
340  }
341  }));
342  } else {
343  document.append(basic::kvp(key, static_cast<int64_t>(i.get_uint64())));
344  }
345  break;
346 
347  case IFT_FLOAT:
348  if (is_array) {
349  float *floats = i.get_floats();
350  document.append(basic::kvp(key, [floats, length](basic::sub_array subarray) {
351  for (size_t l = 0; l < length; ++l) {
352  subarray.append(floats[l]);
353  }
354  }));
355  } else {
356  document.append(basic::kvp(key, i.get_float()));
357  }
358  break;
359 
360  case IFT_DOUBLE:
361  if (is_array) {
362  double *doubles = i.get_doubles();
363  document.append(basic::kvp(key, [doubles, length](basic::sub_array subarray) {
364  for (size_t l = 0; l < length; ++l) {
365  subarray.append(doubles[l]);
366  }
367  }));
368  } else {
369  document.append(basic::kvp(key, i.get_double()));
370  }
371  break;
372 
373  case IFT_STRING: document.append(basic::kvp(key, i.get_string())); break;
374 
375  case IFT_BYTE:
376  if (is_array) {
377  uint8_t *bytes = i.get_bytes();
378  document.append(basic::kvp(key, [bytes, length](basic::sub_array subarray) {
379  for (size_t l = 0; l < length; ++l) {
380  subarray.append(bytes[l]);
381  }
382  }));
383  } else {
384  document.append(basic::kvp(key, i.get_byte()));
385  }
386  break;
387 
388  case IFT_ENUM:
389  if (is_array) {
390  int32_t *ints = i.get_enums();
391  document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
392  for (size_t l = 0; l < length; ++l) {
393  subarray.append(ints[l]);
394  }
395  }));
396  } else {
397  document.append(basic::kvp(key, i.get_enum()));
398  }
399  break;
400  }
401  }
402 
403  mongodb_->database(database_)[collection_].insert_one(document.view());
404  } catch (operation_exception &e) {
405  logger_->log_warn(
406  bbil_name(), "Failed to log to %s.%s: %s", database_.c_str(), collection_.c_str(), e.what());
407  } catch (std::exception &e) {
408  logger_->log_warn(bbil_name(),
409  "Failed to log to %s.%s: %s (*)",
410  database_.c_str(),
411  collection_.c_str(),
412  e.what());
413  }
414 }
fawkes::InterfaceFieldIterator::get_enums
int32_t * get_enums() const
Get value of current enum field as integer array.
Definition: field_iterator.cpp:933
fawkes::MongoDBAspect
Definition: mongodb.h:43
MongoLogBlackboardThread::MongoLogBlackboardThread
MongoLogBlackboardThread()
Constructor.
Definition: mongodb_log_bb_thread.cpp:45
fawkes::IFT_UINT8
@ IFT_UINT8
8 bit unsigned integer field
Definition: types.h:53
fawkes::InterfaceFieldIterator::get_bytes
uint8_t * get_bytes() const
Get value of current field as byte array.
Definition: field_iterator.cpp:915
fawkes::Interface::fields_end
InterfaceFieldIterator fields_end()
Invalid iterator.
Definition: interface.cpp:1209
fawkes::MongoDBConnCreator::create_client
virtual mongocxx::client * create_client(const std::string &config_name="")=0
Create a new MongoDB client.
fawkes::InterfaceFieldIterator::get_floats
float * get_floats() const
Get value of current field as float array.
Definition: field_iterator.cpp:879
fawkes::IFT_UINT64
@ IFT_UINT64
64 bit unsigned integer field
Definition: types.h:59
fawkes::Interface::read
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:477
fawkes::IFT_INT8
@ IFT_INT8
8 bit integer field
Definition: types.h:52
fawkes::IFT_BOOL
@ IFT_BOOL
boolean field
Definition: types.h:51
fawkes::LockSet< std::string >
fawkes::MongoDBAspect::mongodb_connmgr
MongoDBConnCreator * mongodb_connmgr
Definition: mongodb.h:60
fawkes::InterfaceFieldIterator::get_uint16s
uint16_t * get_uint16s() const
Get value of current field as unsigned integer array.
Definition: field_iterator.cpp:789
fawkes::IFT_FLOAT
@ IFT_FLOAT
float field
Definition: types.h:60
fawkes::IFT_ENUM
@ IFT_ENUM
field with interface specific enum type
Definition: types.h:64
fawkes::IFT_UINT16
@ IFT_UINT16
16 bit unsigned integer field
Definition: types.h:55
fawkes::Logger::log_info
virtual void log_info(const char *component, const char *format,...)=0
fawkes::IFT_UINT32
@ IFT_UINT32
32 bit unsigned integer field
Definition: types.h:57
fawkes::MutexLocker
Definition: mutex_locker.h:39
fawkes::BlackBoardInterfaceListener
Definition: interface_listener.h:47
MongoLogBlackboardThread::init
virtual void init()
Initialize the thread.
Definition: mongodb_log_bb_thread.cpp:56
MongoLogBlackboardThread::finalize
virtual void finalize()
Finalize the thread.
Definition: mongodb_log_bb_thread.cpp:113
fawkes::BlackBoard
Definition: blackboard.h:50
fawkes::InterfaceFieldIterator::get_byte
uint8_t get_byte(unsigned int index=0) const
Get value of current field as byte.
Definition: field_iterator.cpp:646
fawkes::Interface::type
const char * type() const
Get type of interface.
Definition: interface.cpp:645
fawkes::InterfaceFieldIterator::get_int64s
int64_t * get_int64s() const
Get value of current field as integer array.
Definition: field_iterator.cpp:843
fawkes::Thread::name
const char * name() const
Definition: thread.h:100
MongoLogBlackboardThread::loop
virtual void loop()
Code to execute in the thread.
Definition: mongodb_log_bb_thread.cpp:127
fawkes::Interface::id
const char * id() const
Get identifier of interface.
Definition: interface.cpp:654
fawkes::ClockAspect::clock
Clock * clock
Definition: clock.h:56
fawkes::BlackBoardInterfaceObserver::bbio_add_observed_create
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*")
Add interface creation type to watch list.
Definition: interface_observer.cpp:125
fawkes::InterfaceFieldIterator::get_bool
bool get_bool(unsigned int index=0) const
Get value of current field as bool.
Definition: field_iterator.cpp:415
fawkes::MultiLogger::log_debug
virtual void log_debug(const char *component, const char *format,...)
Definition: multi.cpp:180
fawkes::MultiLogger::log_warn
virtual void log_warn(const char *component, const char *format,...)
Definition: multi.cpp:222
fawkes::InterfaceFieldIterator
Definition: field_iterator.h:38
fawkes::InterfaceFieldIterator::get_uint8s
uint8_t * get_uint8s() const
Get value of current field as unsigned integer array.
Definition: field_iterator.cpp:753
fawkes::InterfaceFieldIterator::get_type
interface_fieldtype_t get_type() const
Get type of current field.
Definition: field_iterator.cpp:198
MongoLogBlackboardThread::~MongoLogBlackboardThread
virtual ~MongoLogBlackboardThread()
Destructor.
Definition: mongodb_log_bb_thread.cpp:51
fawkes::IFT_INT32
@ IFT_INT32
32 bit integer field
Definition: types.h:56
fawkes::LoggingAspect::logger
Logger * logger
Definition: logging.h:53
fawkes::BlackBoard::close
virtual void close(Interface *interface)=0
fawkes::Logger
Definition: logger.h:41
fawkes::InterfaceFieldIterator::get_float
float get_float(unsigned int index=0) const
Get value of current field as float.
Definition: field_iterator.cpp:604
fawkes::InterfaceFieldIterator::get_uint8
uint8_t get_uint8(unsigned int index=0) const
Get value of current field as unsigned integer.
Definition: field_iterator.cpp:457
fawkes
fawkes::InterfaceFieldIterator::get_uint32
uint32_t get_uint32(unsigned int index=0) const
Get value of current field as unsigned integer.
Definition: field_iterator.cpp:541
fawkes::InterfaceFieldIterator::get_uint64
uint64_t get_uint64(unsigned int index=0) const
Get value of current field as unsigned integer.
Definition: field_iterator.cpp:583
fawkes::InterfaceFieldIterator::get_doubles
double * get_doubles() const
Get value of current field as double array.
Definition: field_iterator.cpp:897
fawkes::InterfaceFieldIterator::get_int32s
int32_t * get_int32s() const
Get value of current field as integer array.
Definition: field_iterator.cpp:807
fawkes::InterfaceFieldIterator::get_int8
int8_t get_int8(unsigned int index=0) const
Get value of current field as integer.
Definition: field_iterator.cpp:436
fawkes::Configuration::get_strings
virtual std::vector< std::string > get_strings(const char *path)=0
fawkes::InterfaceFieldIterator::get_name
const char * get_name() const
Get name of current field.
Definition: field_iterator.cpp:267
fawkes::BlackBoard::register_observer
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: blackboard.cpp:230
fawkes::Interface
Definition: interface.h:78
fawkes::InterfaceFieldIterator::get_int16
int16_t get_int16(unsigned int index=0) const
Get value of current field as integer.
Definition: field_iterator.cpp:478
fawkes::InterfaceFieldIterator::get_int8s
int8_t * get_int8s() const
Get value of current field as integer array.
Definition: field_iterator.cpp:735
fawkes::ConfigurableAspect::config
Configuration * config
Definition: configurable.h:53
fawkes::InterfaceFieldIterator::get_bools
bool * get_bools() const
Get value of current field as bool array.
Definition: field_iterator.cpp:715
fawkes::InterfaceFieldIterator::get_string
const char * get_string() const
Get value of current field as string.
Definition: field_iterator.cpp:950
fawkes::Interface::uid
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:679
fawkes::InterfaceFieldIterator::get_int16s
int16_t * get_int16s() const
Get value of current field as integer array.
Definition: field_iterator.cpp:771
fawkes::InterfaceFieldIterator::get_uint64s
uint64_t * get_uint64s() const
Get value of current field as unsigned integer array.
Definition: field_iterator.cpp:861
fawkes::IFT_INT64
@ IFT_INT64
64 bit integer field
Definition: types.h:58
fawkes::InterfaceFieldIterator::get_length
size_t get_length() const
Get length of current field.
Definition: field_iterator.cpp:293
fawkes::Interface::fields
InterfaceFieldIterator fields()
Get iterator over all fields of this interface instance.
Definition: interface.cpp:1200
fawkes::Time
Definition: time.h:98
MongoLogBlackboardThread::bb_interface_created
virtual void bb_interface_created(const char *type, const char *id)
BlackBoard interface created notification.
Definition: mongodb_log_bb_thread.cpp:133
fawkes::IFT_DOUBLE
@ IFT_DOUBLE
double field
Definition: types.h:61
fawkes::InterfaceFieldIterator::get_uint32s
uint32_t * get_uint32s() const
Get value of current field as unsigned integer array.
Definition: field_iterator.cpp:825
fawkes::Thread
Definition: thread.h:45
fawkes::BlackBoard::unregister_observer
virtual void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: blackboard.cpp:245
fawkes::BlackBoardAspect::blackboard
BlackBoard * blackboard
Definition: blackboard.h:49
fawkes::Configuration::get_string
virtual std::string get_string(const char *path)=0
fawkes::IFT_STRING
@ IFT_STRING
string field
Definition: types.h:62
fawkes::BlackBoard::open_multiple_for_reading
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
fawkes::MongoDBConnCreator::delete_client
virtual void delete_client(mongocxx::client *client)=0
Delete a client.
fawkes::IFT_BYTE
@ IFT_BYTE
byte field, alias for uint8
Definition: types.h:63
fawkes::IFT_INT16
@ IFT_INT16
16 bit integer field
Definition: types.h:54
fawkes::InterfaceFieldIterator::get_int32
int32_t get_int32(unsigned int index=0) const
Get value of current field as integer.
Definition: field_iterator.cpp:520
fawkes::InterfaceFieldIterator::get_enum
int32_t get_enum(unsigned int index=0) const
Get value of current enum field as integer.
Definition: field_iterator.cpp:667
fawkes::InterfaceFieldIterator::get_double
double get_double(unsigned int index=0) const
Get value of current field as double.
Definition: field_iterator.cpp:625
fawkes::Logger::log_debug
virtual void log_debug(const char *component, const char *format,...)=0
fawkes::InterfaceFieldIterator::get_int64
int64_t get_int64(unsigned int index=0) const
Get value of current field as integer.
Definition: field_iterator.cpp:562
fawkes::InterfaceFieldIterator::get_uint16
uint16_t get_uint16(unsigned int index=0) const
Get value of current field as unsigned integer.
Definition: field_iterator.cpp:499
fawkes::Exception
Definition: exception.h:41