24 #include <arpa/inet.h>
25 #include <blackboard/blackboard.h>
26 #include <blackboard/exceptions.h>
27 #include <blackboard/net/handler.h>
28 #include <blackboard/net/ilist_content.h>
29 #include <blackboard/net/interface_listener.h>
30 #include <blackboard/net/interface_observer.h>
31 #include <blackboard/net/messages.h>
32 #include <interface/interface.h>
33 #include <interface/interface_info.h>
34 #include <logging/liblogger.h>
35 #include <netcomm/fawkes/component_ids.h>
36 #include <netcomm/fawkes/hub.h>
56 : Thread(
"BlackBoardNetworkHandler", Thread::OPMODE_WAITFORWAKEUP),
57 FawkesNetworkHandler(FAWKES_CID_BLACKBOARD)
61 nhub_->add_handler(
this);
71 inbound_queue_.
clear();
73 for (lit_ = listeners_.begin(); lit_ != listeners_.end(); ++lit_) {
76 for (iit_ = interfaces_.begin(); iit_ != interfaces_.end(); ++iit_) {
77 bb_->
close(iit_->second);
85 while (!inbound_queue_.empty()) {
86 FawkesNetworkMessage *msg = inbound_queue_.front();
89 unsigned int clid = msg->clid();
91 switch (msg->msgid()) {
92 case MSG_BB_LIST_ALL: {
96 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
101 nhub_->
send(clid, FAWKES_CID_BLACKBOARD, MSG_BB_INTERFACE_LIST, ilist);
104 "Failed to send interface "
105 "list to %u, exception follows",
117 char type_pattern[INTERFACE_TYPE_SIZE_ + 1];
118 char id_pattern[INTERFACE_ID_SIZE_ + 1];
119 type_pattern[INTERFACE_TYPE_SIZE_] = 0;
120 id_pattern[INTERFACE_ID_SIZE_] = 0;
121 strncpy(type_pattern, lrm->
type_pattern, INTERFACE_TYPE_SIZE_);
122 strncpy(id_pattern, lrm->
id_pattern, INTERFACE_ID_SIZE_);
125 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
130 nhub_->
send(clid, FAWKES_CID_BLACKBOARD, MSG_BB_INTERFACE_LIST, ilist);
134 "interface list to %u, exception follows",
141 case MSG_BB_OPEN_FOR_READING:
142 case MSG_BB_OPEN_FOR_WRITING: {
145 char type[INTERFACE_TYPE_SIZE_ + 1];
146 char id[INTERFACE_ID_SIZE_ + 1];
147 type[INTERFACE_TYPE_SIZE_] = 0;
148 id[INTERFACE_ID_SIZE_] = 0;
149 strncpy(type, om->
type, INTERFACE_TYPE_SIZE_);
150 strncpy(
id, om->
id, INTERFACE_ID_SIZE_);
156 if (msg->msgid() == MSG_BB_OPEN_FOR_READING) {
161 if (memcmp(iface->
hash(), om->
hash, INTERFACE_HASH_SIZE_) != 0) {
163 "Opening interface %s::%s failed, "
169 interfaces_[iface->
serial()] = iface;
170 client_interfaces_[clid].push_back(iface);
171 serial_to_clid_[iface->
serial()] = clid;
172 listeners_[iface->
serial()] =
174 send_opensuccess(clid, iface);
178 "Opening interface %s::%s failed, "
179 "interface class not found",
185 "Opening interface %s::%s failed, "
186 "writer already exists",
192 "Opening interface %s::%s failed",
207 unsigned int sm_serial = ntohl(sm->
serial);
208 if (interfaces_.find(sm_serial) != interfaces_.end()) {
210 client_interfaces_.lock();
211 if (client_interfaces_.find(clid) != client_interfaces_.end()) {
213 for (ciit_ = client_interfaces_[clid].begin(); ciit_ != client_interfaces_[clid].end();
215 if ((*ciit_)->serial() == sm_serial) {
217 serial_to_clid_.erase(sm_serial);
218 client_interfaces_[clid].erase(ciit_);
219 if (client_interfaces_[clid].empty()) {
220 client_interfaces_.erase(clid);
226 client_interfaces_.unlock();
231 "Remote %u closing interface %s",
233 interfaces_[sm_serial]->uid());
234 delete listeners_[sm_serial];
235 listeners_.erase(sm_serial);
236 bb_->
close(interfaces_[sm_serial]);
237 interfaces_.erase(sm_serial);
238 interfaces_.unlock();
241 "Client %u tried to close "
242 "interface with serial %u, but opened by other client",
248 "Client %u tried to close "
249 "interface with serial %u which has not been opened",
259 case MSG_BB_DATA_CHANGED: {
260 void * payload = msg->payload();
262 unsigned int dm_serial = ntohl(dm->
serial);
263 if (interfaces_.find(dm_serial) != interfaces_.end()) {
264 if (ntohl(dm->
data_size) != interfaces_[dm_serial]->datasize()) {
266 "DATA_CHANGED: Data size mismatch, "
267 "expected %zu, but got %zu, ignoring.",
268 interfaces_[dm_serial]->datasize(),
271 interfaces_[dm_serial]->set_from_chunk((
char *)payload +
sizeof(
bb_idata_msg_t));
272 interfaces_[dm_serial]->write();
276 "DATA_CHANGED: Interface with "
277 "serial %u not found, ignoring.",
282 case MSG_BB_INTERFACE_MESSAGE: {
283 void * payload = msg->payload();
285 unsigned int mm_serial = ntohl(mm->
serial);
286 if (interfaces_.find(mm_serial) != interfaces_.end()) {
287 if (!interfaces_[mm_serial]->is_writer()) {
295 "MESSAGE: Data size mismatch, "
296 "expected %zu, but got %zu, ignoring.",
302 interfaces_[mm_serial]->msgq_enqueue(ifm);
306 "MESSAGE: Could not create "
307 "interface message, ignoring.");
312 "MESSAGE: Received message "
313 "notification, but for a writing instance, ignoring.");
317 "DATA_CHANGED: Interface with "
318 "serial %u not found, ignoring.",
325 "Unknown message of type %u "
337 BlackBoardNetworkHandler::send_opensuccess(
unsigned int clid, Interface *interface)
339 void * payload = calloc(1,
sizeof(bb_iopensucc_msg_t) + interface->datasize());
340 bb_iopensucc_msg_t *osm = (bb_iopensucc_msg_t *)payload;
341 osm->serial = htonl(interface->serial());
342 osm->writer_readers = htonl(interface->num_readers());
343 if (interface->has_writer()) {
344 osm->writer_readers |= htonl(0x80000000);
346 osm->writer_readers &= htonl(0x7FFFFFFF);
348 osm->data_size = htonl(interface->datasize());
350 if (!interface->is_writer()) {
354 memcpy((
char *)payload +
sizeof(bb_iopensucc_msg_t),
355 interface->datachunk(),
356 interface->datasize());
358 FawkesNetworkMessage *omsg =
359 new FawkesNetworkMessage(clid,
360 FAWKES_CID_BLACKBOARD,
363 sizeof(bb_iopensucc_msg_t) + interface->datasize());
366 }
catch (Exception &e) {
368 "Failed to send interface "
369 "open success to %u, exception follows",
376 BlackBoardNetworkHandler::send_openfailure(
unsigned int clid,
unsigned int error_code)
378 bb_iopenfail_msg_t *ofm = (bb_iopenfail_msg_t *)malloc(
sizeof(bb_iopenfail_msg_t));
379 ofm->error_code = htonl(error_code);
381 FawkesNetworkMessage *omsg =
new FawkesNetworkMessage(
382 clid, FAWKES_CID_BLACKBOARD, MSG_BB_OPEN_FAILURE, ofm,
sizeof(bb_iopenfail_msg_t));
385 }
catch (Exception &e) {
387 "Failed to send interface "
388 "open failure to %u, exception follows",
422 client_interfaces_.lock();
423 if (client_interfaces_.find(clid) != client_interfaces_.end()) {
425 for (ciit_ = client_interfaces_[clid].begin(); ciit_ != client_interfaces_[clid].end();
428 "Closing interface %s::%s of remote "
429 "%u (client disconnected)",
434 unsigned int serial = (*ciit_)->serial();
435 serial_to_clid_.erase(serial);
436 interfaces_.erase_locked(serial);
437 delete listeners_[serial];
438 listeners_.erase(serial);
441 client_interfaces_.erase(clid);
443 client_interfaces_.unlock();