Fawkes API Fawkes Development Version
logreplay_thread.cpp
1
2/***************************************************************************
3 * logreplay_thread.cpp - BB Log Replay Thread
4 *
5 * Created: Wed Feb 17 01:53:00 2010
6 * Copyright 2010 Tim Niemueller [www.niemueller.de]
7 * 2010 Masrur Doostdar <doostdar@kbsg.rwth-aachen.de>
8 *
9 ****************************************************************************/
10
11/* This program is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU General Public License as published by
13 * the Free Software Foundation; either version 2 of the License, or
14 * (at your option) any later version.
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU Library General Public License for more details.
20 *
21 * Read the full text in the LICENSE.GPL file in the doc directory.
22 */
23
24#include "logreplay_thread.h"
25
26#include "file.h"
27
28#include <blackboard/blackboard.h>
29#include <blackboard/internal/instance_factory.h>
30#include <core/exceptions/system.h>
31#include <core/threading/wait_condition.h>
32#include <logging/logger.h>
33#include <utils/misc/autofree.h>
34
35#include <cerrno>
36#include <cstdio>
37#include <cstdlib>
38#include <cstring>
39#include <fcntl.h>
40#include <memory>
41#ifdef __FreeBSD__
42# include <sys/endian.h>
43#elif defined(__MACH__) && defined(__APPLE__)
44# include <sys/_endian.h>
45#else
46# include <endian.h>
47#endif
48#include <arpa/inet.h>
49#include <sys/mman.h>
50
51using namespace fawkes;
52
53/** @class BBLogReplayThread "logreplay_thread.h"
54 * BlackBoard log Replay thread.
55 * Writes the data of the logfile into a blackboard interface, considering the
56 * time-step differences between the data.
57 * @author Masrur Doostdar
58 * @author Tim Niemueller
59 */
60
61/** Constructor.
62 * @param logfile_name filename of the log to be replayed
63 * @param logdir directory containing the logfile
64 * @param scenario ID of the log scenario
65 * @param grace_period time in seconds that desired offset and loop offset may
66 * diverge to still write the new data
67 * @param loop_replay specifies if the replay should be looped
68 * @param non_blocking do not block the main loop if not enough time has elapsed
69 * to replay new data but just wait for the next cycle. This is ignored in
70 * continuous thread mode as it could cause busy waiting.
71 * @param thread_name initial thread name
72 * @param th_opmode thread operation mode
73 */
74BBLogReplayThread::BBLogReplayThread(const char * logfile_name,
75 const char * logdir,
76 const char * scenario,
77 float grace_period,
78 bool loop_replay,
79 bool non_blocking,
80 const char * thread_name,
81 fawkes::Thread::OpMode th_opmode)
82: Thread(thread_name, th_opmode)
83{
84 set_name("BBLogReplayThread(%s)", logfile_name);
86
87 logfile_name_ = strdup(logfile_name);
88 logdir_ = strdup(logdir);
89 scenario_ = strdup(scenario); // dont need this!?
90 filename_ = NULL;
91 cfg_grace_period_ = grace_period;
92 cfg_loop_replay_ = loop_replay;
93 if (th_opmode == OPMODE_WAITFORWAKEUP) {
94 cfg_non_blocking_ = non_blocking;
95 } else {
96 // would cause busy waiting
97 cfg_non_blocking_ = false;
98 }
99}
100
101/** Destructor. */
103{
104 free(logfile_name_);
105 free(logdir_);
106 free(scenario_);
107}
108
109void
111{
112 logfile_ = NULL;
113 interface_ = NULL;
114 filename_ = NULL;
115
116 if (asprintf(&filename_, "%s/%s", logdir_, logfile_name_) == -1) {
117 throw OutOfMemoryException("Cannot re-generate logfile-path");
118 }
119
120 try {
121 logfile_ = new BBLogFile(filename_, true);
122 } catch (Exception &e) {
123 finalize();
124 throw;
125 }
126
127 if (!logfile_->has_next()) {
128 finalize();
129 throw Exception("Log file %s does not have any entries", filename_);
130 }
131
132 interface_ = blackboard->open_for_writing(logfile_->interface_type(), logfile_->interface_id());
133
134 try {
135 logfile_->set_interface(interface_);
136 } catch (Exception &e) {
137 finalize();
138 throw;
139 }
140
141 logger->log_info(name(), "Replaying from %s:", filename_);
142}
143
144void
146{
147 delete logfile_;
148 if (filename_)
149 free(filename_);
150 blackboard->close(interface_);
151}
152
153void
155{
156 // Write first immediately, skip first offset
157 logfile_->read_next();
158 interface_->write();
159 last_offset_ = logfile_->entry_offset();
160 if (logfile_->has_next()) {
161 logfile_->read_next();
162 offsetdiff_ = logfile_->entry_offset() - last_offset_;
163 last_offset_ = logfile_->entry_offset();
164 }
165 last_loop_.stamp();
166}
167
168void
170{
171 if (logfile_->has_next()) {
172 // check if there is time left to wait
173 now_.stamp();
174 loopdiff_ = now_ - last_loop_;
175 if ((offsetdiff_.in_sec() - loopdiff_.in_sec()) > cfg_grace_period_) {
176 if (cfg_non_blocking_) {
177 // need to keep waiting before posting, but in non-blocking mode
178 // just wait for next loop
179 return;
180 } else {
181 waittime_ = offsetdiff_ - loopdiff_;
182 waittime_.wait();
183 }
184 }
185
186 interface_->write();
187 logfile_->read_next();
188
189 last_loop_.stamp();
190 offsetdiff_ = logfile_->entry_offset() - last_offset_;
191 last_offset_ = logfile_->entry_offset();
192
193 } else {
194 if (cfg_loop_replay_) {
195 logger->log_info(name(), "replay finished, looping");
196 logfile_->rewind();
197 } else {
198 if (opmode() == OPMODE_CONTINUOUS) {
199 // block
200 logger->log_info(name(), "replay finished, sleeping");
201 WaitCondition waitcond;
202 waitcond.wait();
203 } // else wait will just run once per loop
204 }
205 }
206}
Class to easily access bblogger log files.
Definition: bblogfile.h:40
const char * interface_type() const
Get interface type.
Definition: bblogfile.cpp:559
bool has_next()
Check if another entry is available.
Definition: bblogfile.cpp:266
void set_interface(fawkes::Interface *interface)
Set the internal interface.
Definition: bblogfile.cpp:494
void read_next()
Read next entry.
Definition: bblogfile.cpp:283
const char * interface_id() const
Get interface ID.
Definition: bblogfile.cpp:568
const fawkes::Time & entry_offset() const
Get current entry offset.
Definition: bblogfile.cpp:514
void rewind()
Rewind file to start.
Definition: bblogfile.cpp:254
virtual void init()
Initialize the thread.
BBLogReplayThread(const char *logfile_name, const char *logdir, const char *scenario, float grace_period, bool loop_replay, bool non_blocking=false, const char *thread_name="BBLogReplayThread", fawkes::Thread::OpMode th_opmode=Thread::OPMODE_CONTINUOUS)
Constructor.
virtual void loop()
Code to execute in the thread.
virtual ~BBLogReplayThread()
Destructor.
virtual void once()
Execute an action exactly once.
virtual void finalize()
Finalize the thread.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void close(Interface *interface)=0
Close interface.
Base class for exceptions in Fawkes.
Definition: exception.h:36
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:501
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
const char * name() const
Get name of thread.
Definition: thread.h:100
OpMode opmode() const
Get operation mode.
Definition: thread.cpp:671
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:748
OpMode
Thread operation mode.
Definition: thread.h:56
@ OPMODE_CONTINUOUS
operate in continuous mode (default)
Definition: thread.h:57
@ OPMODE_WAITFORWAKEUP
operate in wait-for-wakeup mode
Definition: thread.h:58
Time & stamp()
Set this time to the current time.
Definition: time.cpp:704
void wait()
Wait (sleep) for this time.
Definition: time.cpp:736
double in_sec() const
Convert time to seconds.
Definition: time.cpp:219
Wait until a given condition holds.
void wait()
Wait for the condition forever.
Fawkes library namespace.