Fawkes API Fawkes Development Version
gazsim_comm_thread.cpp
1/***************************************************************************
2 * gazsim_comm_thread.cpp - Plugin simulates peer-to-peer communication over
3 * a network with configurable instability and manages
4 * the forwarding of messages to different ports on
5 * the same machine.
6 *
7 * Created: Thu Sep 12 11:09:48 2013
8 * Copyright 2013 Frederik Zwilling
9 *
10 ****************************************************************************/
11
12/* This program is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * This program is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU Library General Public License for more details.
21 *
22 * Read the full text in the LICENSE.GPL file in the doc directory.
23 */
24
25#include "gazsim_comm_thread.h"
26
27#include <aspect/blocked_timing.h>
28#include <protobuf_comm/message_register.h>
29#include <protobuf_comm/peer.h>
30
31#include <algorithm>
32#include <stdlib.h>
33
34using namespace fawkes;
35using namespace protobuf_comm;
36using namespace boost::placeholders;
37
38/** @class GazsimCommThread "gazsim_comm_thread.h"
39 * Plugin simulates and manages communication for Simulation in Gazebo
40 * @author Frederik Zwilling
41 */
42
43GazsimCommThread::GazsimCommThread()
44: Thread("GazsimCommThread", Thread::OPMODE_WAITFORWAKEUP),
45 BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_WORLDSTATE)
46{
47}
48
49GazsimCommThread::~GazsimCommThread()
50{
51}
52
53void
55{
56 //logger->log_info(name(), "GazsimComm initializing");
57 initialized_ = false;
58
59 //read config values
60 proto_dirs_ = config->get_strings("/gazsim/proto-dirs");
61 package_loss_ = config->get_float("/gazsim/comm/package-loss");
62 addresses_ = config->get_strings("/gazsim/comm/addresses");
63 send_ports_ = config->get_uints("/gazsim/comm/send-ports");
64 recv_ports_ = config->get_uints("/gazsim/comm/recv-ports");
65 use_crypto1_ = config->get_bool("/gazsim/comm/use-crypto1");
66 use_crypto2_ = config->get_bool("/gazsim/comm/use-crypto1");
67 send_ports_crypto1_ = config->get_uints("/gazsim/comm/send-ports-crypto1");
68 recv_ports_crypto1_ = config->get_uints("/gazsim/comm/recv-ports-crypto1");
69 send_ports_crypto2_ = config->get_uints("/gazsim/comm/send-ports-crypto2");
70 recv_ports_crypto2_ = config->get_uints("/gazsim/comm/recv-ports-crypto2");
71 if (addresses_.size() != send_ports_.size() || addresses_.size() != recv_ports_.size()
72 || (use_crypto1_ && addresses_.size() != send_ports_crypto1_.size())
73 || (use_crypto1_ && addresses_.size() != recv_ports_crypto1_.size())
74 || (use_crypto2_ && addresses_.size() != send_ports_crypto2_.size())
75 || (use_crypto2_ && addresses_.size() != recv_ports_crypto2_.size())) {
76 logger->log_warn(name(), "/gazsim/comm/ has an invalid configuration!");
77 }
78
79 //resolve proto paths
80 try {
81 proto_dirs_ = config->get_strings("/clips-protobuf/proto-dirs");
82 for (size_t i = 0; i < proto_dirs_.size(); ++i) {
83 std::string::size_type pos;
84 if ((pos = proto_dirs_[i].find("@BASEDIR@")) != std::string::npos) {
85 proto_dirs_[i].replace(pos, 9, BASEDIR);
86 }
87 if ((pos = proto_dirs_[i].find("@FAWKES_BASEDIR@")) != std::string::npos) {
88 proto_dirs_[i].replace(pos, 16, FAWKES_BASEDIR);
89 }
90 if ((pos = proto_dirs_[i].find("@RESDIR@")) != std::string::npos) {
91 proto_dirs_[i].replace(pos, 8, RESDIR);
92 }
93 if ((pos = proto_dirs_[i].find("@CONFDIR@")) != std::string::npos) {
94 proto_dirs_[i].replace(pos, 9, CONFDIR);
95 }
96 if (proto_dirs_[i][proto_dirs_.size() - 1] != '/') {
97 proto_dirs_[i] += "/";
98 }
99 }
100 } catch (Exception &e) {
101 logger->log_warn(name(), "Failed to load proto paths from config, exception follows");
102 logger->log_warn(name(), e);
103 }
104
105 //create peer connections
106 peers_.resize(addresses_.size());
107 peers_crypto1_.resize(addresses_.size());
108 peers_crypto2_.resize(addresses_.size());
109 for (unsigned int i = 0; i < addresses_.size(); i++) {
110 peers_[i] =
111 new ProtobufBroadcastPeer(addresses_[i], send_ports_[i], recv_ports_[i], proto_dirs_);
112 peers_[i]->signal_received_raw().connect(
113 boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
114 peers_[i]->signal_send_error().connect(
115 boost::bind(&GazsimCommThread::peer_send_error, this, addresses_[i], send_ports_[i], _1));
116 if (use_crypto1_) {
117 peers_crypto1_[i] = new ProtobufBroadcastPeer(addresses_[i],
118 send_ports_crypto1_[i],
119 recv_ports_crypto1_[i],
120 proto_dirs_);
121 peers_crypto1_[i]->signal_received_raw().connect(
122 boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
123 peers_crypto1_[i]->signal_send_error().connect(boost::bind(
124 &GazsimCommThread::peer_send_error, this, addresses_[i], send_ports_crypto1_[i], _1));
125 }
126 if (use_crypto2_) {
127 peers_crypto2_[i] = new ProtobufBroadcastPeer(addresses_[i],
128 send_ports_crypto2_[i],
129 recv_ports_crypto2_[i],
130 proto_dirs_);
131 peers_crypto2_[i]->signal_received_raw().connect(
132 boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
133 peers_crypto2_[i]->signal_send_error().connect(boost::bind(
134 &GazsimCommThread::peer_send_error, this, addresses_[i], send_ports_crypto2_[i], _1));
135 }
136 }
137 initialized_ = true;
138}
139
140void
142{
143 for (unsigned int i = 0; i < peers_.size(); i++) {
144 delete peers_[i];
145 }
146}
147
148void
150{
151}
152
153/**
154 * Receive and forward raw msg
155 * @param endpoint port msg received from
156 * @param header header of the msg
157 * @param data data stream
158 * @param length length of the data stream
159 */
160void
161GazsimCommThread::receive_raw_msg(boost::asio::ip::udp::endpoint &endpoint,
162 protobuf_comm::frame_header_t & header,
163 void * data,
164 size_t length)
165{
166 //logger->log_info(name(), "Got raw Message from port %d", endpoint.port());
167 unsigned int incoming_peer_port = endpoint.port(); //this is suprisingly the send-port
168
169 if (!initialized_) {
170 return;
171 }
172
173 //simulate package loss
174 double rnd = ((double)rand()) / ((double)RAND_MAX); //0.0 <= rnd <= 1.0
175 if (rnd < package_loss_) {
176 return;
177 }
178
179 //check which set of peers the message comes from
180 std::vector<protobuf_comm::ProtobufBroadcastPeer *> peers;
181 std::vector<unsigned int> send_ports;
182 if (std::find(send_ports_.begin(), send_ports_.end(), incoming_peer_port) != send_ports_.end()) {
183 peers = peers_;
184 send_ports = send_ports_;
185 } else if (use_crypto1_
186 && std::find(send_ports_crypto1_.begin(),
187 send_ports_crypto1_.end(),
188 incoming_peer_port)
189 != send_ports_crypto1_.end()) {
190 peers = peers_crypto1_;
191 send_ports = send_ports_crypto1_;
192 } else if (use_crypto2_
193 && std::find(send_ports_crypto2_.begin(),
194 send_ports_crypto2_.end(),
195 incoming_peer_port)
196 != send_ports_crypto2_.end()) {
197 peers = peers_crypto2_;
198 send_ports = send_ports_crypto2_;
199 }
200
201 //send message to all other peers
202 for (unsigned int i = 0; i < peers.size(); i++) {
203 if (send_ports[i] != incoming_peer_port) {
204 peers[i]->send_raw(header, data, length);
205 }
206 }
207}
208
209void
210GazsimCommThread::peer_send_error(std::string address, unsigned int port, std::string err)
211{
212 logger->log_warn(name(), "Peer send error for %s:%u: %s", address.c_str(), port, err.c_str());
213}
virtual void init()
Initialize the thread.
virtual void finalize()
Finalize the thread.
virtual void loop()
Code to execute in the thread.
Thread aspect to use blocked timing.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual std::vector< unsigned int > get_uints(const char *path)=0
Get list of values from configuration which is of type unsigned int.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
Fawkes library namespace.