Fawkes API Fawkes Development Version
protobuf_thread.cpp
1
2/***************************************************************************
3 * Protoboard plugin template
4 * - Implementation of the ProtoBuf thread: protobuf_comm to actually send
5 * messages.
6 *
7 * Copyright 2019 Victor Mataré
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library General Public License for more details.
19 *
20 * Read the full text in the LICENSE.GPL file in the doc directory.
21 */
22
23#include "protobuf_thread.h"
24
25#include "blackboard_manager.h"
26
27#include <core/threading/mutex_locker.h>
28#include <google/protobuf/descriptor.h>
29#include <protobuf_comm/client.h>
30#include <protobuf_comm/peer.h>
31
32using namespace google::protobuf;
33using namespace protobuf_comm;
34using namespace boost::placeholders;
35
36namespace protoboard {
37
39: Thread("ProtoboardMessageHandler", Thread::OPMODE_CONTINUOUS),
40 message_register_(nullptr),
41 next_client_id_(0),
42 bb_manager_(nullptr)
43{
44}
45
47{
48 delete message_register_;
49}
50
51void
53{
54 message_register_ = new MessageRegister(proto_dirs());
55
56 if (!bb_manager_)
58 "BUG: %s's reference to blackboard manager thread hasn't been initialized", name());
59}
60
61bool
63{
64 fawkes::MutexLocker lock(&msgq_mutex_);
65 return !pb_queue_.empty();
66}
67
70{
71 fawkes::MutexLocker lock(&msgq_mutex_);
72 incoming_message msg = pb_queue_.front();
73 pb_queue_.pop();
74 return msg;
75}
76
77/** Enable protobuf peer.
78 * @param address IP address to send messages to
79 * @param send_to_port UDP port to send messages to
80 * @param recv_on_port UDP port to receive messages on, 0 to use the same as the @p send_port
81 * @param crypto_key encryption key
82 * @param cipher cipher suite, see BufferEncryptor for supported types
83 * @return peer identifier
84 */
85long int
86ProtobufThead::peer_create_local_crypto(const std::string &address,
87 int send_to_port,
88 int recv_on_port,
89 const std::string &crypto_key,
90 const std::string &cipher)
91{
92 if (recv_on_port <= 0)
93 recv_on_port = send_to_port;
94
95 if (send_to_port > 0) {
96 protobuf_comm::ProtobufBroadcastPeer *peer = new protobuf_comm::ProtobufBroadcastPeer(
97 address, send_to_port, recv_on_port, message_register_, crypto_key, cipher);
98
99 long int peer_id;
100 {
101 fawkes::MutexLocker lock(&map_mutex_);
102 peer_id = ++next_client_id_;
103 peers_[peer_id] = peer;
104 }
105
106 peer->signal_received().connect(
107 boost::bind(&ProtobufThead::handle_peer_msg, this, peer_id, _1, _2, _3, _4));
108 peer->signal_recv_error().connect(
109 boost::bind(&ProtobufThead::handle_peer_recv_error, this, peer_id, _1, _2));
110 peer->signal_send_error().connect(
111 boost::bind(&ProtobufThead::handle_peer_send_error, this, peer_id, _1));
112
113 return peer_id;
114 } else {
115 return 0;
116 }
117}
118
119/** Enable protobuf peer.
120 * @param address IP address to send messages to
121 * @param port UDP port to send and receive messages
122 * @param crypto_key encryption key
123 * @param cipher cipher suite, see BufferEncryptor for supported types
124 * @return peer identifier
125 */
126long int
127ProtobufThead::peer_create_crypto(const std::string &address,
128 int port,
129 const std::string &crypto_key,
130 const std::string &cipher)
131{
132 return peer_create_local_crypto(address, port, port, crypto_key, cipher);
133}
134
135/** Enable protobuf peer.
136 * @param address IP address to send messages to
137 * @param port UDP port to send and receive messages
138 * @return peer identifier
139 */
140long int
141ProtobufThead::peer_create(const std::string &address, int port)
142{
143 return peer_create_local_crypto(address, port, port);
144}
145
146/** Enable protobuf peer.
147 * @param address IP address to send messages to
148 * @param send_to_port UDP port to send messages to
149 * @param recv_on_port UDP port to receive messages on, 0 to use the same as the @p send_port
150 * @return peer identifier
151 */
152long int
153ProtobufThead::peer_create_local(const std::string &address, int send_to_port, int recv_on_port)
154{
155 return peer_create_local_crypto(address, send_to_port, recv_on_port);
156}
157
158/** Disable peer.
159 * @param peer_id ID of the peer to destroy
160 */
161void
163{
164 if (peers_.find(peer_id) != peers_.end()) {
165 delete peers_[peer_id];
166 peers_.erase(peer_id);
167 }
168}
169
170/** Setup crypto for peer.
171 * @param peer_id ID of the peer to destroy
172 * @param crypto_key encryption key
173 * @param cipher cipher suite, see BufferEncryptor for supported types
174 */
175void
176ProtobufThead::peer_setup_crypto(long int peer_id,
177 const std::string &crypto_key,
178 const std::string &cipher)
179{
180 if (peers_.find(peer_id) != peers_.end()) {
181 peers_[peer_id]->setup_crypto(crypto_key, cipher);
182 }
183}
184
185void
186ProtobufThead::send(long int peer_id, std::shared_ptr<google::protobuf::Message> m)
187{
188 if (!m) {
189 if (logger) {
190 logger->log_warn(name(), "Cannot send broadcast: invalid message");
191 }
192 return;
193 }
194
195 fawkes::MutexLocker lock(&map_mutex_);
196 if (peers_.find(peer_id) == peers_.end())
197 return;
198
199 //logger->log_info(name(), "Broadcasting %s", (*m)->GetTypeName().c_str());
200 try {
201 peers_[peer_id]->send(m);
202 } catch (google::protobuf::FatalException &e) {
203 if (logger) {
205 "Failed to broadcast message of type %s: %s",
206 m->GetTypeName().c_str(),
207 e.what());
208 }
209 } catch (fawkes::Exception &e) {
210 if (logger) {
212 "Failed to broadcast message of type %s: %s",
213 m->GetTypeName().c_str(),
214 e.what_no_backtrace());
215 }
216 } catch (std::runtime_error &e) {
217 if (logger) {
219 "Failed to broadcast message of type %s: %s",
220 m->GetTypeName().c_str(),
221 e.what());
222 }
223 }
224}
225
226/** Handle message that came from a peer/robot
227 * @param endpoint the endpoint from which the message was received
228 * @param component_id component the message was addressed to
229 * @param msg_type type of the message
230 * @param msg the message
231 */
232void
233ProtobufThead::handle_peer_msg(long int peer_id,
234 boost::asio::ip::udp::endpoint &endpoint,
235 uint16_t component_id,
236 uint16_t msg_type,
237 std::shared_ptr<Message> msg)
238{
239 fawkes::MutexLocker lock(&msgq_mutex_);
240 pb_queue_.push({peer_id, endpoint, component_id, msg_type, std::move(msg)});
241 bb_manager_->wakeup();
242}
243
244/** Handle error during peer message processing.
245 * @param endpoint endpoint of incoming message
246 * @param msg error message
247 */
248void
249ProtobufThead::handle_peer_recv_error(long int peer_id,
250 boost::asio::ip::udp::endpoint &endpoint,
251 std::string msg)
252{
253 if (logger) {
255 "Failed to receive peer message from %s:%u: %s",
256 endpoint.address().to_string().c_str(),
257 endpoint.port(),
258 msg.c_str());
259 }
260}
261
262/** Handle error during peer message processing.
263 * @param msg error message
264 */
265void
266ProtobufThead::handle_peer_send_error(long int peer_id, std::string msg)
267{
268 if (logger) {
269 logger->log_warn(name(), "Failed to send peer message: %s", msg.c_str());
270 }
271}
272
273} // namespace protoboard
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Mutex locking helper.
Definition: mutex_locker.h:34
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
void wakeup()
Wake up thread.
Definition: thread.cpp:995
incoming_message pb_queue_pop()
void peer_destroy(long int peer_id)
Disable peer.
long int peer_create_crypto(const std::string &host, int port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
virtual void init() override
Initialize the thread.
long int peer_create(const std::string &host, int port)
Enable protobuf peer.
long int peer_create_local(const std::string &host, int send_to_port, int recv_on_port)
Enable protobuf peer.
ProtobufThead()
Empty-initialization constructor.
long int peer_create_local_crypto(const std::string &host, int send_to_port, int recv_on_port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
void send(long int peer_id, std::shared_ptr< google::protobuf::Message > msg)
Send a ProtoBuf message to the given peer.
virtual ~ProtobufThead() override
Destructor.
Wrapper for a ProtoBuf message and its metadata.