37 #include <protobuf_comm/server.h>
41 using namespace boost::asio;
42 using namespace boost::system;
44 namespace protobuf_comm {
59 ProtobufStreamServer::Session::Session(ClientID
id,
60 ProtobufStreamServer * parent,
61 boost::asio::io_service &io_service)
62 : id_(id), parent_(parent), socket_(io_service)
65 in_data_ = malloc(in_data_size_);
66 outbound_active_ =
false;
70 ProtobufStreamServer::Session::~Session()
72 boost::system::error_code err;
73 if (socket_.is_open()) {
74 socket_.shutdown(ip::tcp::socket::shutdown_both, err);
83 ProtobufStreamServer::Session::start_session()
85 remote_endpoint_ = socket_.remote_endpoint();
93 ProtobufStreamServer::Session::start_read()
95 boost::asio::async_read(socket_,
96 boost::asio::buffer(&in_frame_header_,
sizeof(frame_header_t)),
97 boost::bind(&ProtobufStreamServer::Session::handle_read_header,
99 boost::asio::placeholders::error));
108 ProtobufStreamServer::Session::send(uint16_t component_id,
110 google::protobuf::Message &m)
112 QueueEntry *entry =
new QueueEntry();
113 parent_->message_register().serialize(component_id,
117 entry->message_header,
118 entry->serialized_message);
120 entry->buffers[0] = boost::asio::buffer(&entry->frame_header,
sizeof(frame_header_t));
121 entry->buffers[1] = boost::asio::buffer(&entry->message_header,
sizeof(message_header_t));
122 entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
124 std::lock_guard<std::mutex> lock(outbound_mutex_);
125 if (outbound_active_) {
126 outbound_queue_.push(entry);
128 outbound_active_ =
true;
129 boost::asio::async_write(socket_,
131 boost::bind(&ProtobufStreamServer::Session::handle_write,
133 boost::asio::placeholders::error,
134 boost::asio::placeholders::bytes_transferred,
141 ProtobufStreamServer::Session::disconnect()
143 boost::system::error_code err;
144 if (socket_.is_open()) {
145 socket_.shutdown(ip::tcp::socket::shutdown_both, err);
152 ProtobufStreamServer::Session::handle_write(
const boost::system::error_code &error,
159 std::lock_guard<std::mutex> lock(outbound_mutex_);
160 if (!outbound_queue_.empty()) {
161 QueueEntry *front_entry = outbound_queue_.front();
162 outbound_queue_.pop();
163 boost::asio::async_write(socket_,
164 front_entry->buffers,
165 boost::bind(&ProtobufStreamServer::Session::handle_write,
167 boost::asio::placeholders::error,
168 boost::asio::placeholders::bytes_transferred,
171 outbound_active_ =
false;
174 parent_->disconnected(shared_from_this(), error);
185 ProtobufStreamServer::Session::handle_read_header(
const boost::system::error_code &error)
188 size_t to_read = ntohl(in_frame_header_.payload_size);
189 if (to_read > in_data_size_) {
190 void *new_data = realloc(in_data_, to_read);
192 in_data_size_ = to_read;
195 parent_->disconnected(shared_from_this(), errc::make_error_code(errc::not_enough_memory));
199 boost::asio::async_read(socket_,
200 boost::asio::buffer(in_data_, to_read),
201 boost::bind(&ProtobufStreamServer::Session::handle_read_message,
203 boost::asio::placeholders::error));
205 parent_->disconnected(shared_from_this(), error);
217 ProtobufStreamServer::Session::handle_read_message(
const boost::system::error_code &error)
220 message_header_t *message_header =
static_cast<message_header_t *
>(in_data_);
222 uint16_t comp_id = ntohs(message_header->component_id);
223 uint16_t msg_type = ntohs(message_header->msg_type);
225 std::shared_ptr<google::protobuf::Message> m =
226 parent_->message_register().deserialize(in_frame_header_,
228 (
char *)in_data_ +
sizeof(message_header_t));
229 parent_->sig_rcvd_(id_, comp_id, msg_type, m);
230 }
catch (std::runtime_error &e) {
232 parent_->sig_recv_failed_(id_, comp_id, msg_type, e.what());
236 parent_->disconnected(shared_from_this(), error);
252 : io_service_(), acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port))
255 own_message_register_ =
true;
258 acceptor_.set_option(socket_base::reuse_address(
true));
261 asio_thread_ = std::thread(&ProtobufStreamServer::run_asio,
this);
271 std::vector<std::string> &proto_path)
272 : io_service_(), acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port))
275 own_message_register_ =
true;
278 acceptor_.set_option(socket_base::reuse_address(
true));
281 asio_thread_ = std::thread(&ProtobufStreamServer::run_asio,
this);
290 acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port)),
291 message_register_(mr),
292 own_message_register_(false)
296 acceptor_.set_option(socket_base::reuse_address(
true));
299 asio_thread_ = std::thread(&ProtobufStreamServer::run_asio,
this);
307 if (own_message_register_) {
308 delete message_register_;
320 uint16_t component_id,
322 google::protobuf::Message &m)
324 if (sessions_.find(client) == sessions_.end()) {
325 throw std::runtime_error(
"Client does not exist");
328 sessions_[client]->send(component_id, msg_type, m);
339 uint16_t component_id,
341 std::shared_ptr<google::protobuf::Message> m)
343 send(client, component_id, msg_type, *m);
354 const google::protobuf::Descriptor * desc = m.GetDescriptor();
355 const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName(
"CompType");
357 throw std::logic_error(
"Message does not have CompType enum");
359 const google::protobuf::EnumValueDescriptor *compdesc = enumdesc->FindValueByName(
"COMP_ID");
360 const google::protobuf::EnumValueDescriptor *msgtdesc = enumdesc->FindValueByName(
"MSG_TYPE");
361 if (!compdesc || !msgtdesc) {
362 throw std::logic_error(
"Message CompType enum hs no COMP_ID or MSG_TYPE value");
364 int comp_id = compdesc->number();
365 int msg_type = msgtdesc->number();
366 if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
367 throw std::logic_error(
"Message has invalid COMP_ID");
369 if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
370 throw std::logic_error(
"Message has invalid MSG_TYPE");
373 send(client, comp_id, msg_type, m);
395 google::protobuf::Message &m)
397 std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
398 for (s = sessions_.begin(); s != sessions_.end(); ++s) {
399 send(s->first, component_id, msg_type, m);
411 std::shared_ptr<google::protobuf::Message> m)
413 std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
414 for (s = sessions_.begin(); s != sessions_.end(); ++s) {
415 send(s->first, component_id, msg_type, m);
425 std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
426 for (s = sessions_.begin(); s != sessions_.end(); ++s) {
437 std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
438 for (s = sessions_.begin(); s != sessions_.end(); ++s) {
449 if (sessions_.find(client) != sessions_.end()) {
450 boost::shared_ptr<Session> session = sessions_[client];
451 session->disconnect();
457 ProtobufStreamServer::start_accept()
459 #if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 7))
460 std::lock_guard<std::mutex> lock(next_cid_mutex_);
462 Session::Ptr new_session(
new Session(next_cid_++,
this, io_service_));
463 acceptor_.async_accept(new_session->socket(),
464 boost::bind(&ProtobufStreamServer::handle_accept,
467 boost::asio::placeholders::error));
471 ProtobufStreamServer::disconnected(boost::shared_ptr<Session> session,
472 const boost::system::error_code &error)
474 sessions_.erase(session->id());
475 sig_disconnected_(session->id(), error);
479 ProtobufStreamServer::handle_accept(Session::Ptr new_session,
480 const boost::system::error_code &error)
483 new_session->start_session();
484 sessions_[new_session->id()] = new_session;
485 sig_connected_(new_session->id(), new_session->remote_endpoint());
486 new_session->start_read();
493 ProtobufStreamServer::run_asio()
495 #if BOOST_ASIO_VERSION > 100409
496 while (!io_service_.stopped()) {
501 #if BOOST_ASIO_VERSION > 100409