Fawkes API Fawkes Development Version
log_thread.cpp
1
2/***************************************************************************
3 * log_thread.cpp - BB Logger Thread
4 *
5 * Created: Sun Nov 08 00:02:09 2009
6 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7 *
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library General Public License for more details.
19 *
20 * Read the full text in the LICENSE.GPL file in the doc directory.
21 */
22
23#include "log_thread.h"
24
25#include "file.h"
26
27#include <blackboard/blackboard.h>
28#include <core/exceptions/system.h>
29#include <interfaces/SwitchInterface.h>
30#include <logging/logger.h>
31
32#include <cerrno>
33#include <cstdio>
34#include <cstdlib>
35#include <cstring>
36#include <fcntl.h>
37#include <memory>
38#ifdef __FreeBSD__
39# include <sys/endian.h>
40#elif defined(__MACH__) && defined(__APPLE__)
41# include <sys/_endian.h>
42#else
43# include <endian.h>
44#endif
45#include <arpa/inet.h>
46#include <sys/mman.h>
47#include <sys/stat.h>
48
49using namespace fawkes;
50
51/** @class BBLoggerThread "log_thread.h"
52 * BlackBoard logger thread.
53 * One instance of this thread handles logging of one specific interface.
54 * The plugin will spawn as many threads as there are interfaces to log. This
55 * allows for maximum concurrency of the writers and avoids a serialization
56 * bottle neck.
57 * The log thread can operate in buffering mode. If this mode is disabled, the
58 * data is written to the file within the blackboard data changed event, and
59 * thus the writing operation can slow down the overall system, but memory
60 * requirements are low. This is useful if a lot of data is written or if the
61 * storage device is slow. If the mode is enabled, during the event the BB data
62 * will be copied into another memory segment and the thread will be woken up.
63 * Once the thread is running it stores all of the BB data segments bufferd
64 * up to then.
65 * The interface listener listens for events for a particular interface and
66 * then writes the changes to the file.
67 * @author Tim Niemueller
68 */
69
70/** Constructor.
71 * @param iface_uid interface UID which to log
72 * @param logdir directory to store config files, must exist
73 * @param buffering enable log buffering?
74 * @param flushing true to flush after each written chunk
75 * @param scenario ID of the log scenario
76 * @param start_time time to use as start time for the log
77 */
78BBLoggerThread::BBLoggerThread(const char * iface_uid,
79 const char * logdir,
80 bool buffering,
81 bool flushing,
82 const char * scenario,
83 fawkes::Time *start_time)
84: Thread("BBLoggerThread", Thread::OPMODE_WAITFORWAKEUP),
85 BlackBoardInterfaceListener("BBLoggerThread(%s)", iface_uid)
86{
88 set_name("BBLoggerThread(%s)", iface_uid);
89
90 buffering_ = buffering;
91 flushing_ = flushing;
92 uid_ = strdup(iface_uid);
93 logdir_ = strdup(logdir);
94 scenario_ = strdup(scenario);
95 start_ = new Time(start_time);
96 filename_ = NULL;
97 queue_mutex_ = new Mutex();
98 data_size_ = 0;
99 is_master_ = false;
100 enabled_ = true;
101
102 now_ = NULL;
103
104 // Parse UID
105 Interface::parse_uid(uid_, type_, id_);
106
107 char date[21];
108 Time now;
109 struct tm *tmp = localtime(&(now.get_timeval()->tv_sec));
110 strftime(date, 21, "%F-%H-%M-%S", tmp);
111
112 if (asprintf(
113 &filename_, "%s/%s-%s-%s-%s.log", LOGDIR, scenario_, type_.c_str(), id_.c_str(), date)
114 == -1) {
115 throw OutOfMemoryException("Cannot generate log name");
116 }
117}
118
119/** Destructor. */
121{
122 free(uid_);
123 free(logdir_);
124 free(scenario_);
125 free(filename_);
126 delete queue_mutex_;
127 delete start_;
128}
129
130void
132{
133 queues_[0].clear();
134 queues_[1].clear();
135 act_queue_ = 0;
136
137 queue_mutex_ = new Mutex();
138 data_size_ = 0;
139
140 now_ = NULL;
141 num_data_items_ = 0;
142 session_start_ = 0;
143
144 // use open because fopen does not provide O_CREAT | O_EXCL
145 // open read/write because of usage of mmap
146 mode_t m = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
147 int fd = open(filename_, O_RDWR | O_CREAT | O_EXCL, m);
148 if (!fd) {
149 throw CouldNotOpenFileException(filename_, errno, "Failed to open log 1");
150 } else {
151 f_data_ = fdopen(fd, "w+");
152 if (!f_data_) {
153 throw CouldNotOpenFileException(filename_, errno, "Failed to open log 2");
154 }
155 }
156
157 try {
158 iface_ = blackboard->open_for_reading(type_.c_str(), id_.c_str());
159 data_size_ = iface_->datasize();
160 } catch (Exception &e) {
161 fclose(f_data_);
162 throw;
163 }
164
165 try {
166 write_header();
167 } catch (FileWriteException &e) {
168 blackboard->close(iface_);
169 fclose(f_data_);
170 throw;
171 }
172
173 now_ = new Time(clock);
174
175 if (is_master_) {
176 try {
177 switch_if_ = blackboard->open_for_writing<SwitchInterface>("BBLogger");
178 switch_if_->set_enabled(enabled_);
179 switch_if_->write();
180 bbil_add_message_interface(switch_if_);
181 } catch (Exception &e) {
182 fclose(f_data_);
183 throw;
184 }
185 }
186
189
191
193 name(), "Logging %s to %s%s", iface_->uid(), filename_, is_master_ ? " as master" : "");
194}
195
196void
198{
200 if (is_master_) {
201 blackboard->close(switch_if_);
202 }
203 update_header();
204 fclose(f_data_);
205 for (unsigned int q = 0; q < 2; ++q) {
206 while (!queues_[q].empty()) {
207 void *t = queues_[q].front();
208 free(t);
209 queues_[q].pop();
210 }
211 }
212 delete now_;
213 now_ = NULL;
214}
215
216/** Get filename.
217 * @return file name, valid after object instantiated, but before init() does not
218 * mean that the file has been or can actually be opened
219 */
220const char *
222{
223 return filename_;
224}
225
226/** Enable or disable logging.
227 * @param enabled true to enable logging, false to disable
228 */
229void
231{
232 if (enabled && !enabled_) {
233 logger->log_info(name(), "Logging enabled");
234 session_start_ = num_data_items_;
235 } else if (!enabled && enabled_) {
237 "Logging disabled (wrote %u entries), flushing",
238 (num_data_items_ - session_start_));
239 update_header();
240 fflush(f_data_);
241 }
242
243 enabled_ = enabled;
244}
245
246/** Set threadlist and master status.
247 * This copies the thread list and sets this thread as master thread.
248 * If you intend to use this method you must do so before the thread is
249 * initialized. You may only ever declare one thread as master.
250 * @param thread_list list of threads to notify on enable/disable events
251 */
252void
254{
255 is_master_ = true;
256 threads_ = thread_list;
257}
258
259void
260BBLoggerThread::write_header()
261{
262 bblog_file_header header;
263 memset(&header, 0, sizeof(header));
264 header.file_magic = htonl(BBLOGGER_FILE_MAGIC);
265 header.file_version = htonl(BBLOGGER_FILE_VERSION);
266#if BYTE_ORDER_ == BIG_ENDIAN_
267 header.endianess = BBLOG_BIG_ENDIAN;
268#else
269 header.endianess = BBLOG_LITTLE_ENDIAN;
270#endif
271 header.num_data_items = num_data_items_;
272 strncpy(header.scenario, (const char *)scenario_, BBLOG_SCENARIO_SIZE - 1);
273 strncpy(header.interface_type, iface_->type(), BBLOG_INTERFACE_TYPE_SIZE - 1);
274 strncpy(header.interface_id, iface_->id(), BBLOG_INTERFACE_ID_SIZE - 1);
275 memcpy(header.interface_hash, iface_->hash(), BBLOG_INTERFACE_HASH_SIZE);
276 header.data_size = iface_->datasize();
277 long start_time_sec, start_time_usec;
278 start_->get_timestamp(start_time_sec, start_time_usec);
279 header.start_time_sec = start_time_sec;
280 header.start_time_usec = start_time_usec;
281 if (fwrite(&header, sizeof(header), 1, f_data_) != 1) {
282 throw FileWriteException(filename_, "Failed to write header");
283 }
284 fflush(f_data_);
285}
286
287/** Updates the num_data_items field in the header. */
288void
289BBLoggerThread::update_header()
290{
291 // write updated num_data_items field
292#if _POSIX_MAPPED_FILES
293 void *h = mmap(NULL, sizeof(bblog_file_header), PROT_WRITE, MAP_SHARED, fileno(f_data_), 0);
294 if (h == MAP_FAILED) {
296 "Failed to mmap log (%s), "
297 "not updating number of data items",
298 strerror(errno));
299 } else {
301 header->num_data_items = num_data_items_;
302 munmap(h, sizeof(bblog_file_header));
303 }
304#else
306 "Memory mapped files not available, "
307 "not updating number of data items on close");
308#endif
309}
310
311void
312BBLoggerThread::write_chunk(const void *chunk)
313{
314 bblog_entry_header ehead;
315 now_->stamp();
316 Time d = *now_ - *start_;
317 long rel_time_sec, rel_time_usec;
318 d.get_timestamp(rel_time_sec, rel_time_usec);
319 ehead.rel_time_sec = rel_time_sec;
320 ehead.rel_time_usec = rel_time_usec;
321 if ((fwrite(&ehead, sizeof(ehead), 1, f_data_) == 1)
322 && (fwrite(chunk, data_size_, 1, f_data_) == 1)) {
323 if (flushing_)
324 fflush(f_data_);
325 num_data_items_ += 1;
326 } else {
327 logger->log_warn(name(), "Failed to write chunk");
328 }
329}
330
331void
333{
334 unsigned int write_queue = act_queue_;
335 queue_mutex_->lock();
336 act_queue_ = 1 - act_queue_;
337 queue_mutex_->unlock();
338 LockQueue<void *> &queue = queues_[write_queue];
339 //logger->log_debug(name(), "Writing %zu entries", queue.size());
340 while (!queue.empty()) {
341 void *c = queue.front();
342 write_chunk(c);
343 free(c);
344 queue.pop();
345 }
346}
347
348bool
350{
353
354 bool enabled = true;
355 if ((enm = dynamic_cast<SwitchInterface::EnableSwitchMessage *>(message)) != NULL) {
356 enabled = true;
357 } else if ((dism = dynamic_cast<SwitchInterface::DisableSwitchMessage *>(message)) != NULL) {
358 enabled = false;
359 } else {
360 logger->log_debug(name(),
361 "Unhandled message type: %s via %s",
362 message->type(),
363 interface->uid());
364 }
365
366 for (ThreadList::iterator i = threads_.begin(); i != threads_.end(); ++i) {
367 BBLoggerThread *bblt = dynamic_cast<BBLoggerThread *>(*i);
368 bblt->set_enabled(enabled);
369 }
370
371 switch_if_->set_enabled(enabled_);
372 switch_if_->write();
373
374 return false;
375}
376
377void
379{
380 if (!enabled_)
381 return;
382
383 try {
384 iface_->read();
385
386 if (buffering_) {
387 void *c = malloc(iface_->datasize());
388 memcpy(c, iface_->datachunk(), iface_->datasize());
389 queue_mutex_->lock();
390 queues_[act_queue_].push_locked(c);
391 queue_mutex_->unlock();
392 wakeup();
393 } else {
394 queue_mutex_->lock();
395 write_chunk(iface_->datachunk());
396 queue_mutex_->unlock();
397 }
398
399 } catch (Exception &e) {
400 logger->log_error(name(), "Exception when data changed");
401 logger->log_error(name(), e);
402 }
403}
404
405void
406BBLoggerThread::bb_interface_writer_added(Interface *interface, Uuid instance_serial) noexcept
407{
408 session_start_ = num_data_items_;
409}
410
411void
412BBLoggerThread::bb_interface_writer_removed(Interface *interface, Uuid instance_serial) noexcept
413{
414 logger->log_info(name(),
415 "Writer removed (wrote %u entries), flushing",
416 (num_data_items_ - session_start_));
417 update_header();
418 fflush(f_data_);
419}
BlackBoard logger thread.
Definition: log_thread.h:52
virtual ~BBLoggerThread()
Destructor.
Definition: log_thread.cpp:120
virtual void init()
Initialize the thread.
Definition: log_thread.cpp:131
const char * get_filename() const
Get filename.
Definition: log_thread.cpp:221
virtual void bb_interface_data_refreshed(fawkes::Interface *interface) noexcept
BlackBoard data refreshed notification.
Definition: log_thread.cpp:378
virtual void finalize()
Finalize the thread.
Definition: log_thread.cpp:197
virtual void bb_interface_writer_added(fawkes::Interface *interface, fawkes::Uuid instance_serial) noexcept
A writing instance has been opened for a watched interface.
Definition: log_thread.cpp:406
void set_threadlist(fawkes::ThreadList &thread_list)
Set threadlist and master status.
Definition: log_thread.cpp:253
virtual void loop()
Code to execute in the thread.
Definition: log_thread.cpp:332
virtual bool bb_interface_message_received(fawkes::Interface *interface, fawkes::Message *message) noexcept
BlackBoard message received notification.
Definition: log_thread.cpp:349
BBLoggerThread(const char *iface_uid, const char *logdir, bool buffering, bool flushing, const char *scenario, fawkes::Time *start_time)
Constructor.
Definition: log_thread.cpp:78
virtual void bb_interface_writer_removed(fawkes::Interface *interface, fawkes::Uuid instance_serial) noexcept
A writing instance has been closed for a watched interface.
Definition: log_thread.cpp:412
void set_enabled(bool enabled)
Enable or disable logging.
Definition: log_thread.cpp:230
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
BlackBoard interface listener.
void bbil_add_message_interface(Interface *interface)
Add an interface to the message received watch list.
void bbil_add_writer_interface(Interface *interface)
Add an interface to the writer addition/removal watch list.
void bbil_add_data_interface(Interface *interface)
Add an interface to the data modification watch list.
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:212
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:185
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
File could not be opened.
Definition: system.h:53
Base class for exceptions in Fawkes.
Definition: exception.h:36
Could not write to file.
Definition: system.h:69
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
const char * type() const
Get type of interface.
Definition: interface.cpp:652
const unsigned char * hash() const
Get interface hash.
Definition: interface.cpp:305
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:501
const char * id() const
Get identifier of interface.
Definition: interface.cpp:661
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:686
unsigned int datasize() const
Get data size.
Definition: interface.cpp:540
void clear()
Clear the queue.
Definition: lock_queue.h:153
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Definition: message.h:44
virtual void log_info(const char *component, const char *format,...)
Log informational message.
Definition: multi.cpp:195
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
Definition: multi.cpp:174
virtual void log_error(const char *component, const char *format,...)
Log error message.
Definition: multi.cpp:237
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
DisableSwitchMessage Fawkes BlackBoard Interface Message.
EnableSwitchMessage Fawkes BlackBoard Interface Message.
SwitchInterface Fawkes BlackBoard Interface.
void set_enabled(const bool new_enabled)
Set enabled value.
List of threads.
Definition: thread_list.h:56
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:748
void set_coalesce_wakeups(bool coalesce=true)
Set wakeup coalescing.
Definition: thread.cpp:729
A class for handling time.
Definition: time.h:93
void get_timestamp(long &sec, long &usec) const
Get time stamp.
Definition: time.h:137
Time & stamp()
Set this time to the current time.
Definition: time.cpp:704
const timeval * get_timeval() const
Obtain the timeval where the time is stored.
Definition: time.h:112
A convenience class for universally unique identifiers (UUIDs).
Definition: uuid.h:29
Fawkes library namespace.
BBLogger entry header.
Definition: file.h:76
uint32_t rel_time_usec
time since start time, microseconds
Definition: file.h:78
uint32_t rel_time_sec
time since start time, seconds
Definition: file.h:77
BBLogger file header definition.
Definition: file.h:53
char interface_type[BBLOG_INTERFACE_TYPE_SIZE]
Interface type.
Definition: file.h:64
char scenario[BBLOG_SCENARIO_SIZE]
Scenario as defined in config.
Definition: file.h:62
uint64_t start_time_sec
Start time, timestamp seconds.
Definition: file.h:68
uint32_t data_size
size of one interface data block
Definition: file.h:67
char interface_id[BBLOG_INTERFACE_ID_SIZE]
Interface ID.
Definition: file.h:65
uint64_t start_time_usec
Start time, timestamp microseconds.
Definition: file.h:69
uint32_t endianess
Endianess, 0 little endian, 1 big endian.
Definition: file.h:58
uint32_t file_version
File version, set to BBLOGGER_FILE_VERSION on write and verify on read (big endian)
Definition: file.h:56
uint32_t num_data_items
Number of data items in file, if set to zero reader must scan the file for this number.
Definition: file.h:60
unsigned char interface_hash[BBLOG_INTERFACE_HASH_SIZE]
Interface Hash.
Definition: file.h:66
uint32_t file_magic
Magic value to identify file, must be 0xFFBBFFBB (big endian)
Definition: file.h:54