Fawkes API  Fawkes Development Version
logreplay_thread.cpp
1 
2 /***************************************************************************
3  * logreplay_thread.cpp - BB Log Replay Thread
4  *
5  * Created: Wed Feb 17 01:53:00 2010
6  * Copyright 2010 Tim Niemueller [www.niemueller.de]
7  * 2010 Masrur Doostdar <doostdar@kbsg.rwth-aachen.de>
8  *
9  ****************************************************************************/
10 
11 /* This program is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU General Public License as published by
13  * the Free Software Foundation; either version 2 of the License, or
14  * (at your option) any later version.
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL file in the doc directory.
22  */
23 
24 #include "logreplay_thread.h"
25 
26 #include "file.h"
27 
28 #include <blackboard/blackboard.h>
29 #include <blackboard/internal/instance_factory.h>
30 #include <core/exceptions/system.h>
31 #include <core/threading/wait_condition.h>
32 #include <logging/logger.h>
33 #include <utils/misc/autofree.h>
34 
35 #include <cerrno>
36 #include <cstdio>
37 #include <cstdlib>
38 #include <cstring>
39 #include <fcntl.h>
40 #include <memory>
41 #ifdef __FreeBSD__
42 # include <sys/endian.h>
43 #elif defined(__MACH__) && defined(__APPLE__)
44 # include <sys/_endian.h>
45 #else
46 # include <endian.h>
47 #endif
48 #include <arpa/inet.h>
49 #include <sys/mman.h>
50 
51 using namespace fawkes;
52 
53 /** @class BBLogReplayThread "logreplay_thread.h"
54  * BlackBoard log Replay thread.
55  * Writes the data of the logfile into a blackboard interface, considering the
56  * time-step differences between the data.
57  * @author Masrur Doostdar
58  * @author Tim Niemueller
59  */
60 
61 /** Constructor.
62  * @param logfile_name filename of the log to be replayed
63  * @param logdir directory containing the logfile
64  * @param scenario ID of the log scenario
65  * @param grace_period time in seconds that desired offset and loop offset may
66  * diverge to still write the new data
67  * @param loop_replay specifies if the replay should be looped
68  * @param non_blocking do not block the main loop if not enough time has elapsed
69  * to replay new data but just wait for the next cycle. This is ignored in
70  * continuous thread mode as it could cause busy waiting.
71  * @param thread_name initial thread name
72  * @param th_opmode thread operation mode
73  */
74 BBLogReplayThread::BBLogReplayThread(const char * logfile_name,
75  const char * logdir,
76  const char * scenario,
77  float grace_period,
78  bool loop_replay,
79  bool non_blocking,
80  const char * thread_name,
81  fawkes::Thread::OpMode th_opmode)
82 : Thread(thread_name, th_opmode)
83 {
84  set_name("BBLogReplayThread(%s)", logfile_name);
86 
87  logfile_name_ = strdup(logfile_name);
88  logdir_ = strdup(logdir);
89  scenario_ = strdup(scenario); // dont need this!?
90  filename_ = NULL;
91  cfg_grace_period_ = grace_period;
92  cfg_loop_replay_ = loop_replay;
93  if (th_opmode == OPMODE_WAITFORWAKEUP) {
94  cfg_non_blocking_ = non_blocking;
95  } else {
96  // would cause busy waiting
97  cfg_non_blocking_ = false;
98  }
99 }
100 
101 /** Destructor. */
103 {
104  free(logfile_name_);
105  free(logdir_);
106  free(scenario_);
107 }
108 
109 void
111 {
112  logfile_ = NULL;
113  interface_ = NULL;
114  filename_ = NULL;
115 
116  if (asprintf(&filename_, "%s/%s", logdir_, logfile_name_) == -1) {
117  throw OutOfMemoryException("Cannot re-generate logfile-path");
118  }
119 
120  try {
121  logfile_ = new BBLogFile(filename_, true);
122  } catch (Exception &e) {
123  finalize();
124  throw;
125  }
126 
127  if (!logfile_->has_next()) {
128  finalize();
129  throw Exception("Log file %s does not have any entries", filename_);
130  }
131 
132  interface_ = blackboard->open_for_writing(logfile_->interface_type(), logfile_->interface_id());
133 
134  try {
135  logfile_->set_interface(interface_);
136  } catch (Exception &e) {
137  finalize();
138  throw;
139  }
140 
141  logger->log_info(name(), "Replaying from %s:", filename_);
142 }
143 
144 void
146 {
147  delete logfile_;
148  if (filename_)
149  free(filename_);
150  blackboard->close(interface_);
151 }
152 
153 void
155 {
156  // Write first immediately, skip first offset
157  logfile_->read_next();
158  interface_->write();
159  last_offset_ = logfile_->entry_offset();
160  if (logfile_->has_next()) {
161  logfile_->read_next();
162  offsetdiff_ = logfile_->entry_offset() - last_offset_;
163  last_offset_ = logfile_->entry_offset();
164  }
165  last_loop_.stamp();
166 }
167 
168 void
170 {
171  if (logfile_->has_next()) {
172  // check if there is time left to wait
173  now_.stamp();
174  loopdiff_ = now_ - last_loop_;
175  if ((offsetdiff_.in_sec() - loopdiff_.in_sec()) > cfg_grace_period_) {
176  if (cfg_non_blocking_) {
177  // need to keep waiting before posting, but in non-blocking mode
178  // just wait for next loop
179  return;
180  } else {
181  waittime_ = offsetdiff_ - loopdiff_;
182  waittime_.wait();
183  }
184  }
185 
186  interface_->write();
187  logfile_->read_next();
188 
189  last_loop_.stamp();
190  offsetdiff_ = logfile_->entry_offset() - last_offset_;
191  last_offset_ = logfile_->entry_offset();
192 
193  } else {
194  if (cfg_loop_replay_) {
195  logger->log_info(name(), "replay finished, looping");
196  logfile_->rewind();
197  } else {
198  if (opmode() == OPMODE_CONTINUOUS) {
199  // block
200  logger->log_info(name(), "replay finished, sleeping");
201  WaitCondition waitcond;
202  waitcond.wait();
203  } // else wait will just run once per loop
204  }
205  }
206 }
BBLogReplayThread::init
virtual void init()
Initialize the thread.
Definition: logreplay_thread.cpp:110
fawkes::Thread::set_prepfin_conc_loop
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:722
BBLogFile::set_interface
void set_interface(fawkes::Interface *interface)
Set the internal interface.
Definition: bblogfile.cpp:494
BBLogFile::read_next
void read_next()
Read next entry.
Definition: bblogfile.cpp:283
fawkes::WaitCondition
Definition: wait_condition.h:42
BBLogReplayThread::~BBLogReplayThread
virtual ~BBLogReplayThread()
Destructor.
Definition: logreplay_thread.cpp:102
fawkes::Logger::log_info
virtual void log_info(const char *component, const char *format,...)=0
fawkes::Thread::OPMODE_CONTINUOUS
@ OPMODE_CONTINUOUS
operate in continuous mode (default)
Definition: thread.h:57
BBLogFile::interface_id
const char * interface_id() const
Get interface ID.
Definition: bblogfile.cpp:568
BBLogFile::rewind
void rewind()
Rewind file to start.
Definition: bblogfile.cpp:254
fawkes::Thread::name
const char * name() const
Definition: thread.h:100
fawkes::Time::wait
void wait()
Wait (sleep) for this time.
Definition: time.cpp:743
fawkes::WaitCondition::wait
void wait()
Wait for the condition forever.
Definition: wait_condition.cpp:145
BBLogFile::has_next
bool has_next()
Check if another entry is available.
Definition: bblogfile.cpp:266
fawkes::LoggingAspect::logger
Logger * logger
Definition: logging.h:53
fawkes::BlackBoard::close
virtual void close(Interface *interface)=0
BBLogReplayThread::BBLogReplayThread
BBLogReplayThread(const char *logfile_name, const char *logdir, const char *scenario, float grace_period, bool loop_replay, bool non_blocking=false, const char *thread_name="BBLogReplayThread", fawkes::Thread::OpMode th_opmode=Thread::OPMODE_CONTINUOUS)
Constructor.
Definition: logreplay_thread.cpp:74
fawkes
fawkes::Thread::OPMODE_WAITFORWAKEUP
@ OPMODE_WAITFORWAKEUP
operate in wait-for-wakeup mode
Definition: thread.h:58
BBLogReplayThread::once
virtual void once()
Execute an action exactly once.
Definition: logreplay_thread.cpp:154
fawkes::Thread
Definition: thread.h:45
fawkes::Thread::opmode
OpMode opmode() const
Get operation mode.
Definition: thread.cpp:677
fawkes::BlackBoardAspect::blackboard
BlackBoard * blackboard
Definition: blackboard.h:49
fawkes::Time::stamp
Time & stamp()
Set this time to the current time.
Definition: time.cpp:711
fawkes::OutOfMemoryException
Definition: system.h:37
fawkes::Thread::OpMode
OpMode
Thread operation mode.
Definition: thread.h:56
BBLogFile::interface_type
const char * interface_type() const
Get interface type.
Definition: bblogfile.cpp:559
fawkes::Interface::write
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:499
fawkes::BlackBoard::open_for_writing
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
fawkes::Time::in_sec
double in_sec() const
Convert time to seconds.
Definition: time.cpp:226
BBLogFile::entry_offset
const fawkes::Time & entry_offset() const
Get current entry offset.
Definition: bblogfile.cpp:514
fawkes::Thread::set_name
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:754
BBLogReplayThread::loop
virtual void loop()
Code to execute in the thread.
Definition: logreplay_thread.cpp:169
BBLogReplayThread::finalize
virtual void finalize()
Finalize the thread.
Definition: logreplay_thread.cpp:145
BBLogFile
Definition: bblogfile.h:39
fawkes::Exception
Definition: exception.h:41