25 #include "gazsim_comm_thread.h" 27 #include <aspect/blocked_timing.h> 28 #include <protobuf_comm/message_register.h> 29 #include <protobuf_comm/peer.h> 35 using namespace protobuf_comm;
42 GazsimCommThread::GazsimCommThread()
43 :
Thread(
"GazsimCommThread",
Thread::OPMODE_WAITFORWAKEUP),
48 GazsimCommThread::~GazsimCommThread()
66 send_ports_crypto1_ =
config->
get_uints(
"/gazsim/comm/send-ports-crypto1");
67 recv_ports_crypto1_ =
config->
get_uints(
"/gazsim/comm/recv-ports-crypto1");
68 send_ports_crypto2_ =
config->
get_uints(
"/gazsim/comm/send-ports-crypto2");
69 recv_ports_crypto2_ =
config->
get_uints(
"/gazsim/comm/recv-ports-crypto2");
70 if (addresses_.size() != send_ports_.size() || addresses_.size() != recv_ports_.size()
71 || (use_crypto1_ && addresses_.size() != send_ports_crypto1_.size())
72 || (use_crypto1_ && addresses_.size() != recv_ports_crypto1_.size())
73 || (use_crypto2_ && addresses_.size() != send_ports_crypto2_.size())
74 || (use_crypto2_ && addresses_.size() != recv_ports_crypto2_.size())) {
81 for (
size_t i = 0; i < proto_dirs_.size(); ++i) {
82 std::string::size_type pos;
83 if ((pos = proto_dirs_[i].find(
"@BASEDIR@")) != std::string::npos) {
84 proto_dirs_[i].replace(pos, 9, BASEDIR);
86 if ((pos = proto_dirs_[i].find(
"@FAWKES_BASEDIR@")) != std::string::npos) {
87 proto_dirs_[i].replace(pos, 16, FAWKES_BASEDIR);
89 if ((pos = proto_dirs_[i].find(
"@RESDIR@")) != std::string::npos) {
90 proto_dirs_[i].replace(pos, 8, RESDIR);
92 if ((pos = proto_dirs_[i].find(
"@CONFDIR@")) != std::string::npos) {
93 proto_dirs_[i].replace(pos, 9, CONFDIR);
95 if (!proto_dirs_[i].empty() && proto_dirs_[i][proto_dirs_[i].size() - 1] != '/') {
// Index by the path string's own length (not the number of entries in proto_dirs_) so the last character is the one tested; empty entries are left untouched.
96 proto_dirs_[i] += "/";
100 logger->
log_warn(
name(),
"Failed to load proto paths from config, exception follows");
105 peers_.resize(addresses_.size());
106 peers_crypto1_.resize(addresses_.size());
107 peers_crypto2_.resize(addresses_.size());
108 for (
unsigned int i = 0; i < addresses_.size(); i++) {
111 peers_[i]->signal_received_raw().connect(
112 boost::bind(&GazsimCommThread::receive_raw_msg,
this, _1, _2, _3, _4));
113 peers_[i]->signal_send_error().connect(
114 boost::bind(&GazsimCommThread::peer_send_error,
this, addresses_[i], send_ports_[i], _1));
117 send_ports_crypto1_[i],
118 recv_ports_crypto1_[i],
120 peers_crypto1_[i]->signal_received_raw().connect(
121 boost::bind(&GazsimCommThread::receive_raw_msg,
this, _1, _2, _3, _4));
122 peers_crypto1_[i]->signal_send_error().connect(boost::bind(
123 &GazsimCommThread::peer_send_error,
this, addresses_[i], send_ports_crypto1_[i], _1));
127 send_ports_crypto2_[i],
128 recv_ports_crypto2_[i],
130 peers_crypto2_[i]->signal_received_raw().connect(
131 boost::bind(&GazsimCommThread::receive_raw_msg,
this, _1, _2, _3, _4));
132 peers_crypto2_[i]->signal_send_error().connect(boost::bind(
133 &GazsimCommThread::peer_send_error,
this, addresses_[i], send_ports_crypto2_[i], _1));
142 for (
unsigned int i = 0; i < peers_.size(); i++) {
160 GazsimCommThread::receive_raw_msg(boost::asio::ip::udp::endpoint &endpoint,
166 unsigned int incoming_peer_port = endpoint.port();
173 double rnd = ((double)rand()) / ((
double)RAND_MAX);
174 if (rnd < package_loss_) {
179 std::vector<protobuf_comm::ProtobufBroadcastPeer *> peers;
180 std::vector<unsigned int> send_ports;
181 if (std::find(send_ports_.begin(), send_ports_.end(), incoming_peer_port) != send_ports_.end()) {
183 send_ports = send_ports_;
184 }
else if (use_crypto1_
185 && std::find(send_ports_crypto1_.begin(),
186 send_ports_crypto1_.end(),
188 != send_ports_crypto1_.end()) {
189 peers = peers_crypto1_;
190 send_ports = send_ports_crypto1_;
191 }
else if (use_crypto2_
192 && std::find(send_ports_crypto2_.begin(),
193 send_ports_crypto2_.end(),
195 != send_ports_crypto2_.end()) {
196 peers = peers_crypto2_;
197 send_ports = send_ports_crypto2_;
201 for (
unsigned int i = 0; i < peers.size(); i++) {
202 if (send_ports[i] != incoming_peer_port) {
203 peers[i]->send_raw(header, data, length);
209 GazsimCommThread::peer_send_error(std::string address,
unsigned int port, std::string err)
211 logger->
log_warn(
name(),
"Peer send error for %s:%u: %s", address.c_str(), port, err.c_str());
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
Thread class encapsulation of pthreads.
Logger * logger
This is the Logger member used to access the logger.
Thread aspect to use blocked timing.
Communicate by broadcasting protobuf messages.
Base class for exceptions in Fawkes.
virtual std::vector< unsigned int > get_uints(const char *path)=0
Get list of values from configuration which is of type unsigned int.
const char * name() const
Get name of thread.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
Configuration * config
This is the Configuration member used to access the configuration.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual void loop()
Code to execute in the thread.