Fawkes API Fawkes Development Version
openprs_mp_proxy.cpp
1
2/***************************************************************************
3 * openprs_mp_proxy.h - OpenPRS message passer proxy
4 *
5 * Created: Tue Aug 19 16:59:27 2014
6 * Copyright 2014 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version. A runtime exception applies to
13 * this software (see LICENSE.GPL_WRE file mentioned below for details).
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library General Public License for more details.
19 *
20 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
21 */
22
23#include "openprs_mp_proxy.h"
24
25#include <core/exception.h>
26#include <logging/logger.h>
27
28#include <boost/bind/bind.hpp>
29#include <boost/lexical_cast.hpp>
30
31using namespace boost::asio;
32
33// Types copied from OPRS because they are not public there
34namespace OPRS {
35typedef enum { MESSAGE_MT = 1, BROADCAST_MT, MULTICAST_MT, DISCONNECT_MT } Message_Type;
36typedef enum { REGISTER_OK, REGISTER_NAME_CONFLICT, REGISTER_DENIED } Register_Type;
37typedef enum { MESSAGES_PT, STRINGS_PT } Protocol_Type;
38} // namespace OPRS
39
40namespace fawkes {
41
42/** @class OpenPRSMessagePasserProxy "openprs_mp_proxy.h"
43 * Proxy for the OpenPRS server communication.
44 * Using this proxy allows to inject commands into the communication between
45 * oprs-server and oprs (or xoprs).
46 * @author Tim Niemueller
47 */
48
49/** Constructor.
50 * @param tcp_port port to listen on for incoming connections
51 * @param mp_host host of mp-oprs to connect to
52 * @param mp_port TCP port that mp-oprs listens on
53 * @param logger logger for informational messages
54 */
56 const std::string &mp_host,
57 unsigned short mp_port,
58 fawkes::Logger * logger)
59: io_service_work_(io_service_),
60 acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), tcp_port)),
61 mp_host_(mp_host),
62 mp_port_(mp_port),
63 logger_(logger)
64{
65 acceptor_.set_option(socket_base::reuse_address(true));
66 io_service_thread_ = std::thread([this]() { this->io_service_.run(); });
67 start_accept();
68}
69
70/** Destructor. */
72{
73 io_service_.stop();
74 io_service_thread_.join();
75}
76
77/** Start accepting connections. */
78void
79OpenPRSMessagePasserProxy::start_accept()
80{
81 Mapping::Ptr mapping(new Mapping(io_service_, mp_host_, mp_port_, logger_));
82 acceptor_.async_accept(mapping->client_socket,
83 boost::bind(&OpenPRSMessagePasserProxy::handle_accept,
84 this,
85 mapping,
86 boost::asio::placeholders::error));
87}
88
89void
90OpenPRSMessagePasserProxy::handle_accept(Mapping::Ptr mapping,
91 const boost::system::error_code &error)
92{
93 if (!error) {
94 mappings_.push_back(mapping);
95 mapping->start();
96 }
97
98 start_accept();
99}
100
101OpenPRSMessagePasserProxy::Mapping::Mapping(boost::asio::io_service &io_service,
102 const std::string & mp_host,
103 unsigned short mp_port,
104 fawkes::Logger * logger)
105: io_service_(io_service),
106 resolver_(io_service_),
107 server_host_(mp_host),
108 server_port_(mp_port),
109 logger_(logger),
110 server_in_reg_reply_(0),
111 server_in_str_len_(0),
112 client_in_msg_type_(0),
113 client_socket(io_service_),
114 server_socket(io_service_)
115{
116}
117
118/** Destruct mapping.
119 * This closes both, client and server sockets. This destructor
120 * assumes that the io_service has been cancelled.
121 */
122OpenPRSMessagePasserProxy::Mapping::~Mapping()
123{
124 boost::system::error_code err;
125 client_socket.shutdown(ip::tcp::socket::shutdown_both, err);
126 client_socket.close();
127 server_socket.shutdown(ip::tcp::socket::shutdown_both, err);
128 server_socket.close();
129}
130
131/** A client has connected, start this mapping. */
132void
133OpenPRSMessagePasserProxy::Mapping::start()
134{
135 client_prot = read_int_from_socket(client_socket);
136 client_name = read_string_from_socket(client_socket);
137
138 logger_->log_info("OPRS-mp-proxy", "Client %s connected", client_name.c_str());
139
140 ip::tcp::resolver::query query(server_host_, boost::lexical_cast<std::string>(server_port_));
141 resolver_.async_resolve(query,
142 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_resolve,
143 this,
144 boost::asio::placeholders::error,
145 boost::asio::placeholders::iterator));
146}
147
148bool
149OpenPRSMessagePasserProxy::Mapping::alive() const
150{
151 return client_socket.is_open();
152}
153
154/** Disconnect this client. */
155void
156OpenPRSMessagePasserProxy::Mapping::disconnect()
157{
158 disconnect("disconnect", "API call");
159}
160
161void
162OpenPRSMessagePasserProxy::Mapping::disconnect(const char *where, const char *reason)
163{
164 logger_->log_warn(
165 "OPRS-mp-proxy", "Client %s disconnected (%s: %s)", client_name.c_str(), where, reason);
166 boost::system::error_code ec;
167 client_socket.shutdown(ip::tcp::socket::shutdown_both, ec);
168 client_socket.close();
169}
170
171void
172OpenPRSMessagePasserProxy::Mapping::handle_resolve(const boost::system::error_code &err,
173 ip::tcp::resolver::iterator endpoint_iterator)
174{
175 if (!err) {
176 // Attempt a connection to each endpoint in the list until we
177 // successfully establish a connection.
178#if BOOST_ASIO_VERSION > 100409
179 boost::asio::async_connect(server_socket,
180 endpoint_iterator,
181#else
182 server_socket.async_connect(*endpoint_iterator,
183#endif
184 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_connect,
185 this,
186 boost::asio::placeholders::error));
187 } else {
188 disconnect("handle_resolve", err.message().c_str());
189 }
190}
191
192void
193OpenPRSMessagePasserProxy::Mapping::handle_connect(const boost::system::error_code &err)
194{
195 if (!err) {
196 write_int_to_socket(server_socket, client_prot);
197 write_string_to_socket(server_socket, client_name);
198
199 // asynchronously read registration reply
200 boost::asio::async_read(
201 server_socket,
202 boost::asio::buffer(&server_in_reg_reply_, sizeof(server_in_reg_reply_)),
203 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_reg_reply,
204 this,
205 boost::asio::placeholders::error));
206
207 } else {
208 disconnect("handle_connect", err.message().c_str());
209 }
210}
211
212void
213OpenPRSMessagePasserProxy::Mapping::handle_recv_server_reg_reply(
214 const boost::system::error_code &err)
215{
216 write_int_to_socket(client_socket, server_in_reg_reply_);
217
218 if (server_in_reg_reply_ == OPRS::REGISTER_OK) {
219 start_recv_client();
220 start_recv_server();
221 } else {
222 disconnect("recv_server_reg_reply", err.message().c_str());
223 }
224}
225
226void
227OpenPRSMessagePasserProxy::Mapping::start_recv_client()
228{
229 boost::asio::async_read(client_socket,
230 boost::asio::buffer(&client_in_msg_type_, sizeof(client_in_msg_type_)),
231 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_client,
232 this,
233 boost::asio::placeholders::error));
234}
235
236void
237OpenPRSMessagePasserProxy::Mapping::handle_recv_client(const boost::system::error_code &err)
238{
239 if (!err) {
240 try {
241 std::vector<std::string> multicast_recipients;
242 std::string message;
243 std::string recipient;
244
245 client_in_msg_type_ = ntohl(client_in_msg_type_);
246
247 switch (client_in_msg_type_) {
248 case OPRS::DISCONNECT_MT:
249 logger_->log_info("OPRS-mp-proxy", "Disconnecting %s", client_name.c_str());
250 disconnect("recv_client", "Client disconnected");
251 return;
252
253 case OPRS::MESSAGE_MT: recipient = read_string_from_socket(client_socket); break;
254
255 case OPRS::MULTICAST_MT:
256 multicast_recipients.resize(read_int_from_socket(client_socket));
257 break;
258
259 case OPRS::BROADCAST_MT: break; // nothing to do here
260
261 default: disconnect("recv_client", "Unknown message type"); return;
262 }
263
264 message = read_string_from_socket(client_socket);
265
266 if (client_in_msg_type_ == OPRS::MULTICAST_MT) {
267 for (size_t i = 0; i < multicast_recipients.size(); ++i) {
268 multicast_recipients[i] = read_string_from_socket(client_socket);
269 }
270 }
271
272 // debug output
273 switch (client_in_msg_type_) {
274 case OPRS::MESSAGE_MT:
275 logger_->log_info("OPRS-mp-proxy",
276 "Forwarding unicast %s->%s: '%s'",
277 client_name.c_str(),
278 recipient.c_str(),
279 message.c_str());
280 break;
281
282 case OPRS::MULTICAST_MT: {
283 std::string recipients;
284 for (size_t i = 0; i < multicast_recipients.size(); ++i) {
285 if (i > 0)
286 recipients += ", ";
287 recipients += multicast_recipients[i];
288 }
289
290 logger_->log_info("OPRS-mp-proxy",
291 "Forwarding multicast %s->(%s): '%s'",
292 client_name.c_str(),
293 recipients.c_str(),
294 message.c_str());
295 } break;
296
297 case OPRS::BROADCAST_MT:
298 logger_->log_info("OPRS-mp-proxy",
299 "Forwarding broadcast %s->*: '%s'",
300 client_name.c_str(),
301 message.c_str());
302 break;
303
304 default: break;
305 }
306
307 // now re-send message to server
308 write_int_to_socket(server_socket, client_in_msg_type_);
309
310 switch (client_in_msg_type_) {
311 case OPRS::MESSAGE_MT:
312 write_string_to_socket(server_socket, recipient);
313 write_string_to_socket(server_socket, message);
314 break;
315
316 case OPRS::MULTICAST_MT:
317 write_string_to_socket(server_socket, message);
318 for (size_t i = 0; i < multicast_recipients.size(); ++i) {
319 write_string_to_socket(server_socket, multicast_recipients[i]);
320 }
321 break;
322
323 case OPRS::BROADCAST_MT: // nothing to do here
324 write_string_to_socket(server_socket, message);
325 break;
326
327 default: break; // cannot happen here anymore
328 }
329
330 start_recv_client();
331 } catch (Exception &e) {
332 disconnect("recv_client", e.what_no_backtrace());
333 }
334 } else {
335 disconnect("recv_client", err.message().c_str());
336 }
337}
338
339void
340OpenPRSMessagePasserProxy::Mapping::start_recv_server()
341{
342 if (client_prot == OPRS::MESSAGES_PT) {
343 logger_->log_warn("OPRS-mp-proxy",
344 "Starting listening for %s in MESSAGES_PT mode",
345 client_name.c_str());
346 boost::asio::async_read_until(
347 server_socket,
348 server_buffer_,
349 '\n',
350 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_message_pt,
351 this,
352 boost::asio::placeholders::error));
353 } else {
354 // tried async_read_some with null buffers but always immediately fires without data available
355 logger_->log_warn("OPRS-mp-proxy",
356 "Starting listening for %s in STRINGS_PT mode",
357 client_name.c_str());
358 server_socket.async_read_some(
359 boost::asio::null_buffers(),
360 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_strings_pt,
361 this,
362 boost::asio::placeholders::error));
363 }
364}
365
366void
367OpenPRSMessagePasserProxy::Mapping::handle_recv_server_message_pt(
368 const boost::system::error_code &err)
369{
370 if (!err) {
371 std::string line;
372 std::istream in_stream(&server_buffer_);
373 std::getline(in_stream, line);
374
375 logger_->log_info("OPRS-mp-proxy",
376 "Forwarding server ->%s: '%s\\n'",
377 client_name.c_str(),
378 line.c_str());
379
380 // resend to client
381 write_string_newline_to_socket(client_socket, line);
382
383 start_recv_server();
384 } else {
385 disconnect("recv_server_message_pt", err.message().c_str());
386 }
387}
388
389void
390OpenPRSMessagePasserProxy::Mapping::handle_recv_server_strings_pt(
391 const boost::system::error_code &err)
392{
393 if (!err) {
394 try {
395 std::string sender = read_string_from_socket(server_socket);
396 std::string message = read_string_from_socket(server_socket);
397
398 logger_->log_info("OPRS-mp-proxy",
399 "Forwarding server %s->%s: '%s'",
400 sender.c_str(),
401 client_name.c_str(),
402 message.c_str());
403
404 // resend to client
405 write_string_to_socket(client_socket, sender);
406 write_string_to_socket(client_socket, message);
407
408 start_recv_server();
409 } catch (Exception &e) {
410 disconnect("recv_server_strings_pt", e.what_no_backtrace());
411 }
412 } else {
413 disconnect("recv_server_strings_pt", err.message().c_str());
414 }
415}
416
417int
418OpenPRSMessagePasserProxy::Mapping::read_int_from_socket(boost::asio::ip::tcp::socket &socket)
419{
420 int32_t value;
421 boost::system::error_code ec;
422 boost::asio::read(socket, boost::asio::buffer(&value, sizeof(value)), ec);
423 if (ec) {
424 throw Exception("Failed to read int from socket: %s", ec.message().c_str());
425 } else {
426 return ntohl(value);
427 }
428}
429
430std::string
431OpenPRSMessagePasserProxy::Mapping::read_string_from_socket(boost::asio::ip::tcp::socket &socket)
432{
433 uint32_t s_size = 0;
434 boost::system::error_code ec;
435 boost::asio::read(socket, boost::asio::buffer(&s_size, sizeof(s_size)), ec);
436 if (ec) {
437 throw Exception("Failed to read string size from socket: %s", ec.message().c_str());
438 }
439 s_size = ntohl(s_size);
440
441 char s[s_size + 1];
442 boost::asio::read(socket, boost::asio::buffer(s, s_size), ec);
443 if (ec) {
444 throw Exception("Failed to read string content from socket: %s", ec.message().c_str());
445 }
446 s[s_size] = 0;
447
448 return s;
449}
450
451void
452OpenPRSMessagePasserProxy::Mapping::write_int_to_socket(boost::asio::ip::tcp::socket &socket, int i)
453{
454 boost::system::error_code ec;
455 int32_t value = htonl(i);
456 boost::asio::write(socket, boost::asio::buffer(&value, sizeof(value)), ec);
457 if (ec) {
458 throw Exception("Failed to write int to socket: %s", ec.message().c_str());
459 }
460}
461
462void
463OpenPRSMessagePasserProxy::Mapping::write_string_to_socket(boost::asio::ip::tcp::socket &socket,
464 std::string & str)
465{
466 boost::system::error_code ec;
467 uint32_t s_size = htonl(str.size());
468 std::array<boost::asio::const_buffer, 2> buffers;
469 buffers[0] = boost::asio::buffer(&s_size, sizeof(s_size));
470 buffers[1] = boost::asio::buffer(str.c_str(), str.size());
471
472 boost::asio::write(socket, buffers, ec);
473 if (ec) {
474 throw Exception("Failed to write string to socket: %s", ec.message().c_str());
475 }
476}
477
478void
479OpenPRSMessagePasserProxy::Mapping::write_string_newline_to_socket(
480 boost::asio::ip::tcp::socket &socket,
481 const std::string & str)
482{
483 boost::system::error_code ec;
484 std::string s = str + "\n";
485 boost::asio::write(socket, boost::asio::buffer(s.c_str(), s.size()), ec);
486 if (ec) {
487 throw Exception("Failed to write string to socket: %s", ec.message().c_str());
488 }
489}
490
491} // end namespace fawkes
Interface for logging.
Definition: logger.h:42
OpenPRSMessagePasserProxy(unsigned short tcp_port, const std::string &mp_host, unsigned short mp_port, fawkes::Logger *logger)
Constructor.
virtual ~OpenPRSMessagePasserProxy()
Destructor.
Fawkes library namespace.