37 #include <protobuf_comm/client.h>
39 #include <boost/lexical_cast.hpp>
41 using namespace boost::asio;
42 using namespace boost::system;
44 namespace protobuf_comm {
// Default constructor.
// Binds the resolver, the socket and the io_service work object to this
// client's own io_service_, then initializes the state shown below.
// NOTE(review): several lines of this constructor are not visible in this
// view (e.g. where message_register_ and the buffer sizes are assigned).
54 ProtobufStreamClient::ProtobufStreamClient()
55 : resolver_(io_service_), socket_(io_service_), io_service_work_(io_service_)
// The cleanup code only deletes message_register_ when this flag is set.
58 own_message_register_ =
true;
// The send path starts a write directly only while this flag is false;
// otherwise it queues the entry.
60 outbound_active_ =
false;
// This overload always uses the V2 frame header format.
62 frame_header_version_ = PB_FRAME_V2;
// Raw C allocations for the incoming frame header and payload.
// NOTE(review): the malloc() results are not checked on the visible lines,
// and both size members must already be set here — confirm.
64 in_frame_header_ = malloc(in_frame_header_size_);
65 in_data_ = malloc(in_data_size_);
// Constructor overload (its signature line is not visible in this view).
// Like the default constructor, it binds resolver, socket and work object to
// io_service_, marks the message register as owned and selects V2 framing.
75 : resolver_(io_service_), socket_(io_service_), io_service_work_(io_service_)
// The cleanup code only deletes message_register_ when this flag is set.
78 own_message_register_ =
true;
// No outbound write is marked as in progress initially.
80 outbound_active_ =
false;
// Payload receive buffer.
// NOTE(review): in_data_size_ must be assigned before this call; that
// assignment is not visible here — confirm.
82 in_data_ = malloc(in_data_size_);
83 frame_header_version_ = PB_FRAME_V2;
// Frame header receive buffer.
// NOTE(review): in_frame_header_size_ is presumably set on a line not shown.
85 in_frame_header_ = malloc(in_frame_header_size_);
// Constructor overload using an externally supplied message register `mr`
// (the first line of the signature is not visible in this view).
// @param header_version frame header format this client will use.
// own_message_register_ is initialized to false, so the cleanup code shown
// elsewhere in this file will not delete `mr`.
94 frame_header_version_t header_version)
95 : resolver_(io_service_),
97 io_service_work_(io_service_),
98 message_register_(mr),
99 own_message_register_(false),
100 frame_header_version_(header_version)
// No outbound write is marked as in progress initially.
103 outbound_active_ =
false;
// Initial payload buffer capacity in bytes; handle_read_header() grows the
// buffer with realloc() when a larger payload is announced.
104 in_data_size_ = 1024;
105 in_data_ = malloc(in_data_size_);
// The frame header buffer size depends on the header version.
// NOTE(review): both branch bodies are not visible here; presumably they set
// in_frame_header_size_ to the size of the matching header struct.
106 if (frame_header_version_ == PB_FRAME_V1) {
111 in_frame_header_ = malloc(in_frame_header_size_);
// Cleanup fragment (enclosing signature not visible; presumably the
// destructor). Releases the malloc()'d frame header buffer with free().
122 free(in_frame_header_);
// Delete the message register only if this object owns it.
123 if (own_message_register_) {
124 delete message_register_;
// Helper compiled only for GCC older than 4.6. Under the same condition,
// run_asio() passes this function to std::thread instead of using a lambda.
// NOTE(review): return type and body are not visible in this view; presumably
// it just runs the given io_service.
// @param io_service the io_service handed over by run_asio().
128 #if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 6))
130 run_asio_thread(boost::asio::io_service &io_service)
// Starts asio_thread_, the thread on which io_service_ is run.
137 ProtobufStreamClient::run_asio()
// GCC older than 4.6: use the free function run_asio_thread as thread entry.
139 #if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 6))
140 asio_thread_ = std::thread(run_asio_thread, std::ref(io_service_));
// Other branch (its preprocessor directive is not visible here): a lambda
// capturing `this` calls io_service_.run() on the new thread.
142 asio_thread_ = std::thread([
this]() { this->io_service_.run(); });
// Body fragment (enclosing signature not visible in this view).
// Builds a resolver query from `host` and `port`, with the port converted to
// a string via boost::lexical_cast.
156 ip::tcp::resolver::query query(host, boost::lexical_cast<std::string>(port));
// Start the asynchronous resolve; handle_resolve() is bound as completion
// handler and receives the error code and the resulting endpoint iterator.
157 resolver_.async_resolve(query,
158 boost::bind(&ProtobufStreamClient::handle_resolve,
160 boost::asio::placeholders::error,
161 boost::asio::placeholders::iterator));
// Completion handler for the asynchronous resolve.
// @param err               result of the resolve operation.
// @param endpoint_iterator iterator over the resolved endpoints.
165 ProtobufStreamClient::handle_resolve(
const boost::system::error_code &err,
166 ip::tcp::resolver::iterator endpoint_iterator)
// Asio versions above 100409 use the free function boost::asio::async_connect.
171 #if BOOST_ASIO_VERSION > 100409
172 boost::asio::async_connect(socket_,
// Other branch (directive not visible): connect the socket to the endpoint
// the iterator currently points at.
175 socket_.async_connect(*endpoint_iterator,
// handle_connect() is bound as the completion handler of the connect.
177 boost::bind(&ProtobufStreamClient::handle_connect,
179 boost::asio::placeholders::error));
// NOTE(review): the condition guarding this emission is not visible;
// presumably it is the failure path, reporting the resolve error.
182 sig_disconnected_(err);
// Completion handler for the asynchronous connect.
// @param err result of the connect operation.
187 ProtobufStreamClient::handle_connect(
const boost::system::error_code &err)
// Extra check compiled only for Boost 1.54.x: query the remote endpoint and
// treat a "not connected" result as a disconnect.
// NOTE(review): presumably works around a connect that is reported as
// successful without the socket being connected — confirm.
190 #if BOOST_VERSION >= 105400 && BOOST_VERSION < 105500
195 boost::system::error_code ec;
196 socket_.remote_endpoint(ec);
197 if (ec == boost::system::errc::not_connected) {
199 sig_disconnected_(ec);
// NOTE(review): guarding condition not visible; presumably the failure path,
// which reports the connect error.
208 sig_disconnected_(err);
// Shuts the connection down; the visible part emits no signal itself.
213 ProtobufStreamClient::disconnect_nosig()
// Passed to shutdown() so that a failure sets `err` instead of throwing.
215 boost::system::error_code err;
216 if (socket_.is_open()) {
// Shut down both the send and the receive direction.
217 socket_.shutdown(ip::tcp::socket::shutdown_both, err);
// Emits the disconnected signal with a default-constructed error code.
// NOTE(review): given the "_nosig" name above, this line presumably belongs
// to a different function whose signature is not visible in this view.
228 sig_disconnected_(boost::system::error_code());
// Starts receiving the next frame: asynchronously reads in_frame_header_size_
// bytes into in_frame_header_, with handle_read_header() as the handler.
232 ProtobufStreamClient::start_recv()
234 boost::asio::async_read(socket_,
235 boost::asio::buffer(in_frame_header_, in_frame_header_size_),
236 boost::bind(&ProtobufStreamClient::handle_read_header,
238 boost::asio::placeholders::error));
// Completion handler for the frame header read.
// Takes the payload size from the header, grows the payload buffer if
// needed, then starts reading the payload.
// @param error result of the header read.
242 ProtobufStreamClient::handle_read_header(
const boost::system::error_code &error)
// Both header layouts carry payload_size in network byte order; ntohl()
// converts it to host order.
246 if (frame_header_version_ == PB_FRAME_V1) {
247 frame_header_v1_t *frame_header = (frame_header_v1_t *)in_frame_header_;
248 to_read = ntohl(frame_header->payload_size);
250 frame_header_t *frame_header = (frame_header_t *)in_frame_header_;
251 to_read = ntohl(frame_header->payload_size);
// Grow the payload buffer when the announced payload does not fit.
// NOTE(review): to_read comes from the peer and no upper bound is visible
// here — confirm whether a maximum frame size should be enforced.
253 if (to_read > in_data_size_) {
254 void *new_data = realloc(in_data_, to_read);
256 in_data_size_ = to_read;
// NOTE(review): guarding condition not visible; presumably taken when
// realloc() failed.
260 sig_disconnected_(errc::make_error_code(errc::not_enough_memory));
// Read to_read payload bytes; handle_read_message() is the handler.
264 boost::asio::async_read(socket_,
265 boost::asio::buffer(in_data_, to_read),
266 boost::bind(&ProtobufStreamClient::handle_read_message,
268 boost::asio::placeholders::error));
// NOTE(review): guarding condition not visible; presumably the path taken
// when the header read itself failed.
271 sig_disconnected_(error);
// Completion handler for the payload read.
// Assembles a frame header and a message header for the received data, has
// the message register deserialize it, and emits the matching signal.
// @param error result of the payload read.
276 ProtobufStreamClient::handle_read_message(
const boost::system::error_code &error)
279 frame_header_t frame_header;
280 message_header_t message_header;
// V1 frames: build a frame_header_t from the V1 header. The stored payload
// size is the V1 payload size plus sizeof(message_header_t), converted back
// to network byte order; component ID and message type come from the V1
// frame header.
283 if (frame_header_version_ == PB_FRAME_V1) {
284 frame_header_v1_t *frame_header_v1 = (frame_header_v1_t *)in_frame_header_;
285 frame_header.header_version = PB_FRAME_V1;
286 frame_header.cipher = PB_ENCRYPTION_NONE;
287 frame_header.payload_size =
288 htonl(ntohl(frame_header_v1->payload_size) +
sizeof(message_header_t));
289 message_header.component_id = frame_header_v1->component_id;
290 message_header.msg_type = frame_header_v1->msg_type;
// Other branch (directive line not visible): the received frame header is
// copied as-is and the message header is read from the start of in_data_.
293 memcpy(&frame_header, in_frame_header_,
sizeof(frame_header_t));
295 message_header_t *msg_header =
static_cast<message_header_t *
>(in_data_);
296 message_header.component_id = msg_header->component_id;
297 message_header.msg_type = msg_header->msg_type;
// The serialized message follows directly after the message header.
299 data = (
char *)in_data_ +
sizeof(message_header);
// Host-order IDs, used for the signals below.
302 uint16_t comp_id = ntohs(message_header.component_id);
303 uint16_t msg_type = ntohs(message_header.msg_type);
305 std::shared_ptr<google::protobuf::Message> m =
306 message_register_->
deserialize(frame_header, message_header, data);
// Successful deserialization: hand the message to subscribers.
308 sig_rcvd_(comp_id, msg_type, m);
309 }
catch (std::runtime_error &e) {
// A std::runtime_error is reported via the receive-failed signal together
// with its what() text.
310 sig_recv_failed_(comp_id, msg_type, e.what());
// NOTE(review): guarding condition not visible; presumably the path taken
// when the payload read failed.
316 sig_disconnected_(error);
// Completion handler for an asynchronous write.
// Starts the next queued write if there is one; otherwise the visible code
// clears outbound_active_. Remaining parameters are not visible in this view.
// @param error result of the write operation.
321 ProtobufStreamClient::handle_write(
const boost::system::error_code &error,
// outbound_mutex_ is held while the queue and outbound_active_ are accessed.
328 std::lock_guard<std::mutex> lock(outbound_mutex_);
329 if (!outbound_queue_.empty()) {
// Pop the oldest queued entry and write its buffers; this handler is bound
// again as the completion handler.
330 QueueEntry *front_entry = outbound_queue_.front();
331 outbound_queue_.pop();
332 boost::asio::async_write(socket_,
333 front_entry->buffers,
334 boost::bind(&ProtobufStreamClient::handle_write,
336 boost::asio::placeholders::error,
337 boost::asio::placeholders::bytes_transferred,
// NOTE(review): presumably the else-branch (queue empty): mark the sender
// idle so the next send starts a write directly.
340 outbound_active_ =
false;
// NOTE(review): guarding condition not visible; presumably the path taken
// when the write failed.
344 sig_disconnected_(error);
// Body fragment of a send function (its signature is not visible here).
// Throws std::runtime_error with the message below.
// NOTE(review): the guarding condition is not visible; presumably a
// connection state check.
357 throw std::runtime_error(
"Cannot send while not connected");
// The message register serializes the message (remaining arguments are not
// visible in this view).
361 message_register_->
serialize(component_id,
368 if (frame_header_version_ == PB_FRAME_V1) {
// The second buffer slot is set to an empty const_buffer.
// NOTE(review): presumably inside the V1 branch, where no separate message
// header buffer is needed — confirm.
375 entry->
buffers[1] = boost::asio::const_buffer();
// outbound_mutex_ is held while the queue and outbound_active_ are accessed.
382 std::lock_guard<std::mutex> lock(outbound_mutex_);
// A write is already in progress: queue the entry. handle_write() pops the
// queue when a write completes.
383 if (outbound_active_) {
384 outbound_queue_.push(entry);
// Otherwise mark the sender busy and start the write right away.
386 outbound_active_ =
true;
387 boost::asio::async_write(socket_,
389 boost::bind(&ProtobufStreamClient::handle_write,
391 boost::asio::placeholders::error,
392 boost::asio::placeholders::bytes_transferred,
// Body fragment of a send overload taking message `m` (signature not
// visible). It takes component ID and message type from the message's nested
// "CompType" enum and forwards to send(comp_id, msg_type, m).
// Throws std::logic_error with the messages below when the enum or its
// values are missing or out of range.
404 const google::protobuf::Descriptor * desc = m.GetDescriptor();
405 const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName(
"CompType");
// NOTE(review): guarding condition not visible; presumably a null check on
// enumdesc.
407 throw std::logic_error(
"Message does not have CompType enum");
// The enum must provide both a COMP_ID and a MSG_TYPE value.
409 const google::protobuf::EnumValueDescriptor *compdesc = enumdesc->FindValueByName(
"COMP_ID");
410 const google::protobuf::EnumValueDescriptor *msgtdesc = enumdesc->FindValueByName(
"MSG_TYPE");
411 if (!compdesc || !msgtdesc) {
// NOTE(review): the message text contains the typo "hs" (for "has"); it is a
// runtime string and is therefore left unchanged here.
412 throw std::logic_error(
"Message CompType enum hs no COMP_ID or MSG_TYPE value");
414 int comp_id = compdesc->number();
415 int msg_type = msgtdesc->number();
// Both numbers must fit into uint16_t.
// NOTE(review): std::numeric_limits needs <limits>; no such include is
// visible in this view — confirm.
416 if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
417 throw std::logic_error(
"Message has invalid COMP_ID");
419 if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
420 throw std::logic_error(
"Message has invalid MSG_TYPE");
// Forward to the overload that takes explicit IDs.
423 send(comp_id, msg_type, m);