22#include "openprs_thread.h"
24#include "utils/openprs_mp_proxy.h"
25#include "utils/openprs_server_proxy.h"
27#include <baseapp/run.h>
28#include <logging/logger.h>
29#include <netcomm/fawkes/network_manager.h>
30#include <utils/sub_process/proc.h>
32#include <boost/bind/bind.hpp>
33#include <boost/format.hpp>
34#include <boost/lambda/bind.hpp>
35#include <boost/lambda/lambda.hpp>
55 server_socket_(io_service_),
56 deadline_(io_service_)
70 openprs_server_proxy_ = NULL;
71 openprs_mp_proxy_ = NULL;
73 char hostname[HOST_NAME_MAX];
74 if (gethostname(hostname, HOST_NAME_MAX) == -1) {
75 strcpy(hostname,
"localhost");
83 cfg_mp_host_ = hostname;
85 cfg_mp_port_ =
config->
get_uint(
"/openprs/message-passer/tcp-port");
86 cfg_mp_port_s_ = boost::str(boost::format(
"%u") % cfg_mp_port_);
87 cfg_mp_use_proxy_ =
config->
get_bool(
"/openprs/message-passer/use-proxy");
88 cfg_mp_proxy_port_ =
config->
get_uint(
"/openprs/message-passer/proxy-tcp-port");
95 cfg_server_host_ = hostname;
98 cfg_server_port_s_ = boost::str(boost::format(
"%u") % cfg_server_port_);
99 cfg_server_proxy_port_ =
config->
get_uint(
"/openprs/server/proxy-tcp-port");
102 cfg_kernel_timeout_ =
config->
get_float(
"/openprs/kernels/start-timeout");
108 const char *filename = cfg_mp_bin_.c_str();
109 const char *argv[] = {filename,
"-j", cfg_mp_port_s_.c_str(), NULL};
115 if (cfg_server_run_) {
117 const char *filename = cfg_server_bin_.c_str();
118 const char *argv[] = {filename,
120 cfg_mp_port_s_.c_str(),
122 cfg_server_port_s_.c_str(),
131#if BOOST_VERSION >= 104800
134 boost::asio::ip::tcp::resolver resolver(io_service_);
135 boost::asio::ip::tcp::resolver::query query(cfg_server_host_, cfg_server_port_s_);
136 boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);
141 deadline_.expires_at(boost::posix_time::pos_infin);
142 check_deadline(deadline_, server_socket_);
144 deadline_.expires_from_now(boost::posix_time::seconds(cfg_server_timeout_));
146 boost::system::error_code ec = boost::asio::error::would_block;
147 server_socket_.async_connect(iter->endpoint(), boost::lambda::var(ec) = boost::lambda::_1);
151 io_service_.run_one();
152# if BOOST_VERSION >= 105400 && BOOST_VERSION < 105500
158 server_socket_.remote_endpoint(ec);
159 if (ec == boost::system::errc::not_connected) {
161 ec = boost::asio::error::would_block;
162 server_socket_.async_connect(iter->endpoint(), boost::lambda::var(ec) = boost::lambda::_1);
166 }
while (ec == boost::asio::error::would_block);
169 if (ec || !server_socket_.is_open()) {
171 if (ec.value() == boost::system::errc::operation_canceled) {
172 throw Exception(
"OpenPRS waiting for server to come up timed out");
174 throw Exception(
"OpenPRS waiting for server failed: %s", ec.message().c_str());
181 boost::asio::socket_base::keep_alive keep_alive_option(
true);
182 server_socket_.set_option(keep_alive_option);
185 std::string greeting = OpenPRSServerProxy::read_string_from_socket(server_socket_);
188 OpenPRSServerProxy::write_string_to_socket(server_socket_,
"fawkes");
189 OpenPRSServerProxy::write_int_to_socket(server_socket_, getpid());
190 OpenPRSServerProxy::write_int_to_socket(server_socket_, 0);
192 io_service_thread_ = std::thread([
this]() { this->io_service_.run(); });
196 openprs_server_proxy_ =
199 if (cfg_mp_use_proxy_) {
204 openprs_mp_proxy_ = NULL;
208 openprs_kernel_mgr_ =
210 cfg_server_proxy_port_,
211 cfg_mp_use_proxy_ ? hostname : cfg_mp_host_,
212 cfg_mp_use_proxy_ ? cfg_mp_proxy_port_ : cfg_mp_port_,
216 openprs_aspect_inifin_.
prepare(
"localhost",
217 fawkes::runtime::network_manager->fawkes_port(),
219 openprs_server_proxy_,
221 openprs_manager_aspect_inifin_.
set_manager(openprs_kernel_mgr_);
227 server_socket_.close();
229 if (io_service_thread_.joinable()) {
230 io_service_thread_.join();
235 proc_srv_->
kill(SIGINT);
239 proc_mp_->
kill(SIGINT);
245 delete openprs_server_proxy_;
246 delete openprs_mp_proxy_;
247 openprs_kernel_mgr_.
clear();
259const std::list<AspectIniFin *>
260OpenPRSThread::inifin_list()
262 std::list<AspectIniFin *> rv;
263 rv.push_back(&openprs_aspect_inifin_);
264 rv.push_back(&openprs_manager_aspect_inifin_);
269OpenPRSThread::server_alive()
271 if (server_socket_.is_open()) {
272 boost::system::error_code ec;
273 server_socket_.remote_endpoint(ec);
281OpenPRSThread::check_deadline(boost::asio::deadline_timer & deadline,
282 boost::asio::ip::tcp::socket &socket)
284 if (deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
286 deadline.expires_at(boost::posix_time::pos_infin);
289#if BOOST_VERSION >= 104800
290 deadline.async_wait(boost::lambda::bind(
291 &OpenPRSThread::check_deadline,
this, boost::ref(deadline), boost::ref(socket)));
294 boost::bind(&OpenPRSThread::check_deadline,
this, boost::ref(deadline), boost::ref(socket)));
virtual void loop()
Code to execute in the thread.
OpenPRSThread()
Constructor.
virtual ~OpenPRSThread()
Destructor.
virtual void init()
Initialize the thread.
virtual void finalize()
Finalize the thread.
Thread aspect provide a new aspect.
Thread aspect to use blocked timing.
Clock * clock
By means of this member access to the clock is given.
Configuration * config
This is the Configuration member used to access the configuration.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
void clear()
Set underlying instance to 0, decrementing reference count of existing instance appropriately.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
void prepare(const std::string &fawkes_host, unsigned short fawkes_port, LockPtr< OpenPRSKernelManager > &openprs_kernel_mgr, OpenPRSServerProxy *openprs_server_proxy, OpenPRSMessagePasserProxy *openprs_mp_proxy)
Prepare OpenPRS aspect initializer.
void set_kernel_timeout(float timeout_sec)
Set timeout for kernel creation.
void set_manager(LockPtr< OpenPRSKernelManager > &clips_kernel_mgr)
Set OpenPRS environment manger.
Proxy for the OpenPRS server communication.
Sub-process execution with stdin/stdout/stderr redirection.
void check_proc()
Check if the process is still alive.
void kill(int signum)
Send a signal to the process.
Thread class encapsulation of pthreads.
const char * name() const
Get name of thread.
Fawkes library namespace.