Fawkes API  Fawkes Development Version
tf_thread.cpp
1 
2 /***************************************************************************
3  * tf_thread.cpp - Thread to exchange transforms
4  *
5  * Created: Wed Oct 26 01:02:59 2011
6  * Copyright 2011 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "tf_thread.h"
23 
24 #include <core/threading/mutex_locker.h>
25 #include <ros/this_node.h>
26 
27 using namespace fawkes;
28 
29 /** @class RosTfThread "tf_thread.h"
30  * Thread to exchange transforms between Fawkes and ROS.
31  * This threads connects to Fawkes and ROS to read and write transforms.
32  * Transforms received on one end are republished to the other side. To
33  * Fawkes new frames are published during the sensor hook.
34  * @author Tim Niemueller
35  */
36 
37 /** Constructor. */
39 : Thread("RosTfThread", Thread::OPMODE_WAITFORWAKEUP),
40  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_ACQUIRE),
41  TransformAspect(TransformAspect::DEFER_PUBLISHER),
42  BlackBoardInterfaceListener("RosTfThread")
43 {
44  tf_msg_queue_mutex_ = new Mutex();
45  seq_num_mutex_ = new Mutex();
46  last_update_ = new Time();
47 }
48 
49 /** Destructor. */
51 {
52  delete tf_msg_queue_mutex_;
53  delete seq_num_mutex_;
54  delete last_update_;
55 }
56 
57 void
59 {
60  active_queue_ = 0;
61  seq_num_ = 0;
62  last_update_->set_clock(clock);
63  last_update_->set_time(0, 0);
64 
65  cfg_use_tf2_ = config->get_bool("/ros/tf/use_tf2");
66  cfg_update_interval_ = 1.0;
67  try {
68  cfg_update_interval_ = config->get_float("/ros/tf/static-update-interval");
69  } catch (Exception &e) {
70  } // ignored, use default
71 
72  // Must do that before registering listener because we might already
73  // get events right away
74  if (cfg_use_tf2_) {
75 #ifndef HAVE_TF2_MSGS
76  throw Exception("tf2 enabled in config but not available at compile time");
77 #else
78  sub_tf_ = rosnode->subscribe("tf", 100, &RosTfThread::tf_message_cb_dynamic, this);
79  sub_static_tf_ = rosnode->subscribe("tf_static", 100, &RosTfThread::tf_message_cb_static, this);
80  pub_tf_ = rosnode->advertise<tf2_msgs::TFMessage>("tf", 100);
81  pub_static_tf_ = rosnode->advertise<tf2_msgs::TFMessage>("tf_static", 100, /* latch */ true);
82 #endif
83  } else {
84  sub_tf_ = rosnode->subscribe("tf", 100, &RosTfThread::tf_message_cb, this);
85  pub_tf_ = rosnode->advertise<::tf::tfMessage>("tf", 100);
86  }
87 
89  std::list<TransformInterface *>::iterator i;
90  for (i = tfifs_.begin(); i != tfifs_.end(); ++i) {
94  }
96 
97  publish_static_transforms_to_ros();
98 
99  bbio_add_observed_create("TransformInterface", "/tf*");
101 }
102 
103 void
105 {
108 
109  sub_tf_.shutdown();
110  pub_tf_.shutdown();
111 
112  std::list<TransformInterface *>::iterator i;
113  for (i = tfifs_.begin(); i != tfifs_.end(); ++i) {
114  blackboard->close(*i);
115  }
116  tfifs_.clear();
117 }
118 
119 void
121 {
122  tf_msg_queue_mutex_->lock();
123  unsigned int queue = active_queue_;
124  active_queue_ = 1 - active_queue_;
125  tf_msg_queue_mutex_->unlock();
126 
127  if (cfg_use_tf2_) {
128 #ifdef HAVE_TF2_MSGS
129  while (!tf2_msg_queues_[queue].empty()) {
130  const std::pair<bool, tf2_msgs::TFMessage::ConstPtr> &q = tf2_msg_queues_[queue].front();
131  const tf2_msgs::TFMessage::ConstPtr & msg = q.second;
132  const size_t tsize = msg->transforms.size();
133  for (size_t i = 0; i < tsize; ++i) {
134  publish_transform_to_fawkes(msg->transforms[i], q.first);
135  }
136  tf2_msg_queues_[queue].pop();
137  }
138 #endif
139  } else {
140  while (!tf_msg_queues_[queue].empty()) {
141  const ::tf::tfMessage::ConstPtr &msg = tf_msg_queues_[queue].front();
142  const size_t tsize = msg->transforms.size();
143  for (size_t i = 0; i < tsize; ++i) {
144  geometry_msgs::TransformStamped ts = msg->transforms[i];
145  if (!ts.header.frame_id.empty() && ts.header.frame_id[0] == '/') {
146  ts.header.frame_id = ts.header.frame_id.substr(1);
147  }
148  if (!ts.child_frame_id.empty() && ts.child_frame_id[0] == '/') {
149  ts.child_frame_id = ts.child_frame_id.substr(1);
150  }
151  publish_transform_to_fawkes(ts);
152  }
153  tf_msg_queues_[queue].pop();
154  }
155 
156  fawkes::Time now(clock);
157  if ((now - last_update_) > cfg_update_interval_) {
158  last_update_->stamp();
159 
160  publish_static_transforms_to_ros();
161  }
162  }
163 }
164 
165 void
167 {
168  TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface);
169  if (!tfif)
170  return;
171 
172  tfif->read();
173 
174  if (cfg_use_tf2_ && tfif->is_static_transform()) {
175  publish_static_transforms_to_ros();
176  } else {
177  if (cfg_use_tf2_) {
178 #ifdef HAVE_TF2_MSGS
179  tf2_msgs::TFMessage tmsg;
180  tmsg.transforms.push_back(create_transform_stamped(tfif));
181  pub_tf_.publish(tmsg);
182 #endif
183  } else {
184  ::tf::tfMessage tmsg;
185  if (tfif->is_static_transform()) {
186  // date time stamps slightly into the future so they are valid
187  // for longer and need less frequent updates.
188  fawkes::Time timestamp = fawkes::Time(clock) + (cfg_update_interval_ * 1.1);
189 
190  tmsg.transforms.push_back(create_transform_stamped(tfif, &timestamp));
191  } else {
192  tmsg.transforms.push_back(create_transform_stamped(tfif));
193  }
194  pub_tf_.publish(tmsg);
195  }
196  }
197 }
198 
199 void
200 RosTfThread::bb_interface_created(const char *type, const char *id) throw()
201 {
202  if (strncmp(type, "TransformInterface", INTERFACE_TYPE_SIZE_) != 0)
203  return;
204 
205  for (const auto &f : ros_frames_) {
206  // ignore interfaces that we publish ourself
207  if (f == id)
208  return;
209  }
210 
211  TransformInterface *tfif;
212  try {
213  //logger->log_info(name(), "Opening %s:%s", type, id);
214  tfif = blackboard->open_for_reading<TransformInterface>(id);
215  } catch (Exception &e) {
216  // ignored
217  logger->log_warn(name(), "Failed to open %s:%s: %s", type, id, e.what());
218  return;
219  }
220 
221  try {
222  bbil_add_data_interface(tfif);
223  bbil_add_reader_interface(tfif);
224  bbil_add_writer_interface(tfif);
225  blackboard->update_listener(this);
226  tfifs_.push_back(tfif);
227  } catch (Exception &e) {
228  blackboard->close(tfif);
229  logger->log_warn(name(), "Failed to register for %s:%s: %s", type, id, e.what());
230  return;
231  }
232 }
233 
234 void
236  unsigned int instance_serial) throw()
237 {
238  conditional_close(interface);
239 }
240 
241 void
243  unsigned int instance_serial) throw()
244 {
245  conditional_close(interface);
246 }
247 
248 void
249 RosTfThread::conditional_close(Interface *interface) throw()
250 {
251  // Verify it's a TransformInterface
252  TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface);
253  if (!tfif)
254  return;
255 
256  std::list<TransformInterface *>::iterator i;
257  for (i = tfifs_.begin(); i != tfifs_.end(); ++i) {
258  if (*interface == **i) {
259  if (!interface->has_writer() && (interface->num_readers() == 1)) {
260  // It's only us
261  logger->log_info(name(), "Last on %s, closing", interface->uid());
262  bbil_remove_data_interface(*i);
263  bbil_remove_reader_interface(*i);
264  bbil_remove_writer_interface(*i);
265  blackboard->update_listener(this);
266  blackboard->close(*i);
267  tfifs_.erase(i);
268  break;
269  }
270  }
271  }
272 }
273 
274 geometry_msgs::TransformStamped
275 RosTfThread::create_transform_stamped(TransformInterface *tfif, const Time *time)
276 {
277  double *translation = tfif->translation();
278  double *rotation = tfif->rotation();
279  if (!time)
280  time = tfif->timestamp();
281 
282  geometry_msgs::Vector3 t;
283  t.x = translation[0];
284  t.y = translation[1];
285  t.z = translation[2];
286  geometry_msgs::Quaternion r;
287  r.x = rotation[0];
288  r.y = rotation[1];
289  r.z = rotation[2];
290  r.w = rotation[3];
291  geometry_msgs::Transform tr;
292  tr.translation = t;
293  tr.rotation = r;
294 
295  geometry_msgs::TransformStamped ts;
296  seq_num_mutex_->lock();
297  ts.header.seq = ++seq_num_;
298  seq_num_mutex_->unlock();
299  ts.header.stamp = ros::Time(time->get_sec(), time->get_nsec());
300  ts.header.frame_id = tfif->frame();
301  ts.child_frame_id = tfif->child_frame();
302  ts.transform = tr;
303 
304  return ts;
305 }
306 
307 void
308 RosTfThread::publish_static_transforms_to_ros()
309 {
310  std::list<fawkes::TransformInterface *>::iterator t;
311  fawkes::Time now(clock);
312 
313  if (cfg_use_tf2_) {
314 #ifdef HAVE_TF2_MSGS
315  tf2_msgs::TFMessage tmsg;
316  for (t = tfifs_.begin(); t != tfifs_.end(); ++t) {
317  fawkes::TransformInterface *tfif = *t;
318  tfif->read();
319  if (tfif->is_static_transform()) {
320  tmsg.transforms.push_back(create_transform_stamped(tfif, &now));
321  }
322  }
323  pub_static_tf_.publish(tmsg);
324 #endif
325  } else {
326  // date time stamps slightly into the future so they are valid
327  // for longer and need less frequent updates.
328  fawkes::Time timestamp = now + (cfg_update_interval_ * 1.1);
329 
330  ::tf::tfMessage tmsg;
331  for (t = tfifs_.begin(); t != tfifs_.end(); ++t) {
332  fawkes::TransformInterface *tfif = *t;
333  tfif->read();
334  if (tfif->is_static_transform()) {
335  tmsg.transforms.push_back(create_transform_stamped(tfif, &timestamp));
336  }
337  }
338  pub_tf_.publish(tmsg);
339  }
340 }
341 
342 void
343 RosTfThread::publish_transform_to_fawkes(const geometry_msgs::TransformStamped &ts, bool static_tf)
344 {
345  const geometry_msgs::Vector3 & t = ts.transform.translation;
346  const geometry_msgs::Quaternion &r = ts.transform.rotation;
347 
348  fawkes::Time time(ts.header.stamp.sec, ts.header.stamp.nsec / 1000);
349 
350  fawkes::tf::Transform tr(fawkes::tf::Quaternion(r.x, r.y, r.z, r.w),
351  fawkes::tf::Vector3(t.x, t.y, t.z));
352  fawkes::tf::StampedTransform st(tr, time, ts.header.frame_id, ts.child_frame_id);
353 
354  if (tf_publishers.find(ts.child_frame_id) == tf_publishers.end()) {
355  try {
356  ros_frames_.push_back(std::string("/tf/") + ts.child_frame_id);
357  tf_add_publisher("%s", ts.child_frame_id.c_str());
358  tf_publishers[ts.child_frame_id]->send_transform(st, static_tf);
359  } catch (Exception &e) {
360  ros_frames_.pop_back();
361  logger->log_warn(name(),
362  "Failed to create Fawkes transform publisher for frame %s from ROS",
363  ts.child_frame_id.c_str());
364  logger->log_warn(name(), e);
365  }
366  } else {
367  tf_publishers[ts.child_frame_id]->send_transform(st, static_tf);
368  }
369 }
370 
371 /** Callback function for ROS tf message subscription.
372  * @param msg incoming message
373  */
374 void
375 RosTfThread::tf_message_cb(const ros::MessageEvent<::tf::tfMessage const> &msg_evt)
376 {
377  MutexLocker lock(tf_msg_queue_mutex_);
378 
379  const ::tf::tfMessage::ConstPtr &msg = msg_evt.getConstMessage();
380  std::string authority = msg_evt.getPublisherName();
381 
382  if (authority == "") {
383  logger->log_warn(name(), "Message received without callerid");
384  } else if (authority != ros::this_node::getName()) {
385  tf_msg_queues_[active_queue_].push(msg);
386  }
387 }
388 
389 #ifdef HAVE_TF2_MSGS
390 void
391 RosTfThread::tf_message_cb_static(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt)
392 {
393  tf_message_cb(msg_evt, true);
394 }
395 
396 void
397 RosTfThread::tf_message_cb_dynamic(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt)
398 {
399  tf_message_cb(msg_evt, false);
400 }
401 
402 void
403 RosTfThread::tf_message_cb(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt,
404  bool static_tf)
405 {
406  MutexLocker lock(tf_msg_queue_mutex_);
407 
408  const tf2_msgs::TFMessage::ConstPtr &msg = msg_evt.getConstMessage();
409  std::string authority = msg_evt.getPublisherName();
410 
411  if (authority == "") {
412  logger->log_warn(name(), "Message received without callerid");
413  } else if (authority != ros::this_node::getName()) {
414  tf2_msg_queues_[active_queue_].push(std::make_pair(static_tf, msg));
415  }
416 }
417 #endif
fawkes::Mutex::lock
void lock()
Lock this mutex.
Definition: mutex.cpp:93
fawkes::tf::StampedTransform
Transform that contains a timestamp and frame IDs.
Definition: types.h:101
fawkes::TransformInterface::child_frame
char * child_frame() const
Get child_frame value.
Definition: TransformInterface.cpp:127
fawkes::TransformInterface::translation
double * translation() const
Get translation value.
Definition: TransformInterface.cpp:203
fawkes::Time::set_time
void set_time(const timeval *tv)
Sets the time.
Definition: time.cpp:253
RosTfThread::~RosTfThread
virtual ~RosTfThread()
Destructor.
Definition: tf_thread.cpp:50
fawkes::TransformInterface::is_static_transform
bool is_static_transform() const
Get static_transform value.
Definition: TransformInterface.cpp:165
fawkes::BlackBoard::register_listener
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:190
fawkes::Time::get_sec
long get_sec() const
Definition: time.h:123
fawkes::Mutex
Definition: mutex.h:38
fawkes::Interface::timestamp
const Time * timestamp() const
Get timestamp of last write.
Definition: interface.cpp:707
fawkes::Interface::read
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:477
fawkes::Configuration::get_bool
virtual bool get_bool(const char *path)=0
fawkes::Time::get_nsec
long get_nsec() const
Definition: time.h:138
fawkes::ROSAspect::rosnode
LockPtr< ros::NodeHandle > rosnode
Definition: ros.h:47
fawkes::MutexLocker
Definition: mutex_locker.h:39
fawkes::BlackBoard::unregister_listener
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:217
fawkes::BlackBoardInterfaceListener
Definition: interface_listener.h:47
fawkes::TransformInterface::rotation
double * rotation() const
Get rotation value.
Definition: TransformInterface.cpp:280
fawkes::BlockedTimingAspect
Definition: blocked_timing.h:56
RosTfThread::bb_interface_data_changed
virtual void bb_interface_data_changed(fawkes::Interface *interface)
BlackBoard data changed notification.
Definition: tf_thread.cpp:166
fawkes::Thread::name
const char * name() const
Definition: thread.h:100
fawkes::ClockAspect::clock
Clock * clock
Definition: clock.h:56
fawkes::Mutex::unlock
void unlock()
Unlock the mutex.
Definition: mutex.cpp:137
fawkes::TransformInterface::frame
char * frame() const
Get frame value.
Definition: TransformInterface.cpp:89
fawkes::BlackBoardInterfaceObserver::bbio_add_observed_create
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*")
Add interface creation type to watch list.
Definition: interface_observer.cpp:125
fawkes::MultiLogger::log_warn
virtual void log_warn(const char *component, const char *format,...)
Definition: multi.cpp:222
fawkes::LoggingAspect::logger
Logger * logger
Definition: logging.h:53
fawkes::BlackBoard::close
virtual void close(Interface *interface)=0
fawkes
fawkes::TransformAspect::tf_add_publisher
void tf_add_publisher(const char *frame_id_format,...)
Late add of publisher.
Definition: tf.cpp:192
fawkes::TransformAspect
Definition: tf.h:43
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
fawkes::BlackBoard::register_observer
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: blackboard.cpp:230
RosTfThread::bb_interface_reader_removed
virtual void bb_interface_reader_removed(fawkes::Interface *interface, unsigned int instance_serial)
A reading instance has been closed for a watched interface.
Definition: tf_thread.cpp:242
fawkes::Interface
Definition: interface.h:78
fawkes::Time::set_clock
void set_clock(Clock *clock)
Set clock for this instance.
Definition: time.cpp:315
fawkes::ConfigurableAspect::config
Configuration * config
Definition: configurable.h:53
RosTfThread::finalize
virtual void finalize()
Finalize the thread.
Definition: tf_thread.cpp:104
fawkes::Time
Definition: time.h:98
fawkes::Exception::what
virtual const char * what() const
Get primary string.
Definition: exception.cpp:639
fawkes::Configuration::get_float
virtual float get_float(const char *path)=0
fawkes::Thread
Definition: thread.h:45
fawkes::BlackBoard::unregister_observer
virtual void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: blackboard.cpp:245
RosTfThread::loop
virtual void loop()
Code to execute in the thread.
Definition: tf_thread.cpp:120
fawkes::BlackBoardAspect::blackboard
BlackBoard * blackboard
Definition: blackboard.h:49
fawkes::BlackBoard::open_for_reading
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
fawkes::Time::stamp
Time & stamp()
Set this time to the current time.
Definition: time.cpp:711
fawkes::TransformInterface
Definition: TransformInterface.h:39
fawkes::BlackBoardInterfaceListener::bbil_add_reader_interface
void bbil_add_reader_interface(Interface *interface)
Add an interface to the reader addition/removal watch list.
Definition: interface_listener.cpp:264
fawkes::BlackBoard::open_multiple_for_reading
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
RosTfThread::RosTfThread
RosTfThread()
Constructor.
Definition: tf_thread.cpp:38
fawkes::BlackBoardInterfaceListener::bbil_add_data_interface
void bbil_add_data_interface(Interface *interface)
Add an interface to the data modification watch list.
Definition: interface_listener.cpp:238
RosTfThread::bb_interface_created
virtual void bb_interface_created(const char *type, const char *id)
BlackBoard interface created notification.
Definition: tf_thread.cpp:200
fawkes::TransformAspect::tf_publishers
std::map< std::string, tf::TransformPublisher * > tf_publishers
Definition: tf.h:75
RosTfThread::bb_interface_writer_removed
virtual void bb_interface_writer_removed(fawkes::Interface *interface, unsigned int instance_serial)
A writing instance has been closed for a watched interface.
Definition: tf_thread.cpp:235
fawkes::BlackBoard::update_listener
virtual void update_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Update BB event listener.
Definition: blackboard.cpp:202
fawkes::BlackBoardInterfaceListener::bbil_add_writer_interface
void bbil_add_writer_interface(Interface *interface)
Add an interface to the writer addition/removal watch list.
Definition: interface_listener.cpp:276
RosTfThread::init
virtual void init()
Initialize the thread.
Definition: tf_thread.cpp:58
fawkes::MultiLogger::log_info
virtual void log_info(const char *component, const char *format,...)
Definition: multi.cpp:201
fawkes::Exception
Definition: exception.h:41