Fawkes API Fawkes Development Version
openprs_thread.cpp
1
2/***************************************************************************
3 * openprs_thread.cpp - OpenPRS environment providing Thread
4 *
5 * Created: Thu Aug 14 15:52:35 2014
6 * Copyright 2014-2015 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "openprs_thread.h"
23
24#include "utils/openprs_mp_proxy.h"
25#include "utils/openprs_server_proxy.h"
26
27#include <baseapp/run.h>
28#include <logging/logger.h>
29#include <netcomm/fawkes/network_manager.h>
30#include <utils/sub_process/proc.h>
31
32#include <boost/bind/bind.hpp>
33#include <boost/format.hpp>
34#include <boost/lambda/bind.hpp>
35#include <boost/lambda/lambda.hpp>
36#include <cerrno>
37#include <csignal>
38#include <cstdio>
39#include <cstdlib>
40#include <unistd.h>
41
42using namespace fawkes;
43
44/** @class OpenPRSThread "openprs_thread.h"
45 * OpenPRS environment thread.
46 *
47 * @author Tim Niemueller
48 */
49
50/** Constructor.
 * Only initializes base classes and members; the body is empty.
 *  - Thread base: name "OpenPRSThread", operating mode OPMODE_WAITFORWAKEUP.
 *  - BlockedTimingAspect: wakeup hook WAKEUP_HOOK_WORLDSTATE.
 *  - AspectProviderAspect: receives the list returned by inifin_list().
 *  - server_socket_ and deadline_ are both constructed on io_service_.
 *
 * NOTE(review): the signature line (listing line 51) is absent from this
 * extract; presumably it reads OpenPRSThread::OpenPRSThread() -- confirm
 * against the original file.
 */
52: Thread("OpenPRSThread", Thread::OPMODE_WAITFORWAKEUP),
53 BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_WORLDSTATE),
54 AspectProviderAspect(inifin_list()),
55 server_socket_(io_service_),
56 deadline_(io_service_)
57{
58}
59
60/** Destructor.
 * The body is empty. The sub-process objects, proxies and the server socket
 * are released by the cleanup block further down in this file (the one that
 * closes server_socket_ and deletes proc_srv_/proc_mp_), not here.
 *
 * NOTE(review): the signature line (listing line 61) is absent from this
 * extract; presumably it reads OpenPRSThread::~OpenPRSThread() -- confirm
 * against the original file.
 */
62{
63}
64
/** Initialize the thread.
 * In order, this block:
 *  1. reads the "/openprs/..." configuration values,
 *  2. creates SubProcess objects for the OPRS message passer and the OPRS
 *     server if the respective "run" flags are set,
 *  3. connects server_socket_ to the configured server host/port, bounded by
 *     the configured timeout (only when BOOST_VERSION >= 104800),
 *  4. exchanges greetings with the server over that socket,
 *  5. starts a background thread running io_service_,
 *  6. creates the server proxy, optionally the message passer proxy, and the
 *     kernel manager, and hands them to the two aspect initializers.
 *
 * Throws an Exception when the connect attempt fails or times out; the
 * cleanup routine finalize() is called before throwing in that case.
 *
 * NOTE(review): the signature line (listing line 66) is absent from this
 * extract; presumably it reads OpenPRSThread::init() -- confirm against the
 * original file.
 */
65void
67{
// Reset the owned raw pointers first: finalize() tests and deletes them, and
// it is invoked from the connect-failure path below.
68 proc_srv_ = NULL;
69 proc_mp_ = NULL;
70 openprs_server_proxy_ = NULL;
71 openprs_mp_proxy_ = NULL;
72
// Local host name: used as fallback for both "hostname" settings and passed
// to the kernel manager. Falls back to "localhost" when gethostname() fails.
// NOTE(review): POSIX defines HOST_NAME_MAX without the terminating NUL, so a
// buffer of HOST_NAME_MAX + 1 may be required to guarantee termination -- confirm.
// NOTE(review): strcpy/HOST_NAME_MAX belong to <cstring>/<climits>, neither of
// which is among the includes visible in this file; presumably they arrive
// transitively -- confirm.
73 char hostname[HOST_NAME_MAX];
74 if (gethostname(hostname, HOST_NAME_MAX) == -1) {
75 strcpy(hostname, "localhost");
76 }
77
// Message passer settings. Only the hostname lookup is guarded: if
// get_string() throws, the local host name is used instead.
78 cfg_mp_run_ = config->get_bool("/openprs/message-passer/run");
79 cfg_mp_bin_ = config->get_string("/openprs/message-passer/binary");
80 try {
81 cfg_mp_host_ = config->get_string("/openprs/message-passer/hostname");
82 } catch (Exception &e) {
83 cfg_mp_host_ = hostname;
84 }
85 cfg_mp_port_ = config->get_uint("/openprs/message-passer/tcp-port");
// The port is also kept as a string because it is placed on the sub-process
// command lines below.
86 cfg_mp_port_s_ = boost::str(boost::format("%u") % cfg_mp_port_);
87 cfg_mp_use_proxy_ = config->get_bool("/openprs/message-passer/use-proxy");
88 cfg_mp_proxy_port_ = config->get_uint("/openprs/message-passer/proxy-tcp-port");
89
// Server settings, same pattern as for the message passer.
90 cfg_server_run_ = config->get_bool("/openprs/server/run");
91 cfg_server_bin_ = config->get_string("/openprs/server/binary");
92 try {
93 cfg_server_host_ = config->get_string("/openprs/server/hostname");
94 } catch (Exception &e) {
95 cfg_server_host_ = hostname;
96 }
97 cfg_server_port_ = config->get_uint("/openprs/server/tcp-port");
// String form is used for the server command line and for the resolver query.
98 cfg_server_port_s_ = boost::str(boost::format("%u") % cfg_server_port_);
99 cfg_server_proxy_port_ = config->get_uint("/openprs/server/proxy-tcp-port");
100
101 cfg_server_timeout_ = config->get_float("/openprs/server/timeout");
102 cfg_kernel_timeout_ = config->get_float("/openprs/kernels/start-timeout");
103
104 openprs_aspect_inifin_.set_kernel_timeout(cfg_kernel_timeout_);
105
// Message passer sub-process, command line: <binary> -j <message passer port>.
106 if (cfg_mp_run_) {
107 logger->log_warn(name(), "Running OPRS-mp");
108 const char *filename = cfg_mp_bin_.c_str();
109 const char *argv[] = {filename, "-j", cfg_mp_port_s_.c_str(), NULL};
110 proc_mp_ = new SubProcess("OPRS-mp", filename, argv, NULL, logger);
111 } else {
112 proc_mp_ = NULL;
113 }
114
// Server sub-process, command line:
//   <binary> -j <message passer port> -i <server port> -l lower
115 if (cfg_server_run_) {
116 logger->log_warn(name(), "Running OPRS-server");
117 const char *filename = cfg_server_bin_.c_str();
118 const char *argv[] = {filename,
119 "-j",
120 cfg_mp_port_s_.c_str(),
121 "-i",
122 cfg_server_port_s_.c_str(),
123 "-l",
124 "lower",
125 NULL};
126 proc_srv_ = new SubProcess("OPRS-server", filename, argv, NULL, logger);
127 } else {
128 proc_srv_ = NULL;
129 }
130
131#if BOOST_VERSION >= 104800
132 logger->log_info(name(), "Verifying OPRS-server availability");
133
// Resolve the server address; only the first resolved endpoint is tried below.
134 boost::asio::ip::tcp::resolver resolver(io_service_);
135 boost::asio::ip::tcp::resolver::query query(cfg_server_host_, cfg_server_port_s_);
136 boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);
137
138 // this is just the overly complicated way to get a timeout on
139 // a synchronous connect, cf.
140 // http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/example/cpp03/timeouts/blocking_tcp_client.cpp
// Arm the timer handler with an infinite deadline first, so that this initial
// check_deadline() call only registers the wait and does not close the socket.
141 deadline_.expires_at(boost::posix_time::pos_infin);
142 check_deadline(deadline_, server_socket_);
143
// Now set the real deadline.
// NOTE(review): cfg_server_timeout_ is filled from get_float() above, while
// boost::posix_time::seconds takes an integral count; fractional seconds are
// presumably dropped -- confirm against the member's declared type.
144 deadline_.expires_from_now(boost::posix_time::seconds(cfg_server_timeout_));
145
// would_block serves as the "still pending" marker; the completion handler
// overwrites ec with the result of the connect.
146 boost::system::error_code ec = boost::asio::error::would_block;
147 server_socket_.async_connect(iter->endpoint(), boost::lambda::var(ec) = boost::lambda::_1);
148
149 // Block until the asynchronous operation has completed.
150 do {
151 io_service_.run_one();
152# if BOOST_VERSION >= 105400 && BOOST_VERSION < 105500
153 // Boost 1.54 has a bug that causes async_connect to report success
154 // if it cannot connect at all to the other side, cf.
155 // https://svn.boost.org/trac/boost/ticket/8795
156 // Work around by explicitly checking for connected status
157 if (!ec) {
158 server_socket_.remote_endpoint(ec);
159 if (ec == boost::system::errc::not_connected) {
160 // continue waiting for timeout
161 ec = boost::asio::error::would_block;
162 server_socket_.async_connect(iter->endpoint(), boost::lambda::var(ec) = boost::lambda::_1);
163 }
164 }
165# endif
166 } while (ec == boost::asio::error::would_block);
167
168 // Determine whether a connection was successfully established.
// check_deadline() closes the socket once the deadline passes; presumably that
// is what completes the pending connect with operation_canceled, which is
// reported as a timeout here -- confirm.
169 if (ec || !server_socket_.is_open()) {
170 finalize();
171 if (ec.value() == boost::system::errc::operation_canceled) {
172 throw Exception("OpenPRS waiting for server to come up timed out");
173 } else {
174 throw Exception("OpenPRS waiting for server failed: %s", ec.message().c_str());
175 }
176 }
177#else
178 logger->log_warn(name(), "Cannot verify server aliveness, Boost too old");
179#endif
180
// NOTE(review): in the #else case above no connect is attempted in this block,
// yet the code below sets an option on and reads from server_socket_ --
// confirm that this path is expected to work.
// Enable keep-alive on the server connection.
181 boost::asio::socket_base::keep_alive keep_alive_option(true);
182 server_socket_.set_option(keep_alive_option);
183
184 // receive greeting
185 std::string greeting = OpenPRSServerProxy::read_string_from_socket(server_socket_);
186 //logger->log_info(name(), "Received server greeting: %s", greeting.c_str());
187 // send our greeting
// Sent in order: the string "fawkes", our process ID, and an integer 0 (the
// meaning of the last value is not visible in this file).
188 OpenPRSServerProxy::write_string_to_socket(server_socket_, "fawkes");
189 OpenPRSServerProxy::write_int_to_socket(server_socket_, getpid());
190 OpenPRSServerProxy::write_int_to_socket(server_socket_, 0);
191
// From here on io_service_ is run by a background thread; finalize() stops the
// service and joins this thread.
192 io_service_thread_ = std::thread([this]() { this->io_service_.run(); });
193
194 logger->log_info(name(), "Starting OpenPRS server proxy");
195
196 openprs_server_proxy_ =
197 new OpenPRSServerProxy(cfg_server_proxy_port_, cfg_server_host_, cfg_server_port_, logger);
198
199 if (cfg_mp_use_proxy_) {
200 logger->log_info(name(), "Starting OpenPRS message passer proxy");
201 openprs_mp_proxy_ =
202 new OpenPRSMessagePasserProxy(cfg_mp_proxy_port_, cfg_mp_host_, cfg_mp_port_, logger);
203 } else {
204 openprs_mp_proxy_ = NULL;
205 }
206
// The kernel manager is given the local host name with the server proxy port,
// and for the message passer either the local proxy (local host name + proxy
// port) or the configured message passer host/port, depending on use-proxy.
207 logger->log_warn(name(), "Initializing kernel manager");
208 openprs_kernel_mgr_ =
209 new OpenPRSKernelManager(hostname,
210 cfg_server_proxy_port_,
211 cfg_mp_use_proxy_ ? hostname : cfg_mp_host_,
212 cfg_mp_use_proxy_ ? cfg_mp_proxy_port_ : cfg_mp_port_,
213 logger,
214 clock,
215 config);
// Hand everything to the aspect initializers: Fawkes host ("localhost") and
// port, kernel manager, and the two proxies (the message passer proxy is NULL
// when use-proxy is off).
216 openprs_aspect_inifin_.prepare("localhost",
217 fawkes::runtime::network_manager->fawkes_port(),
218 openprs_kernel_mgr_,
219 openprs_server_proxy_,
220 openprs_mp_proxy_);
221 openprs_manager_aspect_inifin_.set_manager(openprs_kernel_mgr_);
222}
223
/** Finalize the thread.
 * Closes the server socket, stops io_service_ and joins its background
 * thread, sends SIGINT to the sub-processes that were created, deletes the
 * sub-process and proxy objects, and clears the kernel manager reference.
 *
 * NOTE(review): the signature line (listing line 225) is absent from this
 * extract; presumably it reads OpenPRSThread::finalize() -- confirm against
 * the original file.
 */
224void
226{
// Shut down networking first; join only if the background thread was started.
227 server_socket_.close();
228 io_service_.stop();
229 if (io_service_thread_.joinable()) {
230 io_service_thread_.join();
231 }
232
// Signal only the sub-processes that exist (pointers are NULL otherwise).
233 if (proc_srv_) {
234 logger->log_info(name(), "Killing OpenPRS server");
235 proc_srv_->kill(SIGINT);
236 }
237 if (proc_mp_) {
238 logger->log_info(name(), "Killing OpenPRS message passer");
239 proc_mp_->kill(SIGINT);
240 }
241
// delete on a NULL pointer is a no-op, so no guards are needed here.
// NOTE(review): the pointers are not reset after delete. If this block can run
// twice without the initialization block running in between (e.g. once from
// the connect-failure path and once more by the framework), the deletes would
// be repeated on stale pointers -- confirm.
242 delete proc_srv_;
243 delete proc_mp_;
244
245 delete openprs_server_proxy_;
246 delete openprs_mp_proxy_;
// Drop this thread's reference to the kernel manager.
247 openprs_kernel_mgr_.clear();
248}
249
/** Code to execute in the thread.
 * Calls check_proc() on each sub-process object that exists; does nothing
 * for a sub-process that was not created (NULL pointer).
 *
 * NOTE(review): the signature line (listing line 251) is absent from this
 * extract; presumably it reads OpenPRSThread::loop() -- confirm against the
 * original file.
 */
250void
252{
253 if (proc_srv_)
254 proc_srv_->check_proc();
255 if (proc_mp_)
256 proc_mp_->check_proc();
257}
258
/** Build the list of aspect initializers/finalizers provided by this thread.
 * The constructor passes the result to AspectProviderAspect.
 * @return list holding pointers to the two member objects
 * openprs_aspect_inifin_ and openprs_manager_aspect_inifin_, in that order.
 * The list itself is returned by value; the pointers refer to members of
 * this object.
 */
259const std::list<AspectIniFin *>
260OpenPRSThread::inifin_list()
261{
262 std::list<AspectIniFin *> rv;
263 rv.push_back(&openprs_aspect_inifin_);
264 rv.push_back(&openprs_manager_aspect_inifin_);
265 return rv;
266}
267
/** Check whether the connection to the server still looks usable.
 * @return true if server_socket_ is open and querying its remote endpoint
 * reports no error; false if the socket is closed or the query fails.
 */
268bool
269OpenPRSThread::server_alive()
270{
271 if (server_socket_.is_open()) {
// The error_code overload is used so a failed query sets ec instead of
// throwing; only the error state is evaluated, the endpoint is discarded.
272 boost::system::error_code ec;
273 server_socket_.remote_endpoint(ec);
274 return !ec;
275 } else {
276 return false;
277 }
278}
279
/** Deadline timer handler used to put a timeout on the connect attempt.
 * If the timer's deadline has passed, the socket is closed and the deadline
 * is moved to positive infinity. In every case the function then registers
 * itself again as the handler for the next wait on the timer.
 * @param deadline timer whose expiry is checked and which is waited on again
 * @param socket socket that is closed once the deadline has passed
 */
280void
281OpenPRSThread::check_deadline(boost::asio::deadline_timer & deadline,
282 boost::asio::ip::tcp::socket &socket)
283{
284 if (deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
285 socket.close();
// Park the timer at infinity so later invocations do not close the socket
// again until a new deadline is set.
286 deadline.expires_at(boost::posix_time::pos_infin);
287 }
288
// Re-arm. Both preprocessor branches register this same member function with
// the same arguments; they differ only in the bind facility used.
289#if BOOST_VERSION >= 104800
290 deadline.async_wait(boost::lambda::bind(
291 &OpenPRSThread::check_deadline, this, boost::ref(deadline), boost::ref(socket)));
292#else
293 deadline.async_wait(
294 boost::bind(&OpenPRSThread::check_deadline, this, boost::ref(deadline), boost::ref(socket)));
295#endif
296}
virtual void loop()
Code to execute in the thread.
OpenPRSThread()
Constructor.
virtual ~OpenPRSThread()
Destructor.
virtual void init()
Initialize the thread.
virtual void finalize()
Finalize the thread.
Thread aspect to provide a new aspect.
Thread aspect to use blocked timing.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
void clear()
Set underlying instance to 0, decrementing reference count of existing instance appropriately.
Definition: lockptr.h:499
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
void prepare(const std::string &fawkes_host, unsigned short fawkes_port, LockPtr< OpenPRSKernelManager > &openprs_kernel_mgr, OpenPRSServerProxy *openprs_server_proxy, OpenPRSMessagePasserProxy *openprs_mp_proxy)
Prepare OpenPRS aspect initializer.
void set_kernel_timeout(float timeout_sec)
Set timeout for kernel creation.
void set_manager(LockPtr< OpenPRSKernelManager > &clips_kernel_mgr)
Set OpenPRS environment manager.
Proxy for the OpenPRS server communication.
Proxy for the OpenPRS server communication.
Sub-process execution with stdin/stdout/stderr redirection.
Definition: proc.h:37
void check_proc()
Check if the process is still alive.
Definition: proc.cpp:375
void kill(int signum)
Send a signal to the process.
Definition: proc.cpp:188
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
Fawkes library namespace.