23 #include "openprs_mp_proxy.h" 25 #include <core/exception.h> 26 #include <logging/logger.h> 28 #include <boost/bind.hpp> 29 #include <boost/lexical_cast.hpp> 31 using namespace boost::asio;
35 typedef enum { MESSAGE_MT = 1, BROADCAST_MT, MULTICAST_MT, DISCONNECT_MT } Message_Type;
36 typedef enum { REGISTER_OK, REGISTER_NAME_CONFLICT, REGISTER_DENIED } Register_Type;
37 typedef enum { MESSAGES_PT, STRINGS_PT } Protocol_Type;
55 OpenPRSMessagePasserProxy::OpenPRSMessagePasserProxy(
unsigned short tcp_port,
56 const std::string &mp_host,
57 unsigned short mp_port,
59 : io_service_work_(io_service_),
60 acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), tcp_port)),
65 acceptor_.set_option(socket_base::reuse_address(
true));
66 io_service_thread_ = std::thread([
this]() { this->io_service_.run(); });
74 io_service_thread_.join();
79 OpenPRSMessagePasserProxy::start_accept()
81 Mapping::Ptr mapping(
new Mapping(io_service_, mp_host_, mp_port_, logger_));
82 acceptor_.async_accept(mapping->client_socket,
83 boost::bind(&OpenPRSMessagePasserProxy::handle_accept,
86 boost::asio::placeholders::error));
90 OpenPRSMessagePasserProxy::handle_accept(Mapping::Ptr mapping,
91 const boost::system::error_code &error)
94 mappings_.push_back(mapping);
101 OpenPRSMessagePasserProxy::Mapping::Mapping(boost::asio::io_service &io_service,
102 const std::string & mp_host,
103 unsigned short mp_port,
105 : io_service_(io_service),
106 resolver_(io_service_),
107 server_host_(mp_host),
108 server_port_(mp_port),
110 server_in_reg_reply_(0),
111 server_in_str_len_(0),
112 client_in_msg_type_(0),
113 client_socket(io_service_),
114 server_socket(io_service_)
122 OpenPRSMessagePasserProxy::Mapping::~Mapping()
124 boost::system::error_code err;
125 client_socket.shutdown(ip::tcp::socket::shutdown_both, err);
126 client_socket.close();
127 server_socket.shutdown(ip::tcp::socket::shutdown_both, err);
128 server_socket.close();
133 OpenPRSMessagePasserProxy::Mapping::start()
135 client_prot = read_int_from_socket(client_socket);
136 client_name = read_string_from_socket(client_socket);
138 logger_->log_info(
"OPRS-mp-proxy",
"Client %s connected", client_name.c_str());
140 ip::tcp::resolver::query query(server_host_, boost::lexical_cast<std::string>(server_port_));
141 resolver_.async_resolve(query,
142 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_resolve,
144 boost::asio::placeholders::error,
145 boost::asio::placeholders::iterator));
149 OpenPRSMessagePasserProxy::Mapping::alive()
const 151 return client_socket.is_open();
156 OpenPRSMessagePasserProxy::Mapping::disconnect()
158 disconnect(
"disconnect",
"API call");
162 OpenPRSMessagePasserProxy::Mapping::disconnect(
const char *where,
const char *reason)
165 "OPRS-mp-proxy",
"Client %s disconnected (%s: %s)", client_name.c_str(), where, reason);
166 boost::system::error_code ec;
167 client_socket.shutdown(ip::tcp::socket::shutdown_both, ec);
168 client_socket.close();
172 OpenPRSMessagePasserProxy::Mapping::handle_resolve(
const boost::system::error_code &err,
173 ip::tcp::resolver::iterator endpoint_iterator)
178 #if BOOST_ASIO_VERSION > 100409 179 boost::asio::async_connect(server_socket,
182 server_socket.async_connect(*endpoint_iterator,
184 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_connect,
186 boost::asio::placeholders::error));
188 disconnect(
"handle_resolve", err.message().c_str());
193 OpenPRSMessagePasserProxy::Mapping::handle_connect(
const boost::system::error_code &err)
196 write_int_to_socket(server_socket, client_prot);
197 write_string_to_socket(server_socket, client_name);
200 boost::asio::async_read(
202 boost::asio::buffer(&server_in_reg_reply_,
sizeof(server_in_reg_reply_)),
203 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_reg_reply,
205 boost::asio::placeholders::error));
208 disconnect(
"handle_connect", err.message().c_str());
213 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_reg_reply(
214 const boost::system::error_code &err)
216 write_int_to_socket(client_socket, server_in_reg_reply_);
218 if (server_in_reg_reply_ == OPRS::REGISTER_OK) {
222 disconnect(
"recv_server_reg_reply", err.message().c_str());
227 OpenPRSMessagePasserProxy::Mapping::start_recv_client()
229 boost::asio::async_read(client_socket,
230 boost::asio::buffer(&client_in_msg_type_,
sizeof(client_in_msg_type_)),
231 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_client,
233 boost::asio::placeholders::error));
237 OpenPRSMessagePasserProxy::Mapping::handle_recv_client(
const boost::system::error_code &err)
241 std::vector<std::string> multicast_recipients;
243 std::string recipient;
245 client_in_msg_type_ = ntohl(client_in_msg_type_);
247 switch (client_in_msg_type_) {
248 case OPRS::DISCONNECT_MT:
249 logger_->log_info(
"OPRS-mp-proxy",
"Disconnecting %s", client_name.c_str());
250 disconnect(
"recv_client",
"Client disconnected");
253 case OPRS::MESSAGE_MT: recipient = read_string_from_socket(client_socket);
break;
255 case OPRS::MULTICAST_MT:
256 multicast_recipients.resize(read_int_from_socket(client_socket));
259 case OPRS::BROADCAST_MT:
break;
261 default: disconnect(
"recv_client",
"Unknown message type");
return;
264 message = read_string_from_socket(client_socket);
266 if (client_in_msg_type_ == OPRS::MULTICAST_MT) {
267 for (
size_t i = 0; i < multicast_recipients.size(); ++i) {
268 multicast_recipients[i] = read_string_from_socket(client_socket);
273 switch (client_in_msg_type_) {
274 case OPRS::MESSAGE_MT:
275 logger_->log_info(
"OPRS-mp-proxy",
276 "Forwarding unicast %s->%s: '%s'",
282 case OPRS::MULTICAST_MT: {
283 std::string recipients;
284 for (
size_t i = 0; i < multicast_recipients.size(); ++i) {
287 recipients += multicast_recipients[i];
290 logger_->log_info(
"OPRS-mp-proxy",
291 "Forwarding multicast %s->(%s): '%s'",
297 case OPRS::BROADCAST_MT:
298 logger_->log_info(
"OPRS-mp-proxy",
299 "Forwarding broadcast %s->*: '%s'",
308 write_int_to_socket(server_socket, client_in_msg_type_);
310 switch (client_in_msg_type_) {
311 case OPRS::MESSAGE_MT:
312 write_string_to_socket(server_socket, recipient);
313 write_string_to_socket(server_socket, message);
316 case OPRS::MULTICAST_MT:
317 write_string_to_socket(server_socket, message);
318 for (
size_t i = 0; i < multicast_recipients.size(); ++i) {
319 write_string_to_socket(server_socket, multicast_recipients[i]);
323 case OPRS::BROADCAST_MT:
324 write_string_to_socket(server_socket, message);
331 }
catch (Exception &e) {
332 disconnect(
"recv_client", e.what_no_backtrace());
335 disconnect(
"recv_client", err.message().c_str());
340 OpenPRSMessagePasserProxy::Mapping::start_recv_server()
342 if (client_prot == OPRS::MESSAGES_PT) {
343 logger_->log_warn(
"OPRS-mp-proxy",
344 "Starting listening for %s in MESSAGES_PT mode",
345 client_name.c_str());
346 boost::asio::async_read_until(
350 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_message_pt,
352 boost::asio::placeholders::error));
355 logger_->log_warn(
"OPRS-mp-proxy",
356 "Starting listening for %s in STRINGS_PT mode",
357 client_name.c_str());
358 server_socket.async_read_some(
359 boost::asio::null_buffers(),
360 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_strings_pt,
362 boost::asio::placeholders::error));
367 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_message_pt(
368 const boost::system::error_code &err)
372 std::istream in_stream(&server_buffer_);
373 std::getline(in_stream, line);
375 logger_->log_info(
"OPRS-mp-proxy",
376 "Forwarding server ->%s: '%s\\n'",
381 write_string_newline_to_socket(client_socket, line);
385 disconnect(
"recv_server_message_pt", err.message().c_str());
390 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_strings_pt(
391 const boost::system::error_code &err)
395 std::string sender = read_string_from_socket(server_socket);
396 std::string message = read_string_from_socket(server_socket);
398 logger_->log_info(
"OPRS-mp-proxy",
399 "Forwarding server %s->%s: '%s'",
405 write_string_to_socket(client_socket, sender);
406 write_string_to_socket(client_socket, message);
409 }
catch (Exception &e) {
410 disconnect(
"recv_server_strings_pt", e.what_no_backtrace());
413 disconnect(
"recv_server_strings_pt", err.message().c_str());
418 OpenPRSMessagePasserProxy::Mapping::read_int_from_socket(boost::asio::ip::tcp::socket &socket)
421 boost::system::error_code ec;
422 boost::asio::read(socket, boost::asio::buffer(&value,
sizeof(value)), ec);
424 throw Exception(
"Failed to read int from socket: %s", ec.message().c_str());
431 OpenPRSMessagePasserProxy::Mapping::read_string_from_socket(boost::asio::ip::tcp::socket &socket)
434 boost::system::error_code ec;
435 boost::asio::read(socket, boost::asio::buffer(&s_size,
sizeof(s_size)), ec);
437 throw Exception(
"Failed to read string size from socket: %s", ec.message().c_str());
439 s_size = ntohl(s_size);
442 boost::asio::read(socket, boost::asio::buffer(s, s_size), ec);
444 throw Exception(
"Failed to read string content from socket: %s", ec.message().c_str());
452 OpenPRSMessagePasserProxy::Mapping::write_int_to_socket(boost::asio::ip::tcp::socket &socket,
int i)
454 boost::system::error_code ec;
455 int32_t value = htonl(i);
456 boost::asio::write(socket, boost::asio::buffer(&value,
sizeof(value)), ec);
458 throw Exception(
"Failed to write int to socket: %s", ec.message().c_str());
463 OpenPRSMessagePasserProxy::Mapping::write_string_to_socket(boost::asio::ip::tcp::socket &socket,
466 boost::system::error_code ec;
467 uint32_t s_size = htonl(str.size());
468 std::array<boost::asio::const_buffer, 2> buffers;
469 buffers[0] = boost::asio::buffer(&s_size,
sizeof(s_size));
470 buffers[1] = boost::asio::buffer(str.c_str(), str.size());
472 boost::asio::write(socket, buffers, ec);
474 throw Exception(
"Failed to write string to socket: %s", ec.message().c_str());
479 OpenPRSMessagePasserProxy::Mapping::write_string_newline_to_socket(
480 boost::asio::ip::tcp::socket &socket,
481 const std::string & str)
483 boost::system::error_code ec;
484 std::string s = str +
"\n";
485 boost::asio::write(socket, boost::asio::buffer(s.c_str(), s.size()), ec);
487 throw Exception(
"Failed to write string to socket: %s", ec.message().c_str());
virtual ~OpenPRSMessagePasserProxy()
Destructor.
Fawkes library namespace.