23 #include "sync_thread.h"
25 #include <blackboard/remote.h>
26 #include <core/threading/mutex_locker.h>
27 #include <utils/time/wait.h>
// Constructor fragment; the signature and initializer list are only partly
// visible in this excerpt.
46 std::string &peer_cfg_prefix,
// The thread name embeds the peer name.
50 set_name(
"BBSyncThread[%s]", peer.c_str());
// Keep both configuration prefixes in members; later code appends keys such
// as "check_interval", "reading/" and "writing/" to them.
53 bbsync_cfg_prefix_ = bbsync_cfg_prefix;
54 peer_cfg_prefix_ = peer_cfg_prefix;
// Initialization fragment; the enclosing function header and several
// statements are not visible in this excerpt.
69 unsigned int check_interval = 0;
// Read "check_interval" below the general bbsync prefix.
74 check_interval =
config->
get_uint((bbsync_cfg_prefix_ +
"check_interval").c_str());
// Add context to an exception object `e`.
// NOTE(review): the surrounding try/catch is not visible here - confirm which lookup this guards.
76 e.
append(
"Host or port not specified for peer");
// Read "check_interval" again, this time below the peer-specific prefix.
// NOTE(review): presumably a per-peer override of the value above - confirm.
81 check_interval =
config->
get_uint((peer_cfg_prefix_ +
"check_interval").c_str());
// Load the interface combos: entries below "reading/" are read with
// writing=false, entries below "writing/" with writing=true.
87 read_config_combos(peer_cfg_prefix_ +
"reading/",
false);
88 read_config_combos(peer_cfg_prefix_ +
"writing/",
true);
// Log every configured combo. When remote_writer is set the reader side is
// reported as "local" and the writer side as "remote"; otherwise the labels
// are swapped.
90 for (ComboMap::iterator i = combos_.begin(); i != combos_.end(); ++i) {
92 "Combo: %s, %s (%s, R) -> %s (%s, W)",
93 i->second.type.c_str(),
94 i->second.reader_id.c_str(),
95 i->second.remote_writer ?
"local" :
"remote",
96 i->second.writer_id.c_str(),
97 i->second.remote_writer ?
"remote" :
"local");
// Initial connection check; the handling of a false result is not visible here.
103 if (!check_connection()) {
// Checks the remote BlackBoard connection. The return type and most of the
// body are not visible in this excerpt; the caller negates the result, so it
// is used as a boolean.
133 BlackBoardSynchronizationThread::check_connection()
// Enter the reconnect path when there is no remote BlackBoard object or the
// existing one reports that it is no longer alive.
135 if (!remote_bb_ || !remote_bb_->
is_alive()) {
// Log format strings for the connection-lost and connection-established
// cases; the logger calls they belong to are not visible here.
138 "Lost connection via remote BB to %s (%s:%u), will try to re-establish",
152 "Successfully connected via remote BB to %s (%s:%u)",
// Reads interface combos from configuration values and stores them in combos_.
//
// prefix:  configuration path prefix; it is stripped from each value's path
//          to obtain the key used in combos_.
// writing: stored as the last field of every combo created here.
//          NOTE(review): presumably maps to combo_t::remote_writer - confirm
//          against the combo_t declaration.
//
// Throws Exception when an interface UID does not contain "::".
// The iterator `i` and the string `uid` are defined on lines not visible in
// this excerpt.
169 BlackBoardSynchronizationThread::read_config_combos(std::string prefix,
bool writing)
// Only values of config type "string" are accepted.
173 if (strcmp(i->
type(),
"string") != 0) {
175 "but found value of type %s",
// Key for combos_: the config path with the prefix removed.
182 std::string varname = std::string(i->
path()).substr(prefix.length());
// The UID must have the form "<type>::<id>".
186 if ((sf = uid.find(
"::")) == std::string::npos) {
188 throw Exception(
"Interface UID '%s' at %s is not valid, missing double colon",
// Split at the double colon: the type precedes it, the id follows it.
193 std::string type = uid.substr(0, sf);
194 std::string
id = uid.substr(sf + 2);
// By default the same id is used for both id fields of the combo.
195 combo_t combo = {type, id, id, writing};
// An id of the form "<reader>=<writer>" gives distinct reader and writer ids.
197 if ((sf =
id.find(
"=")) != std::string::npos) {
199 combo.reader_id =
id.substr(0, sf);
200 combo.writer_id =
id.substr(sf + 1);
// Register the combo under its configuration key (replaces an existing entry).
203 combos_[varname] = combo;
// Opens a reading interface and a writing interface for each configured combo
// and records them in interfaces_ and sync_listeners_. The return type and
// parts of the body (including the selection of reader_bb/writer_bb and the
// creation of sync_listener) are not visible in this excerpt.
209 BlackBoardSynchronizationThread::open_interfaces()
214 ComboMap::iterator i;
215 for (i = combos_.begin(); i != combos_.end(); ++i) {
216 Interface *iface_reader = NULL, *iface_writer = NULL;
// Log and open the reading side; it is reported as "locally" when the combo's
// remote_writer flag is set, "remotely" otherwise.
223 "Opening reading %s (%s:%s)",
224 i->second.remote_writer ?
"locally" :
"remotely",
225 i->second.type.c_str(),
226 i->second.reader_id.c_str());
228 reader_bb->
open_for_reading(i->second.type.c_str(), i->second.reader_id.c_str());
// Log and open the writing side; the labels are the inverse of the reader's.
232 "Opening writing on %s (%s:%s)",
233 i->second.remote_writer ?
"remotely" :
"locally",
234 i->second.type.c_str(),
235 i->second.writer_id.c_str());
237 writer_bb->
open_for_writing(i->second.type.c_str(), i->second.writer_id.c_str());
// Book-keeping entry keyed by the reader interface; it stores a pointer to
// the combo held inside combos_.
240 InterfaceInfo ii(&i->second, iface_writer, reader_bb, writer_bb);
241 interfaces_[iface_reader] = ii;
// Close both interfaces again.
// NOTE(review): presumably an error/cleanup path - the enclosing catch is not visible; confirm.
244 reader_bb->
close(iface_reader);
245 writer_bb->
close(iface_writer);
// Remember the sync listener for this reader interface.
255 sync_listeners_[iface_reader] = sync_listener;
// Extra handling when the writer is remote; the branch body is not visible here.
257 if (i->second.remote_writer) {
// Walks sync_listeners_ and interfaces_, closes reader and writer interfaces,
// and finally empties sync_listeners_. The return type and parts of the body
// are not visible in this excerpt.
266 BlackBoardSynchronizationThread::close_interfaces()
// First pass over all registered sync listeners (loop body not visible here).
268 SyncListenerMap::iterator s;
269 for (s = sync_listeners_.begin(); s != sync_listeners_.end(); ++s) {
// Second pass over all interface entries; the map key (i->first) is the
// reader interface.
276 InterfaceMap::iterator i;
277 for (i = interfaces_.begin(); i != interfaces_.end(); ++i) {
279 "Closing %s reading interface %s",
280 i->second.combo->remote_writer ?
"local" :
"remote",
// The reader is closed on a BlackBoard chosen by remote_writer.
// NOTE(review): the branch that pairs with the remote_bb_ call below is not visible - confirm.
282 if (i->second.combo->remote_writer) {
287 remote_bb_->
close(i->first);
// A writer interface is only closed if one is currently set.
289 if (i->second.writer) {
291 "Closing %s writing interface %s",
292 i->second.combo->remote_writer ?
"remote" :
"local",
293 i->second.writer->uid());
294 if (i->second.combo->remote_writer) {
295 remote_bb_->
close(i->second.writer);
// Drop all listener entries.
302 sync_listeners_.clear();
// Fragment of the handler that reacts to a writer being added for `interface`
// (per the log messages); the function header is not visible in this excerpt.
// NOTE(review): interfaces_[interface] goes through operator[] - if interfaces_
// is a std::map this inserts a default entry for unknown keys; confirm that
// callers only pass known interfaces.
314 if (interfaces_[interface].writer) {
// A relay writer already exists for this interface: reported as a possible bug.
317 "Writer added for %s, but relay exists already. Bug?",
// No relay writer yet: this is logged at warning level.
320 logger->
log_warn(name(),
"Writer added for %s, opening relay writer", interface->uid());
324 InterfaceInfo & ii = interfaces_[interface];
// Open the relay writer on the BlackBoard recorded for the writing side.
326 iface = ii.writer_bb->open_for_writing(ii.combo->type.c_str(), ii.combo->writer_id.c_str());
329 "Creating sync listener for %s:%s-%s",
330 ii.combo->type.c_str(),
331 ii.combo->reader_id.c_str(),
332 ii.combo->writer_id.c_str());
// Register the new listener under the reader interface.
337 sync_listeners_[interface] = sync_listener;
// Release the listener and close the writer that was just opened.
// NOTE(review): presumably the failure path, given the message below - the enclosing catch is not visible; confirm.
341 delete sync_listener;
342 ii.writer_bb->close(iface);
344 "Failed to open writer for %s:%s-%s, sync broken",
345 ii.combo->type.c_str(),
346 ii.combo->reader_id.c_str(),
347 ii.combo->writer_id.c_str());
// Fragment of the handler that reacts to a writer being removed for
// `interface` (per the log messages); the function header is not visible in
// this excerpt.
362 if (!interfaces_[interface].writer) {
// No relay writer is recorded for this interface: reported as a possible bug.
364 logger->
log_warn(name(),
"Writer removed for %s, but no relay exists. Bug?", interface->uid());
// A relay writer exists: this is logged at warning level.
366 logger->
log_warn(name(),
"Writer removed for %s, closing relay writer", interface->uid());
368 InterfaceInfo &ii = interfaces_[interface];
// Destroy the sync listener and leave a NULL entry behind in the map.
370 delete sync_listeners_[interface];
371 sync_listeners_[interface] = NULL;
// Close the relay writer on the BlackBoard it was opened on.
373 ii.writer_bb->close(ii.writer);
// Log format string and arguments for a failed close; the enclosing logger
// call and error handling are not visible here.
378 "Failed to close writer for %s:%s-%s, sync broken",
379 ii.combo->type.c_str(),
380 ii.combo->reader_id.c_str(),
381 ii.combo->writer_id.c_str());