Fawkes API  Fawkes Development Version
sick_tim55x_ethernet_aqt.cpp
1 
2 /***************************************************************************
3  * sick_tim55x_ethernet_aqt.cpp - Retrieve data from Sick TiM55x via Ethernet
4  *
5  * Created: Sun Jun 15 20:45:42 2014
6  * Copyright 2008-2014 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "sick_tim55x_ethernet_aqt.h"
24 
25 #include <core/threading/mutex.h>
26 #include <core/threading/mutex_locker.h>
27 #include <utils/math/angle.h>
28 #include <utils/misc/string_split.h>
29 
30 #include <boost/lambda/bind.hpp>
31 #include <boost/lambda/lambda.hpp>
32 #include <boost/lexical_cast.hpp>
33 #if BOOST_VERSION < 104800
34 # include <boost/bind.hpp>
35 #endif
36 #include <cstdio>
37 #include <cstdlib>
38 #include <cstring>
39 #include <unistd.h>
40 
41 using namespace fawkes;
42 
43 #define RECONNECT_INTERVAL 1000
44 #define RECEIVE_TIMEOUT 500
45 
46 /** @class SickTiM55xEthernetAcquisitionThread "sick_tim55x_ethernet_aqt.h"
47  * Laser acqusition thread for Sick TiM55x laser range finders.
48  * This thread fetches the data from the laser.
49  * @author Tim Niemueller
50  */
51 
52 /** Constructor.
53  * @param cfg_name short name of configuration group
54  * @param cfg_prefix configuration path prefix
55  */
57  std::string &cfg_prefix)
58 : SickTiM55xCommonAcquisitionThread(cfg_name, cfg_prefix),
59  socket_(io_service_),
60  deadline_(io_service_),
61  soft_deadline_(io_service_)
62 {
63  set_name("SickTiM55x(%s)", cfg_name.c_str());
64 }
65 
66 void
68 {
70 
71  cfg_host_ = config->get_string((cfg_prefix_ + "host").c_str());
72  cfg_port_ = config->get_string((cfg_prefix_ + "port").c_str());
73 
74  socket_mutex_ = new Mutex();
75 
76  deadline_.expires_at(boost::posix_time::pos_infin);
77  check_deadline();
78 
79  soft_deadline_.expires_at(boost::posix_time::pos_infin);
80  check_soft_timeout();
81 
82  init_device();
83 
85 }
86 
87 void
89 {
90  free(_distances);
91  _distances = NULL;
92 
93  free(_echoes);
94  _echoes = NULL;
95 
96  delete socket_mutex_;
97 }
98 
99 void
101 {
102  if (socket_.is_open()) {
103  try {
104  deadline_.expires_from_now(boost::posix_time::milliseconds(RECEIVE_TIMEOUT));
105 
106  ec_ = boost::asio::error::would_block;
107  bytes_read_ = 0;
108 
109  boost::asio::async_read_until(socket_,
110  input_buffer_,
111  '\03',
112 #if BOOST_VERSION >= 104800
113  (boost::lambda::var(ec_) = boost::lambda::_1,
114  boost::lambda::var(bytes_read_) = boost::lambda::_2));
115 #else
116  boost::bind(&SickTiM55xEthernetAcquisitionThread::handle_read,
117  this,
118  boost::asio::placeholders::error,
119  boost::asio::placeholders::bytes_transferred));
120 #endif
121 
122  do
123  io_service_.run_one();
124  while (ec_ == boost::asio::error::would_block);
125 
126  reset_distances();
127  reset_echoes();
128 
129  if (ec_) {
130  if (ec_.value() == boost::system::errc::operation_canceled) {
131  logger->log_error(name(), "Data timeout, will try to reconnect");
132  } else {
133  logger->log_warn(name(), "Data read error: %s\n", ec_.message().c_str());
134  }
135  _data_mutex->lock();
136  _timestamp->stamp();
137  _new_data = true;
138  _data_mutex->unlock();
139  close_device();
140 
141  } else {
142  deadline_.expires_at(boost::posix_time::pos_infin);
143 
144  unsigned char recv_buf[bytes_read_];
145  std::istream in_stream(&input_buffer_);
146  in_stream.read((char *)recv_buf, bytes_read_);
147 
148  if (bytes_read_ > 0) {
149  try {
150  parse_datagram(recv_buf, bytes_read_);
151  } catch (Exception &e) {
152  logger->log_warn(name(), "Failed to parse datagram, resyncing, exception follows");
153  logger->log_warn(name(), e);
154  resync();
155  }
156  }
157  }
158  } catch (boost::system::system_error &e) {
159  if (e.code() == boost::asio::error::eof) {
160  close_device();
161  logger->log_warn(name(), "Sick TiM55x/Ethernet connection lost, trying to reconnect");
162  } else {
163  logger->log_warn(name(), "Sick TiM55x/Ethernet failed read: %s", e.what());
164  }
165  }
166  } else {
167  try {
168  init_device();
169  logger->log_warn(name(), "Reconnected to device");
170  } catch (Exception &e) {
171  // ignore, keep trying
172  usleep(RECONNECT_INTERVAL * 1000);
173  }
174  }
175 
176  yield();
177 }
178 
179 void
180 SickTiM55xEthernetAcquisitionThread::open_device()
181 {
182  try {
183  boost::asio::ip::tcp::resolver resolver(io_service_);
184  boost::asio::ip::tcp::resolver::query query(cfg_host_, cfg_port_);
185  boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);
186 
187  // this is just the overly complicated way to get a timeout on
188  // a synchronous connect, cf.
189  // http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/example/cpp03/timeouts/blocking_tcp_client.cpp
190 
191  deadline_.expires_from_now(boost::posix_time::seconds(5));
192 
193  for (; iter != boost::asio::ip::tcp::resolver::iterator(); ++iter) {
194  socket_.close();
195  ec_ = boost::asio::error::would_block;
196 #if BOOST_VERSION >= 104800
197  socket_.async_connect(iter->endpoint(), boost::lambda::var(ec_) = boost::lambda::_1);
198 #else
199  socket_.async_connect(iter->endpoint(),
200  boost::bind(&SickTiM55xEthernetAcquisitionThread::handle_read,
201  this,
202  boost::asio::placeholders::error,
203  0));
204 #endif
205 
206  // Block until the asynchronous operation has completed.
207  do
208  io_service_.run_one();
209  while (ec_ == boost::asio::error::would_block);
210 
211  // Determine whether a connection was successfully established.
212  if (ec_ || !socket_.is_open()) {
213  if (ec_.value() == boost::system::errc::operation_canceled) {
214  throw Exception("Sick TiM55X Ethernet: connection timed out");
215  } else {
216  throw Exception("Connection failed: %s", ec_.message().c_str());
217  }
218  }
219  deadline_.expires_at(boost::posix_time::pos_infin);
220  }
221  } catch (boost::system::system_error &e) {
222  throw Exception("Connection failed: %s", e.what());
223  }
224 }
225 
226 void
227 SickTiM55xEthernetAcquisitionThread::close_device()
228 {
229  boost::system::error_code err;
230  if (socket_.is_open()) {
231  socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, err);
232  socket_.close();
233  }
234 }
235 
236 void
237 SickTiM55xEthernetAcquisitionThread::flush_device()
238 {
239  if (socket_.is_open()) {
240  try {
241  soft_deadline_.expires_from_now(boost::posix_time::milliseconds(RECEIVE_TIMEOUT));
242  do {
243  ec_ = boost::asio::error::would_block;
244  bytes_read_ = 0;
245 
246  boost::asio::async_read_until(socket_,
247  input_buffer_,
248  '\03',
249 #if BOOST_VERSION >= 104800
250  (boost::lambda::var(ec_) = boost::lambda::_1,
251  boost::lambda::var(bytes_read_) = boost::lambda::_2));
252 #else
253  boost::bind(&SickTiM55xEthernetAcquisitionThread::handle_read,
254  this,
255  boost::asio::placeholders::error,
256  boost::asio::placeholders::bytes_transferred));
257 #endif
258 
259  do
260  io_service_.run_one();
261  while (ec_ == boost::asio::error::would_block);
262 
263  } while (bytes_read_ > 0);
264  soft_deadline_.expires_from_now(boost::posix_time::pos_infin);
265  } catch (boost::system::system_error &e) {
266  // ignore, just assume done, if there really is an error we'll
267  // catch it later on
268  }
269  }
270 }
271 
272 void
273 SickTiM55xEthernetAcquisitionThread::send_with_reply(const char *request, std::string *reply)
274 {
275  MutexLocker lock(socket_mutex_);
276 
277  int request_length = strlen(request);
278 
279  try {
280  boost::asio::write(socket_, boost::asio::buffer(request, request_length));
281 
282  deadline_.expires_from_now(boost::posix_time::milliseconds(RECEIVE_TIMEOUT));
283 
284  ec_ = boost::asio::error::would_block;
285  bytes_read_ = 0;
286  boost::asio::async_read_until(socket_,
287  input_buffer_,
288  '\03',
289 #if BOOST_VERSION >= 104800
290  (boost::lambda::var(ec_) = boost::lambda::_1,
291  boost::lambda::var(bytes_read_) = boost::lambda::_2));
292 #else
293  boost::bind(&SickTiM55xEthernetAcquisitionThread::handle_read,
294  this,
295  boost::asio::placeholders::error,
296  boost::asio::placeholders::bytes_transferred));
297 #endif
298 
299  do
300  io_service_.run_one();
301  while (ec_ == boost::asio::error::would_block);
302 
303  if (ec_) {
304  if (ec_.value() == boost::system::errc::operation_canceled) {
305  throw Exception("Timeout waiting for message reply");
306  } else {
307  throw Exception("Failed to read reply: %s", ec_.message().c_str());
308  }
309  }
310 
311  deadline_.expires_at(boost::posix_time::pos_infin);
312 
313  if (reply) {
314  char recv_buf[bytes_read_];
315  std::istream in_stream(&input_buffer_);
316  in_stream.read(recv_buf, bytes_read_);
317  *reply = std::string(recv_buf, bytes_read_);
318  } else {
319  input_buffer_.consume(bytes_read_);
320  }
321  } catch (boost::system::system_error &e) {
322  throw Exception("Sick TiM55x/Ethernet failed I/O: %s", e.what());
323  }
324 }
325 
326 /** Check whether the deadline has passed.
327  * We compare the deadline against
328  * the current time since a new asynchronous operation may have moved the
329  * deadline before this actor had a chance to run.
330  */
331 void
332 SickTiM55xEthernetAcquisitionThread::check_deadline()
333 {
334  if (deadline_.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
335  socket_.close();
336  deadline_.expires_at(boost::posix_time::pos_infin);
337  }
338 
339 #if BOOST_VERSION >= 104800
340  deadline_.async_wait(
341  boost::lambda::bind(&SickTiM55xEthernetAcquisitionThread::check_deadline, this));
342 #else
343  deadline_.async_wait(boost::bind(&SickTiM55xEthernetAcquisitionThread::check_deadline, this));
344 #endif
345 }
346 
347 /** Check whether the soft timeout deadline has passed.
348  * We compare the deadline against the current time since a new
349  * asynchronous operation may have moved the deadline before this
350  * actor had a chance to run.
351  */
352 void
353 SickTiM55xEthernetAcquisitionThread::check_soft_timeout()
354 {
355  if (soft_deadline_.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
356  socket_.cancel();
357  soft_deadline_.expires_at(boost::posix_time::pos_infin);
358  }
359 
360 #if BOOST_VERSION >= 104800
361  soft_deadline_.async_wait(
362  boost::lambda::bind(&SickTiM55xEthernetAcquisitionThread::check_soft_timeout, this));
363 #else
364  soft_deadline_.async_wait(
365  boost::bind(&SickTiM55xEthernetAcquisitionThread::check_soft_timeout, this));
366 #endif
367 }
fawkes::Mutex::lock
void lock()
Lock this mutex.
Definition: mutex.cpp:93
SickTiM55xCommonAcquisitionThread::parse_datagram
void parse_datagram(const unsigned char *datagram, size_t datagram_length)
Parse incoming message from device.
Definition: sick_tim55x_common_aqt.cpp:200
fawkes::Mutex
Definition: mutex.h:38
SickTiM55xEthernetAcquisitionThread::loop
virtual void loop()
Code to execute in the thread.
Definition: sick_tim55x_ethernet_aqt.cpp:100
fawkes::MutexLocker
Definition: mutex_locker.h:39
LaserAcquisitionThread::_data_mutex
fawkes::Mutex * _data_mutex
Definition: acquisition_thread.h:74
LaserAcquisitionThread::_echoes
float * _echoes
Definition: acquisition_thread.h:79
fawkes::Thread::yield
void yield()
Yield the processor to another thread or process.
Definition: thread.cpp:889
SickTiM55xEthernetAcquisitionThread::finalize
virtual void finalize()
Finalize the thread.
Definition: sick_tim55x_ethernet_aqt.cpp:88
fawkes::Thread::name
const char * name() const
Definition: thread.h:100
fawkes::Mutex::unlock
void unlock()
Unlock the mutex.
Definition: mutex.cpp:137
LaserAcquisitionThread::_timestamp
fawkes::Time * _timestamp
Definition: acquisition_thread.h:75
LaserAcquisitionThread::reset_distances
void reset_distances()
Reset all distance values to NaN.
Definition: acquisition_thread.cpp:202
LaserAcquisitionThread::_new_data
bool _new_data
Definition: acquisition_thread.h:77
fawkes::LoggingAspect::logger
Logger * logger
Definition: logging.h:53
SickTiM55xCommonAcquisitionThread::read_common_config
void read_common_config()
Read common configuration parameters.
Definition: sick_tim55x_common_aqt.cpp:121
fawkes::Logger::log_error
virtual void log_error(const char *component, const char *format,...)=0
SickTiM55xCommonAcquisitionThread::pre_init
virtual void pre_init(fawkes::Configuration *config, fawkes::Logger *logger)
Definition: sick_tim55x_common_aqt.cpp:91
fawkes
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
LaserAcquisitionThread::_distances
float * _distances
Definition: acquisition_thread.h:78
fawkes::ConfigurableAspect::config
Configuration * config
Definition: configurable.h:53
SickTiM55xEthernetAcquisitionThread::init
virtual void init()
Initialize the thread.
Definition: sick_tim55x_ethernet_aqt.cpp:67
LaserAcquisitionThread::reset_echoes
void reset_echoes()
Reset all distance values to NaN.
Definition: acquisition_thread.cpp:217
SickTiM55xCommonAcquisitionThread::cfg_prefix_
std::string cfg_prefix_
Definition: sick_tim55x_common_aqt.h:71
SickTiM55xCommonAcquisitionThread::init_device
void init_device()
Initialize device.
Definition: sick_tim55x_common_aqt.cpp:133
fawkes::Configuration::get_string
virtual std::string get_string(const char *path)=0
fawkes::Time::stamp
Time & stamp()
Set this time to the current time.
Definition: time.cpp:711
SickTiM55xEthernetAcquisitionThread::SickTiM55xEthernetAcquisitionThread
SickTiM55xEthernetAcquisitionThread(std::string &cfg_name, std::string &cfg_prefix)
Constructor.
Definition: sick_tim55x_ethernet_aqt.cpp:56
SickTiM55xCommonAcquisitionThread
Definition: sick_tim55x_common_aqt.h:37
fawkes::Thread::set_name
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:754
SickTiM55xCommonAcquisitionThread::resync
void resync()
Resynchronize to laser data.
Definition: sick_tim55x_common_aqt.cpp:174
fawkes::Exception
Definition: exception.h:41