Fawkes API Fawkes Development Version
openprs_comm.cpp
1
2/***************************************************************************
3 * openprs_comm.cpp - OpenPRS communication wrapper for Fawkes
4 *
5 * Created: Mon Aug 18 14:55:51 2014
6 * Copyright 2014 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version. A runtime exception applies to
13 * this software (see LICENSE.GPL_WRE file mentioned below for details).
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library General Public License for more details.
19 *
20 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
21 */
22
#include "openprs_comm.h"

#include "openprs_server_proxy.h"

#include <core/exception.h>
#include <core/exceptions/system.h>
#include <utils/sub_process/proc.h>

// clang-format off
// The OpenPRS headers have an include problem and require this ordering.
#include <opaque-pub.h>
#include <mp-pub.h>
// clang-format on
#include <unistd.h>

#include <exception>
37
38// these exist in libExtMP and are exported, but not mentioned in the header
39extern "C" {
40void send_message_string_socket(int socket, Symbol rec, PString message);
41void broadcast_message_string_socket(int socket, PString message);
42void
43multicast_message_string_socket(int socket, unsigned int nb_recs, Symbol *recs, PString message);
44}
45
46namespace fawkes {
47
48/** @class OpenPRSComm <plugins/openprs/utils/openprs_comm.h>
49 * OpenPRS communication wrapper.
50 * This class provides communication facilities via the OpenPRS message passer
51 * as well as through the OpenPRS server proxy.
52 * @author Tim Niemueller
53 */
54
55/** Constructor.
56 * @param local_name the local name with which the communication wrapper will be
57 * registered to the message parser. This can be used to send messages from the
58 * kernel.
59 * @param hostname host where the message passer runs
60 * @param port TCP port where the message passer listens
61 * @param server_proxy server proxy to use to send commands to kernels
62 * @param logger logger for informational messages (optional)
63 */
64OpenPRSComm::OpenPRSComm(const char * local_name,
65 const char * hostname,
66 unsigned short port,
67 OpenPRSServerProxy *server_proxy,
68 Logger * logger)
69: name_(local_name),
70 server_proxy_(server_proxy),
71 logger_(logger),
72 io_service_work_(io_service_),
73 sd_mp_socket_(io_service_)
74
75{
76 // Protocol MUST be STRINGS_PT or otherwise our message reception code will break
77 mp_socket_ = external_register_to_the_mp_host_prot(local_name, hostname, port, STRINGS_PT);
78 if (mp_socket_ == -1) {
79 throw Exception("Failed to connect to OpenPRS as '%s'", local_name);
80 }
81 io_service_thread_ = std::thread([this]() { this->io_service_.run(); });
82 sd_mp_socket_.assign(dup(mp_socket_));
83 start_recv();
84}
85
86/** Destructor. */
88{
89 io_service_.stop();
90 io_service_thread_.join();
91 if (mp_socket_ >= 0) {
92 ::close(mp_socket_);
93 }
94}
95
96/** Send a message to an OpenPRS kernel.
97 * @param recipient OpenPRS kernel name to send to
98 * @param message message to send, cf. OpenPRS manual for valid messages
99 */
100void
101OpenPRSComm::send_message(const char *recipient, const char *message)
102{
103 send_message_string_socket(mp_socket_, recipient, (char *)message);
104}
105
106/** Send a message to all OpenPRS kernels.
107 * @param message message to send, cf. OpenPRS manual for valid messages
108 */
109void
111{
112 broadcast_message_string_socket(mp_socket_, (char *)message);
113}
114
115/** Send a message to multiple OpenPRS kernel.
116 * @param recipients Vector of OpenPRS kernel names to send to
117 * @param message message to send, cf. OpenPRS manual for valid messages
118 */
119void
120OpenPRSComm::multicast_message(std::vector<const char *> &recipients, const char *message)
121{
122 multicast_message_string_socket(mp_socket_, recipients.size(), &(recipients[0]), (char *)message);
123}
124
125/** Send a message to an OpenPRS kernel.
126 * @param recipient OpenPRS kernel name to send to
127 * @param message message to send, cf. OpenPRS manual for valid messages
128 */
129void
130OpenPRSComm::send_message(const std::string &recipient, const std::string &message)
131{
132 send_message_string_socket(mp_socket_, recipient.c_str(), (char *)message.c_str());
133}
134
135/** Send a message to all OpenPRS kernels.
136 * @param message message to send, cf. OpenPRS manual for valid messages
137 */
138void
139OpenPRSComm::broadcast_message(const std::string &message)
140{
141 broadcast_message_string_socket(mp_socket_, (char *)message.c_str());
142}
143
144/** Send a message to multiple OpenPRS kernel.
145 * @param recipients Vector of OpenPRS kernel names to send to
146 * @param message message to send, cf. OpenPRS manual for valid messages
147 */
148void
149OpenPRSComm::multicast_message(const std::vector<std::string> &recipients,
150 const std::string & message)
151{
152 std::vector<const char *> recs;
153 recs.resize(recipients.size());
154 for (size_t i = 0; i < recipients.size(); ++i) {
155 recs[i] = recipients[i].c_str();
156 }
157 multicast_message_string_socket(mp_socket_, recs.size(), &(recs[0]), (char *)message.c_str());
158}
159
160/** Send a formatted message to an OpenPRS kernel.
161 * @param recipient OpenPRS kernel name to send to
162 * @param format format for message to send according to sprintf()
163 */
164void
165OpenPRSComm::send_message_f(const std::string &recipient, const char *format, ...)
166{
167 va_list arg;
168 va_start(arg, format);
169 char *msg;
170 if (vasprintf(&msg, format, arg) == -1) {
171 va_end(arg);
172 throw OutOfMemoryException("Cannot format OpenPRS client command string");
173 }
174 va_end(arg);
175 send_message_string_socket(mp_socket_, recipient.c_str(), msg);
176 free(msg);
177}
178
179/** Send a formatted message to all OpenPRS kernels.
180 * @param format format for message to send according to sprintf()
181 */
182void
183OpenPRSComm::broadcast_message_f(const char *format, ...)
184{
185 va_list arg;
186 va_start(arg, format);
187 char *msg;
188 if (vasprintf(&msg, format, arg) == -1) {
189 va_end(arg);
190 throw OutOfMemoryException("Cannot format OpenPRS client command string");
191 }
192 va_end(arg);
193 broadcast_message_string_socket(mp_socket_, msg);
194 free(msg);
195}
196
197/** Send a message to multiple OpenPRS kernel.
198 * @param recipients Vector of OpenPRS kernel names to send to
199 * @param format format for message to send according to sprintf()
200 */
201void
202OpenPRSComm::multicast_message_f(const std::vector<std::string> &recipients,
203 const char * format,
204 ...)
205{
206 std::vector<const char *> recs;
207 recs.resize(recipients.size());
208 for (size_t i = 0; i < recipients.size(); ++i) {
209 recs[i] = recipients[i].c_str();
210 }
211 va_list arg;
212 va_start(arg, format);
213 char *msg;
214 if (vasprintf(&msg, format, arg) == -1) {
215 va_end(arg);
216 throw OutOfMemoryException("Cannot format OpenPRS client command string");
217 }
218 va_end(arg);
219 multicast_message_string_socket(mp_socket_, recs.size(), &(recs[0]), msg);
220 free(msg);
221}
222
223/** Transmit a command to an OpenPRS kernel.
224 * This works equivalent to the transmit oprs-server console command.
225 * @param recipient OpenPRS kernel name to send to
226 * @param message command to send, cf. OpenPRS manual for valid commands
227 */
228void
229OpenPRSComm::transmit_command(const char *recipient, const char *message)
230{
231 server_proxy_->transmit_command(recipient, message);
232}
233
234/** Transmit a command to an OpenPRS kernel.
235 * This works equivalent to the transmit oprs-server console command.
236 * @param recipient OpenPRS kernel name to send to
237 * @param message command to send, cf. OpenPRS manual for valid commands
238 */
239void
240OpenPRSComm::transmit_command(const std::string &recipient, const std::string &message)
241{
242 server_proxy_->transmit_command(recipient, message);
243}
244
245/** Transmit a command to an OpenPRS kernel.
246 * This works equivalent to the transmit oprs-server console command.
247 * This function allows to pass a format according to the sprintf()
248 * format and its arguments.
249 * @param recipient OpenPRS kernel name to send to
250 * @param format format string for the command, must be followed by the
251 * appropriate number and types of arguments.
252 */
253void
254OpenPRSComm::transmit_command_f(const std::string &recipient, const char *format, ...)
255{
256 va_list arg;
257 va_start(arg, format);
258 server_proxy_->transmit_command_v(recipient, format, arg);
259 va_end(arg);
260}
261
262void
263OpenPRSComm::start_recv()
264{
265 sd_mp_socket_.async_read_some(boost::asio::null_buffers(),
266 boost::bind(&OpenPRSComm::handle_recv,
267 this,
268 boost::asio::placeholders::error));
269}
270
271void
272OpenPRSComm::handle_recv(const boost::system::error_code &err)
273{
274 if (!err) {
275 try {
276 std::string sender = read_string_from_socket(sd_mp_socket_);
277 std::string message = read_string_from_socket(sd_mp_socket_);
278
279 sig_rcvd_(sender, message);
280 } catch (Exception &e) {
281 if (logger_) {
282 logger_->log_warn(name_.c_str(), "Failed to receive message: %s", e.what_no_backtrace());
283 }
284 }
285 } else if (logger_) {
286 logger_->log_warn(name_.c_str(), "Failed to receive message: %s", err.message().c_str());
287 }
288 start_recv();
289}
290
291std::string
292OpenPRSComm::read_string_from_socket(boost::asio::posix::stream_descriptor &socket)
293{
294 uint32_t s_size = 0;
295 boost::system::error_code ec;
296 boost::asio::read(socket, boost::asio::buffer(&s_size, sizeof(s_size)), ec);
297 if (ec) {
298 throw Exception("Failed to read string size from socket: %s", ec.message().c_str());
299 }
300 s_size = ntohl(s_size);
301
302 char s[s_size + 1];
303 boost::asio::read(socket, boost::asio::buffer(s, s_size), ec);
304 if (ec) {
305 throw Exception("Failed to read string content from socket: %s", ec.message().c_str());
306 }
307 s[s_size] = 0;
308
309 return s;
310}
311
312} // end namespace fawkes
Base class for exceptions in Fawkes.
Definition: exception.h:36
Interface for logging.
Definition: logger.h:42
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void transmit_command(const char *recipient, const char *message)
Transmit a command to an OpenPRS kernel.
void multicast_message(std::vector< const char * > &recipients, const char *message)
Send a message to multiple OpenPRS kernels.
void multicast_message_f(const std::vector< std::string > &recipients, const char *format,...)
Send a message to multiple OpenPRS kernels.
virtual ~OpenPRSComm()
Destructor.
void broadcast_message_f(const char *format,...)
Send a formatted message to all OpenPRS kernels.
void transmit_command_f(const std::string &recipient, const char *format,...)
Transmit a command to an OpenPRS kernel.
void broadcast_message(const char *message)
Send a message to all OpenPRS kernels.
void send_message_f(const std::string &recipient, const char *format,...)
Send a formatted message to an OpenPRS kernel.
OpenPRSComm(const char *local_name, const char *hostname, unsigned short port, OpenPRSServerProxy *server_proxy, Logger *logger=NULL)
Constructor.
void send_message(const char *recipient, const char *message)
Send a message to an OpenPRS kernel.
Proxy for the OpenPRS server communication.
void transmit_command(const std::string &client_name, const std::string &command)
Transmit a command to an OpenPRS kernel.
void transmit_command_v(const std::string &client_name, const char *format, va_list arg)
Transmit a command to an OpenPRS kernel.
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
Fawkes library namespace.