Fawkes API  Fawkes Development Version
gazsim_comm_thread.cpp
1 /***************************************************************************
2  * gazsim_comm_thread.cpp - Plugin simulates peer-to-peer communication over
3  * a network with configurable instability and manages
4  * the forwarding of messages to different ports on
5  * the same machine.
6  *
7  * Created: Thu Sep 12 11:09:48 2013
8  * Copyright 2013 Frederik Zwilling
9  *
10  ****************************************************************************/
11 
12 /* This program is free software; you can redistribute it and/or modify
13  * it under the terms of the GNU General Public License as published by
14  * the Free Software Foundation; either version 2 of the License, or
15  * (at your option) any later version.
16  *
17  * This program is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20  * GNU Library General Public License for more details.
21  *
22  * Read the full text in the LICENSE.GPL file in the doc directory.
23  */
24 
25 #include "gazsim_comm_thread.h"
26 
27 #include <aspect/blocked_timing.h>
28 #include <protobuf_comm/message_register.h>
29 #include <protobuf_comm/peer.h>
30 
31 #include <algorithm>
32 #include <stdlib.h>
33 
34 using namespace fawkes;
35 using namespace protobuf_comm;
36 
37 /** @class GazsimCommThread "gazsim_comm_thread.h"
38  * Plugin simulates and manages communication for Simulation in Gazebo
39  * @author Frederik Zwilling
40  */
41 
42 GazsimCommThread::GazsimCommThread()
43 : Thread("GazsimCommThread", Thread::OPMODE_WAITFORWAKEUP),
44  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_WORLDSTATE)
45 {
46 }
47 
48 GazsimCommThread::~GazsimCommThread()
49 {
50 }
51 
52 void
54 {
55  //logger->log_info(name(), "GazsimComm initializing");
56  initialized_ = false;
57 
58  //read config values
59  proto_dirs_ = config->get_strings("/gazsim/proto-dirs");
60  package_loss_ = config->get_float("/gazsim/comm/package-loss");
61  addresses_ = config->get_strings("/gazsim/comm/addresses");
62  send_ports_ = config->get_uints("/gazsim/comm/send-ports");
63  recv_ports_ = config->get_uints("/gazsim/comm/recv-ports");
64  use_crypto1_ = config->get_bool("/gazsim/comm/use-crypto1");
65  use_crypto2_ = config->get_bool("/gazsim/comm/use-crypto1");
66  send_ports_crypto1_ = config->get_uints("/gazsim/comm/send-ports-crypto1");
67  recv_ports_crypto1_ = config->get_uints("/gazsim/comm/recv-ports-crypto1");
68  send_ports_crypto2_ = config->get_uints("/gazsim/comm/send-ports-crypto2");
69  recv_ports_crypto2_ = config->get_uints("/gazsim/comm/recv-ports-crypto2");
70  if (addresses_.size() != send_ports_.size() || addresses_.size() != recv_ports_.size()
71  || (use_crypto1_ && addresses_.size() != send_ports_crypto1_.size())
72  || (use_crypto1_ && addresses_.size() != recv_ports_crypto1_.size())
73  || (use_crypto2_ && addresses_.size() != send_ports_crypto2_.size())
74  || (use_crypto2_ && addresses_.size() != recv_ports_crypto2_.size())) {
75  logger->log_warn(name(), "/gazsim/comm/ has an invalid configuration!");
76  }
77 
78  //resolve proto paths
79  try {
80  proto_dirs_ = config->get_strings("/clips-protobuf/proto-dirs");
81  for (size_t i = 0; i < proto_dirs_.size(); ++i) {
82  std::string::size_type pos;
83  if ((pos = proto_dirs_[i].find("@BASEDIR@")) != std::string::npos) {
84  proto_dirs_[i].replace(pos, 9, BASEDIR);
85  }
86  if ((pos = proto_dirs_[i].find("@FAWKES_BASEDIR@")) != std::string::npos) {
87  proto_dirs_[i].replace(pos, 16, FAWKES_BASEDIR);
88  }
89  if ((pos = proto_dirs_[i].find("@RESDIR@")) != std::string::npos) {
90  proto_dirs_[i].replace(pos, 8, RESDIR);
91  }
92  if ((pos = proto_dirs_[i].find("@CONFDIR@")) != std::string::npos) {
93  proto_dirs_[i].replace(pos, 9, CONFDIR);
94  }
95  if (proto_dirs_[i][proto_dirs_.size() - 1] != '/') {
96  proto_dirs_[i] += "/";
97  }
98  }
99  } catch (Exception &e) {
100  logger->log_warn(name(), "Failed to load proto paths from config, exception follows");
101  logger->log_warn(name(), e);
102  }
103 
104  //create peer connections
105  peers_.resize(addresses_.size());
106  peers_crypto1_.resize(addresses_.size());
107  peers_crypto2_.resize(addresses_.size());
108  for (unsigned int i = 0; i < addresses_.size(); i++) {
109  peers_[i] =
110  new ProtobufBroadcastPeer(addresses_[i], send_ports_[i], recv_ports_[i], proto_dirs_);
111  peers_[i]->signal_received_raw().connect(
112  boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
113  peers_[i]->signal_send_error().connect(
114  boost::bind(&GazsimCommThread::peer_send_error, this, addresses_[i], send_ports_[i], _1));
115  if (use_crypto1_) {
116  peers_crypto1_[i] = new ProtobufBroadcastPeer(addresses_[i],
117  send_ports_crypto1_[i],
118  recv_ports_crypto1_[i],
119  proto_dirs_);
120  peers_crypto1_[i]->signal_received_raw().connect(
121  boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
122  peers_crypto1_[i]->signal_send_error().connect(boost::bind(
123  &GazsimCommThread::peer_send_error, this, addresses_[i], send_ports_crypto1_[i], _1));
124  }
125  if (use_crypto2_) {
126  peers_crypto2_[i] = new ProtobufBroadcastPeer(addresses_[i],
127  send_ports_crypto2_[i],
128  recv_ports_crypto2_[i],
129  proto_dirs_);
130  peers_crypto2_[i]->signal_received_raw().connect(
131  boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
132  peers_crypto2_[i]->signal_send_error().connect(boost::bind(
133  &GazsimCommThread::peer_send_error, this, addresses_[i], send_ports_crypto2_[i], _1));
134  }
135  }
136  initialized_ = true;
137 }
138 
139 void
141 {
142  for (unsigned int i = 0; i < peers_.size(); i++) {
143  delete peers_[i];
144  }
145 }
146 
147 void
149 {
150 }
151 
152 /**
153  * Receive and forward raw msg
154  * @param endpoint port msg received from
155  * @param header header of the msg
156  * @param data data stream
157  * @param length length of the data stream
158  */
159 void
160 GazsimCommThread::receive_raw_msg(boost::asio::ip::udp::endpoint &endpoint,
162  void * data,
163  size_t length)
164 {
165  //logger->log_info(name(), "Got raw Message from port %d", endpoint.port());
166  unsigned int incoming_peer_port = endpoint.port(); //this is suprisingly the send-port
167 
168  if (!initialized_) {
169  return;
170  }
171 
172  //simulate package loss
173  double rnd = ((double)rand()) / ((double)RAND_MAX); //0.0 <= rnd <= 1.0
174  if (rnd < package_loss_) {
175  return;
176  }
177 
178  //check which set of peers the message comes from
179  std::vector<protobuf_comm::ProtobufBroadcastPeer *> peers;
180  std::vector<unsigned int> send_ports;
181  if (std::find(send_ports_.begin(), send_ports_.end(), incoming_peer_port) != send_ports_.end()) {
182  peers = peers_;
183  send_ports = send_ports_;
184  } else if (use_crypto1_
185  && std::find(send_ports_crypto1_.begin(),
186  send_ports_crypto1_.end(),
187  incoming_peer_port)
188  != send_ports_crypto1_.end()) {
189  peers = peers_crypto1_;
190  send_ports = send_ports_crypto1_;
191  } else if (use_crypto2_
192  && std::find(send_ports_crypto2_.begin(),
193  send_ports_crypto2_.end(),
194  incoming_peer_port)
195  != send_ports_crypto2_.end()) {
196  peers = peers_crypto2_;
197  send_ports = send_ports_crypto2_;
198  }
199 
200  //send message to all other peers
201  for (unsigned int i = 0; i < peers.size(); i++) {
202  if (send_ports[i] != incoming_peer_port) {
203  peers[i]->send_raw(header, data, length);
204  }
205  }
206 }
207 
208 void
209 GazsimCommThread::peer_send_error(std::string address, unsigned int port, std::string err)
210 {
211  logger->log_warn(name(), "Peer send error for %s:%u: %s", address.c_str(), port, err.c_str());
212 }
fawkes::Configuration::get_bool
virtual bool get_bool(const char *path)=0
fawkes::BlockedTimingAspect
Definition: blocked_timing.h:56
fawkes::Thread::name
const char * name() const
Definition: thread.h:100
fawkes::Configuration::get_uints
virtual std::vector< unsigned int > get_uints(const char *path)=0
fawkes::LoggingAspect::logger
Logger * logger
Definition: logging.h:53
protobuf_comm::frame_header_t
Network framing header.
Definition: frame_header.h:76
protobuf_comm::ProtobufBroadcastPeer
Definition: peer.h:61
fawkes
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
fawkes::Configuration::get_strings
virtual std::vector< std::string > get_strings(const char *path)=0
fawkes::ConfigurableAspect::config
Configuration * config
Definition: configurable.h:53
GazsimCommThread::loop
virtual void loop()
Code to execute in the thread.
Definition: gazsim_comm_thread.cpp:148
fawkes::Configuration::get_float
virtual float get_float(const char *path)=0
fawkes::Thread
Definition: thread.h:45
GazsimCommThread::finalize
virtual void finalize()
Finalize the thread.
Definition: gazsim_comm_thread.cpp:140
GazsimCommThread::init
virtual void init()
Initialize the thread.
Definition: gazsim_comm_thread.cpp:53
fawkes::Exception
Definition: exception.h:41