Fawkes API Fawkes Development Version
plexil_thread.cpp
1
2/***************************************************************************
3 * plexil_thread.cpp - PLEXIL executive
4 *
5 * Created: Mon Aug 13 11:20:12 2018
6 * Copyright 2006-2018 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "plexil_thread.h"
23
24#include "be_adapter.h"
25#include "clock_adapter.h"
26#include "log_adapter.h"
27#include "log_stream.h"
28#include "thread_adapter.h"
29#ifdef HAVE_NAVGRAPH
30# include "navgraph_access_thread.h"
31# include "navgraph_adapter.h"
32#endif
33#include "utils.h"
34
35#include <core/threading/mutex_locker.h>
36#include <utils/sub_process/proc.h>
37#include <utils/system/dynamic_module/module.h>
38
39#include <AdapterConfiguration.hh>
40#include <Debug.hh>
41#include <ExecApplication.hh>
42#include <InterfaceManager.hh>
43#include <InterfaceSchema.hh>
44#include <boost/filesystem.hpp>
45#include <boost/interprocess/sync/file_lock.hpp>
46#include <cstring>
47#include <fstream>
48#include <numeric>
49#include <pugixml.hpp>
50
51using namespace fawkes;
52namespace fs = boost::filesystem;
53// for C++17 could be:
54// namespace fs = std::filesystem;
55
56/** @class PlexilExecutiveThread "plexil_thread.h"
57 * Main thread of PLEXIL executive.
58 *
59 * @author Tim Niemueller
60 */
61
62/** Constructor. */
64: Thread("PlexilExecutiveThread", Thread::OPMODE_CONTINUOUS)
65{
67}
68
69/** Destructor. */
71{
72}
73
74void
76{
77 cfg_spec_ = config->get_string("/plexil/spec");
78
79 std::string cfg_prefix = "/plexil/" + cfg_spec_ + "/";
80
81 bool cfg_print_xml = config->get_bool_or_default((cfg_prefix + "debug/print-xml").c_str(), false);
82
83 std::map<std::string, plexil_interface_config> cfg_adapters =
84 read_plexil_interface_configs(cfg_prefix + "adapters/");
85
86 std::map<std::string, plexil_interface_config> cfg_listeners =
87 read_plexil_interface_configs(cfg_prefix + "listeners/");
88
89 std::vector<std::string> cfg_lib_path =
90 config->get_strings_or_defaults((cfg_prefix + "plan/lib-path").c_str(), {});
91
92 std::string cfg_basedir =
93 config->get_string_or_default((cfg_prefix + "plan/basedir").c_str(), "");
94
95 for (auto &a_item : cfg_adapters) {
96 auto &a = a_item.second;
97 if (a.type == "Utility") {
98 logger->log_warn(name(), "Utility adapter configured, consider using FawkesLogging instead");
99 } else if (a.type == "OSNativeTime") {
101 "OSNativeTime adapter configured, consider using FawkesTime instead");
102 } else if (a.type == "FawkesRemoteAdapter") {
103 logger->log_error(name(), "Cannot load FawkesRemoteAdapter when running internally");
104 throw Exception("Plexil: cannot load FawkesRemoteAdapter when running internally");
105 }
106
107 std::string filename =
108 std::string(LIBDIR) + "/plexil/" + a.type + "." + fawkes::Module::get_file_extension();
109 if (fs::exists(filename)) {
110 a.attr["LibPath"] = filename;
111 }
112 }
113
114 plexil_.reset(new PLEXIL::ExecApplication);
115
116 PLEXIL::g_manager->setProperty("::Fawkes::Config", config);
117 PLEXIL::g_manager->setProperty("::Fawkes::Clock", clock);
118 PLEXIL::g_manager->setProperty("::Fawkes::Logger", logger);
119 PLEXIL::g_manager->setProperty("::Fawkes::BlackBoard", blackboard);
120
121 for (const auto &p : cfg_lib_path) {
122 plexil_->addLibraryPath(p);
123 }
124
125 pugi::xml_document xml_config;
126 pugi::xml_node xml_interfaces =
127 xml_config.append_child(PLEXIL::InterfaceSchema::INTERFACES_TAG());
128
129 add_plexil_interface_configs(xml_interfaces,
130 cfg_adapters,
131 PLEXIL::InterfaceSchema::ADAPTER_TAG(),
132 PLEXIL::InterfaceSchema::ADAPTER_TYPE_ATTR());
133 add_plexil_interface_configs(xml_interfaces,
134 cfg_listeners,
135 PLEXIL::InterfaceSchema::LISTENER_TAG(),
136 PLEXIL::InterfaceSchema::LISTENER_TYPE_ATTR());
137
138 auto navgraph_adapter_config =
139 std::find_if(cfg_adapters.begin(), cfg_adapters.end(), [](const auto &entry) {
140 return entry.second.type == "NavGraphAdapter";
141 });
142 if (navgraph_adapter_config != cfg_adapters.end()) {
143#ifdef HAVE_NAVGRAPH
144 navgraph_access_thread_ = new PlexilNavgraphAccessThread();
145 thread_collector->add(navgraph_access_thread_);
146 navgraph_ = navgraph_access_thread_->get_navgraph();
147 PLEXIL::g_manager->setProperty("::Fawkes::NavGraph", &navgraph_);
148#else
149 throw Exception("NavGraph adapter configured, "
150 "but navgraph library not available at compile time");
151#endif
152 }
153
154 if (cfg_print_xml) {
155 struct xml_string_writer : pugi::xml_writer
156 {
157 std::string result;
158 virtual void
159 write(const void *data, size_t size)
160 {
161 result.append(static_cast<const char *>(data), size);
162 }
163 };
164
165 xml_string_writer writer;
166 xml_config.save(writer);
167 logger->log_info(name(), "Interface config XML:\n%s", writer.result.c_str());
168 }
169
170 if (config->get_bool_or_default((cfg_prefix + "debug/enable").c_str(), false)) {
171 std::vector<std::string> debug_markers =
172 config->get_strings_or_defaults((cfg_prefix + "debug/markers").c_str(), {});
173
174 std::stringstream dbg_config;
175 for (const auto &m : debug_markers) {
176 dbg_config << m << std::endl;
177 }
178 PLEXIL::readDebugConfigStream(dbg_config);
179 }
180
181 log_buffer_.reset(new PlexilLogStreamBuffer(logger));
182 log_stream_.reset(new std::ostream(&*log_buffer_));
183 PLEXIL::setDebugOutputStream(*log_stream_);
184
185 if (!plexil_->initialize(xml_interfaces)) {
186 throw Exception("Failed to initialize Plexil application");
187 }
188
189 if (config->is_list(cfg_prefix + "plan/ple")) {
190 cfg_plan_ple_ = config->get_strings_or_defaults((cfg_prefix + "plan/ple").c_str(), {});
191 } else {
192 std::string ple = config->get_string_or_default((cfg_prefix + "plan/ple").c_str(), "");
193 if (!ple.empty()) {
194 cfg_plan_ple_ = {ple};
195 }
196 }
197 if (cfg_plan_ple_.empty()) {
198 throw Exception("No PLE configured");
199 }
200 cfg_plan_plx_ = config->get_string((cfg_prefix + "plan/plx").c_str());
201 cfg_plan_auto_compile_ =
202 config->get_bool_or_default((cfg_prefix + "plan/compilation/enable").c_str(), false);
203 cfg_plan_force_compile_ =
204 config->get_bool_or_default((cfg_prefix + "plan/compilation/force").c_str(), false);
205
206 if (!cfg_plan_plx_.empty()) {
207 cfg_plan_plx_ = cfg_basedir + "/" + cfg_plan_plx_;
208 replace_tokens(cfg_plan_plx_);
209 }
210
211 std::set<std::string> base_paths;
212
213 for (auto &p : cfg_plan_ple_) {
214 p = cfg_basedir + "/" + p;
215 replace_tokens(p);
216
217 fs::path ple_path{p};
218 fs::path plx_path{fs::path{ple_path}.replace_extension(".plx")};
219
220 // make sure not two processes try to compile at the same time
221 boost::interprocess::file_lock flock(ple_path.string().c_str());
222
223 base_paths.insert(plx_path.parent_path().string());
224
225 if (cfg_plan_auto_compile_) {
226 if (cfg_plan_force_compile_ || !fs::exists(plx_path)
227 || fs::last_write_time(plx_path) < fs::last_write_time(ple_path)) {
228 logger->log_info(name(), "Compiling %s", ple_path.string().c_str());
229 plexil_compile(ple_path.string());
230 }
231 } else {
232 if (!fs::exists(plx_path)) {
233 throw Exception("PLX %s does not exist and auto-compile disabled");
234 } else if (fs::last_write_time(plx_path) < fs::last_write_time(ple_path)) {
236 "PLX %s older than PLE, auto-compile disabled",
237 plx_path.string().c_str());
238 }
239 }
240 }
241
242 if (!fs::exists(cfg_plan_plx_)) {
243 throw Exception("PLX %s does not exist", cfg_plan_plx_.c_str());
244 }
245
246 for (const auto &p : base_paths) {
247 plexil_->addLibraryPath(p);
248 }
249
250 plan_plx_.reset(new pugi::xml_document);
251 pugi::xml_parse_result parse_result = plan_plx_->load_file(cfg_plan_plx_.c_str());
252 if (parse_result.status != pugi::status_ok) {
253 throw Exception("Failed to parse plan '%s': %s",
254 cfg_plan_plx_.c_str(),
255 parse_result.description());
256 }
257}
258
259void
261{
262 if (!plexil_->startInterfaces()) {
263 throw Exception("Failed to start Plexil interfaces");
264 }
265 if (!plexil_->run()) {
266 throw Exception("Failed to start Plexil");
267 }
268
269 if (!plexil_->addPlan(&*plan_plx_)) {
270 logger->log_error(name(), "Failed to add Plexil plan. See log for details");
271 } else {
272 plexil_->notifyExec();
273 }
274}
275
276bool
278{
279 if (!plexil_->stop()) {
280 logger->log_error(name(), "Failed to stop Plexil");
281 }
282 plexil_->notifyExec();
283 return true;
284}
285
286void
288{
289 if (!plexil_->shutdown()) {
290 logger->log_error(name(), "Failed to shutdown Plexil");
291 }
292 PLEXIL::g_configuration->clearAdapterRegistry();
293 plexil_->waitForShutdown();
294
295 // We really should do a reset here, killing off the ExecApplication instance.
296 // However, the executive crashes in a state cache destructor if there is any
297 // active wait (or probably any active LookupOnChange, as here on time).
298 // Therefore, we accept this memleak here under the assumption, that we do not
299 // frequently reload the plexil plugin. This at least avoids the segfaut on quit.
300 plexil_.release();
301 //plexil_.reset();
302 log_stream_.reset();
303 log_buffer_.reset();
304 plan_plx_.reset();
305#ifdef HAVE_NAVGRAPH
306 if (navgraph_) {
307 navgraph_.clear();
308 thread_collector->remove(navgraph_access_thread_);
309 delete navgraph_access_thread_;
310 }
311#endif
312}
313
314void
316{
317 //plexil_->notifyExec();
318 //plexil_->waitForPlanFinished();
319 static PLEXIL::ExecApplication::ApplicationState state = PLEXIL::ExecApplication::APP_SHUTDOWN;
320 PLEXIL::ExecApplication::ApplicationState new_state = plexil_->getApplicationState();
321 if (new_state != state) {
322 logger->log_info(name(), "State changed to %s", plexil_->getApplicationStateName(new_state));
323 state = new_state;
324 }
325
326 using namespace std::chrono_literals;
327 std::this_thread::sleep_for(500ms);
328}
329
330// Parse adapter configurations
331std::map<std::string, PlexilExecutiveThread::plexil_interface_config>
332PlexilExecutiveThread::read_plexil_interface_configs(const std::string &config_prefix)
333{
334 std::map<std::string, plexil_interface_config> cfg_adapters;
335
336 std::unique_ptr<Configuration::ValueIterator> cfg_item{config->search(config_prefix)};
337 while (cfg_item->next()) {
338 std::string path = cfg_item->path();
339
340 std::string::size_type start_pos = config_prefix.size();
341 std::string::size_type slash_pos = path.find("/", start_pos + 1);
342 if (slash_pos != std::string::npos) {
343 std::string id = path.substr(start_pos, slash_pos - start_pos);
344
345 start_pos = slash_pos + 1;
346 slash_pos = path.find("/", start_pos);
347 std::string what = path.substr(start_pos, slash_pos - start_pos);
348
349 if (what == "type") {
350 cfg_adapters[id].type = cfg_item->get_string();
351 } else if (what == "attr") {
352 start_pos = slash_pos + 1;
353 slash_pos = path.find("/", start_pos);
354 std::string key = path.substr(start_pos, slash_pos - start_pos);
355 cfg_adapters[id].attr[key] = cfg_item->get_as_string();
356 } else if (what == "args") {
357 start_pos = slash_pos + 1;
358 slash_pos = path.find("/", start_pos);
359 std::string key = path.substr(start_pos, slash_pos - start_pos);
360 cfg_adapters[id].args[key] = cfg_item->get_as_string();
361 } else if (what == "verbatim-args") {
362 start_pos = slash_pos + 1;
363 slash_pos = path.find("/", start_pos);
364 std::string verb_id = path.substr(start_pos, slash_pos - start_pos);
365
366 start_pos = slash_pos + 1;
367 slash_pos = path.find("/", start_pos);
368 std::string verb_what = path.substr(start_pos, slash_pos - start_pos);
369
370 if (verb_what == "tag") {
371 cfg_adapters[id].verbatim_args[verb_id].tag = cfg_item->get_as_string();
372 } else if (verb_what == "text") {
373 cfg_adapters[id].verbatim_args[verb_id].has_text = true;
374 cfg_adapters[id].verbatim_args[verb_id].text = cfg_item->get_as_string();
375 } else if (verb_what == "attr") {
376 start_pos = slash_pos + 1;
377 slash_pos = path.find("/", start_pos);
378 std::string verb_key = path.substr(start_pos, slash_pos - start_pos);
379 cfg_adapters[id].verbatim_args[verb_id].attr[verb_key] = cfg_item->get_as_string();
380 }
381 } else if (what == "verbatim-xml") {
382 logger->log_warn(name(), "Parsing verbatim");
383 pugi::xml_parse_result parse_result =
384 cfg_adapters[id].verbatim.load_string(cfg_item->get_string().c_str());
385 if (parse_result.status != pugi::status_ok) {
386 throw Exception("Failed to parse verbatim-xml for '%s': %s",
387 cfg_adapters[id].type.c_str(),
388 parse_result.description());
389 }
390 }
391 }
392 }
393 return cfg_adapters;
394}
395
396// Add adapter configurations to Plexil interface XML config
397void
398PlexilExecutiveThread::add_plexil_interface_configs(
399 pugi::xml_node & parent,
400 const std::map<std::string, PlexilExecutiveThread::plexil_interface_config> &configs,
401 const char * tag_name,
402 const char * type_attr_name)
403{
404 for (const auto &a_item : configs) {
405 const auto & a = a_item.second;
406 pugi::xml_node xml_adapter = parent.append_child(tag_name);
407 xml_adapter.append_attribute(type_attr_name).set_value(a.type.c_str());
408 for (const auto &attr : a.attr) {
409 xml_adapter.append_attribute(attr.first.c_str()).set_value(attr.second.c_str());
410 }
411 for (const auto &arg : a.args) {
412 pugi::xml_node xml_adapter_arg = xml_adapter.append_child("Parameter");
413 xml_adapter_arg.append_attribute("key").set_value(arg.first.c_str());
414 xml_adapter_arg.text().set(arg.second.c_str());
415 }
416 for (const auto &arg : a.verbatim_args) {
417 const auto & varg = arg.second;
418 pugi::xml_node xml_adapter_arg = xml_adapter.append_child(varg.tag.c_str());
419 for (const auto &attr : varg.attr) {
420 xml_adapter_arg.append_attribute(attr.first.c_str()).set_value(attr.second.c_str());
421 }
422 if (varg.has_text) {
423 xml_adapter_arg.text().set(varg.text.c_str());
424 }
425 }
426 if (a.verbatim && a.verbatim.children().begin() != a.verbatim.children().end()) {
427 for (const auto &child : a.verbatim.children()) {
428 xml_adapter.append_copy(child);
429 }
430 }
431 }
432}
433
434void
435PlexilExecutiveThread::plexil_compile(const std::string &ple_file)
436{
437 std::vector<std::string> argv{"plexilc", ple_file};
438 std::string command_line =
439 std::accumulate(std::next(argv.begin()),
440 argv.end(),
441 argv.front(),
442 [](std::string &s, const std::string &a) { return s + " " + a; });
443 logger->log_debug(name(), "Compiler command: %s", command_line.c_str());
444
445 SubProcess proc("plexilc", "plexilc", argv, {}, logger);
446 using namespace std::chrono_literals;
447 auto compile_start = std::chrono::system_clock::now();
448 auto now = std::chrono::system_clock::now();
449 do {
450 proc.check_proc();
451 if (!proc.alive()) {
452 if (proc.exit_status() != 0) {
453 throw Exception("Plexil compilation failed, check log for messages.");
454 } else {
455 break;
456 }
457 }
458 now = std::chrono::system_clock::now();
459 std::this_thread::sleep_for(500ms);
460 } while (now < compile_start + 30s);
461 if (proc.alive()) {
462 proc.kill(SIGINT);
463 throw Exception("Plexil compilation timeout after 30s");
464 }
465}
virtual void init()
Initialize the thread.
virtual ~PlexilExecutiveThread()
Destructor.
PlexilExecutiveThread()
Constructor.
virtual void loop()
Code to execute in the thread.
virtual void finalize()
Finalize the thread.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
virtual void once()
Execute an action exactly once.
Log Plexil log output to Fawkes logger.
Definition: log_stream.h:31
Access to internal navgraph for Plexil.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual const char * path() const =0
Path of value.
virtual std::string get_string_or_default(const char *path, const std::string &default_val)
Get value from configuration which is of type string, or the given default if the path does not exist...
Definition: config.cpp:736
virtual ValueIterator * search(const char *path)=0
Iterator with search results.
virtual bool is_list(const char *path)=0
Check if a value is a list.
virtual bool get_bool_or_default(const char *path, const bool &default_val)
Get value from configuration which is of type bool, or the given default if the path does not exist.
Definition: config.cpp:726
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
virtual std::vector< std::string > get_strings_or_defaults(const char *path, const std::vector< std::string > &default_val)
Get list of values from configuration which is of type string, or the given default if the path does ...
Definition: config.cpp:786
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
static const char * get_file_extension()
Get file extension for dl modules.
Definition: module.cpp:260
Sub-process execution with stdin/stdout/stderr redirection.
Definition: proc.h:37
bool alive()
Check if process is alive.
Definition: proc.cpp:353
int exit_status()
Get exit status of process once it ended.
Definition: proc.cpp:365
void check_proc()
Check if the process is still alive.
Definition: proc.cpp:375
void kill(int signum)
Send a signal to the process.
Definition: proc.cpp:188
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
const char * name() const
Get name of thread.
Definition: thread.h:100
Fawkes library namespace.