Fawkes API  Fawkes Development Version
oprs_protobuf.cpp
1 
2 /***************************************************************************
3  * oprs_protobuf.cpp - protobuf network communication for OpenPRS
4  *
5  * Created: Tue Sep 02 16:53:26 2014 (based on CLIPS version)
6  * Copyright 2013-2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "oprs_protobuf.h"
38 
39 #include <core/exception.h>
40 #include <core/threading/mutex_locker.h>
41 #include <google/protobuf/descriptor.h>
42 #include <protobuf_comm/client.h>
43 #include <protobuf_comm/peer.h>
44 #include <protobuf_comm/server.h>
45 
46 #include <algorithm>
47 #include <oprs_f-pub.h>
48 
49 using namespace google::protobuf;
50 using namespace protobuf_comm;
51 
52 namespace oprs_protobuf {
53 
54 /** @class OpenPRSProtobuf "oprs_protobuf.h"
55  * OpenPRS protobuf integration class.
56  * This class adds functionality related to protobuf to OpenPRS.
57  * It supports the creation of communication channels through protobuf_comm.
58  * An instance maintains its own message register shared among server, peer,
59  * and clients.
60  * @author Tim Niemueller
61  */
62 
63 /** Constructor.
64  * @param proto_path proto path passed to a newly instantiated message register
65  */
66 OpenPRSProtobuf::OpenPRSProtobuf(std::vector<std::string> &proto_path)
67 : message_register_(new MessageRegister(proto_path)), server_(NULL), next_client_id_(0)
68 {
69 }
70 
71 /** Destructor. */
73 {
74  for (auto c : clients_) {
75  delete c.second;
76  }
77  clients_.clear();
78 
79  delete server_;
80  message_register_.reset();
81 }
82 
83 /** Enable protobuf stream server.
84  * @param port TCP port to listen on for connections
85  */
86 void
88 {
89  if ((port > 0) && !server_) {
90  server_ = new protobuf_comm::ProtobufStreamServer(port, &*message_register_);
91 
92  server_->signal_connected().connect(
93  boost::bind(&OpenPRSProtobuf::handle_server_client_connected, this, _1, _2));
94  server_->signal_disconnected().connect(
95  boost::bind(&OpenPRSProtobuf::handle_server_client_disconnected, this, _1, _2));
96  server_->signal_received().connect(
97  boost::bind(&OpenPRSProtobuf::handle_server_client_msg, this, _1, _2, _3, _4));
98  server_->signal_receive_failed().connect(
99  boost::bind(&OpenPRSProtobuf::handle_server_client_fail, this, _1, _2, _3, _4));
100  }
101 }
102 
103 /** Disable protobuf stream server. */
104 void
106 {
107  delete server_;
108  server_ = NULL;
109 }
110 
111 /** Enable protobuf peer.
112  * @param address IP address to send messages to
113  * @param send_port UDP port to send messages to
114  * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
115  * @param crypto_key encryption key
116  * @param cipher cipher suite, see BufferEncryptor for supported types
117  * @return peer identifier
118  */
119 Term *
121  int send_port,
122  int recv_port,
123  const std::string &crypto_key,
124  const std::string &cipher)
125 {
126  if (recv_port <= 0)
127  recv_port = send_port;
128 
129  if (send_port > 0) {
131  address, send_port, recv_port, &*message_register_, crypto_key, cipher);
132 
133  long int peer_id;
134  {
135  fawkes::MutexLocker lock(&map_mutex_);
136  peer_id = ++next_client_id_;
137  peers_[peer_id] = peer;
138  }
139 
140  peer->signal_received().connect(
141  boost::bind(&OpenPRSProtobuf::handle_peer_msg, this, peer_id, _1, _2, _3, _4));
142  peer->signal_recv_error().connect(
143  boost::bind(&OpenPRSProtobuf::handle_peer_recv_error, this, peer_id, _1, _2));
144  peer->signal_send_error().connect(
145  boost::bind(&OpenPRSProtobuf::handle_peer_send_error, this, peer_id, _1));
146 
147  return build_long_long(peer_id);
148  } else {
149  return build_long_long(0);
150  }
151 }
152 
153 /** Enable protobuf peer.
154  * @param address IP address to send messages to
155  * @param port UDP port to send and receive messages
156  * @param crypto_key encryption key
157  * @param cipher cipher suite, see BufferEncryptor for supported types
158  * @return peer identifier
159  */
160 Term *
162  int port,
163  const std::string &crypto_key,
164  const std::string &cipher)
165 {
166  return oprs_pb_peer_create_local_crypto(address, port, port, crypto_key, cipher);
167 }
168 
169 /** Enable protobuf peer.
170  * @param address IP address to send messages to
171  * @param port UDP port to send and receive messages
172  * @return peer identifier
173  */
174 Term *
175 OpenPRSProtobuf::oprs_pb_peer_create(const std::string &address, int port)
176 {
177  return oprs_pb_peer_create_local_crypto(address, port, port);
178 }
179 
180 /** Enable protobuf peer.
181  * @param address IP address to send messages to
182  * @param send_port UDP port to send messages to
183  * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
184  * @return peer identifier
185  */
186 Term *
187 OpenPRSProtobuf::oprs_pb_peer_create_local(const std::string &address, int send_port, int recv_port)
188 {
189  return oprs_pb_peer_create_local_crypto(address, send_port, recv_port);
190 }
191 
192 /** Disable peer.
193  * @param peer_id ID of the peer to destroy
194  */
195 void
197 {
198  if (peers_.find(peer_id) != peers_.end()) {
199  delete peers_[peer_id];
200  peers_.erase(peer_id);
201  }
202 }
203 
204 /** Setup crypto for peer.
205  * @param peer_id ID of the peer to destroy
206  * @param crypto_key encryption key
207  * @param cipher cipher suite, see BufferEncryptor for supported types
208  */
209 void
211  const std::string &crypto_key,
212  const std::string &cipher)
213 {
214  if (peers_.find(peer_id) != peers_.end()) {
215  peers_[peer_id]->setup_crypto(crypto_key, cipher);
216  }
217 }
218 
219 /** Register a new message type.
220  * @param full_name full name of type to register
221  * @return true if the type was successfully registered, false otherwise
222  */
223 bool
225 {
226  try {
227  message_register_->add_message_type(full_name);
228  return true;
229  } catch (std::runtime_error &e) {
230  //logger_->log_error("RefBox", "Registering type %s failed: %s", full_name.c_str(), e.what());
231  return false;
232  }
233 }
234 
235 /** Create a new message of given type.
236  * @param full_name name of message type (fully qualified, i.e. including package name)
237  * @return shared pointer to new mesage
238  * @exception std::runtime_error thrown if creating the message failed
239  */
240 std::shared_ptr<google::protobuf::Message> *
241 OpenPRSProtobuf::oprs_create_msg(std::string full_name)
242 {
243  std::shared_ptr<google::protobuf::Message> m = message_register_->new_message_for(full_name);
244  return new std::shared_ptr<google::protobuf::Message>(m);
245 }
246 
247 /** Create new reference to message.
248  * @param msgptr message to create reference for
249  * @return new message reference pointing to the very same message as @p msgptr
250  */
251 Term *
253 {
254  std::shared_ptr<google::protobuf::Message> *m =
255  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
256  if (!*m)
257  return build_pointer(new std::shared_ptr<google::protobuf::Message>());
258 
259  return build_pointer(new std::shared_ptr<google::protobuf::Message>(*m));
260 }
261 
262 /** Destroy given message (reference).
263  * This will decrement the reference count to the message and delete it.
264  * The message itself is deleted if the reference counter reaches zero.
265  * @param msgptr message (reference) to delete, any access to this message
266  * afterwards is illegal.
267  * @return T
268  */
269 Term *
271 {
272  std::shared_ptr<google::protobuf::Message> *m =
273  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
274  if (!*m)
275  return build_nil();
276 
277  delete m;
278  return build_t();
279 }
280 
281 /** Get field names of message.
282  * @param msgptr user pointer to message
283  * @return term containing lisp list of field names
284  */
285 Term *
287 {
288  TermList tl = sl_make_slist();
289 
290  std::shared_ptr<google::protobuf::Message> *m =
291  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
292  if (!*m)
293  return build_term_l_list_from_c_list(tl);
294 
295  const Descriptor *desc = (*m)->GetDescriptor();
296  const int field_count = desc->field_count();
297  for (int i = 0; i < field_count; ++i) {
298  tl = build_term_list(tl, build_string(desc->field(i)->name().c_str()));
299  }
300  return build_term_l_list_from_c_list(tl);
301 }
302 
303 /** Get type if a specific field.
304  * @param msgptr message for which to get the field type
305  * @param field_name name of the field
306  * @return term with a symbol for the type
307  */
308 Term *
309 OpenPRSProtobuf::oprs_pb_field_type(void *msgptr, std::string field_name)
310 {
311  std::shared_ptr<google::protobuf::Message> *m =
312  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
313  if (!*m)
314  return build_id(declare_atom("INVALID-MESSAGE"));
315 
316  const Descriptor * desc = (*m)->GetDescriptor();
317  const FieldDescriptor *field = desc->FindFieldByName(field_name);
318  if (!field) {
319  return build_id(declare_atom("DOES-NOT-EXIST"));
320  }
321  switch (field->type()) {
322  case FieldDescriptor::TYPE_DOUBLE: return build_id(declare_atom("DOUBLE"));
323  case FieldDescriptor::TYPE_FLOAT: return build_id(declare_atom("FLOAT"));
324  case FieldDescriptor::TYPE_INT64: return build_id(declare_atom("INT64"));
325  case FieldDescriptor::TYPE_UINT64: return build_id(declare_atom("UINT64"));
326  case FieldDescriptor::TYPE_INT32: return build_id(declare_atom("INT32"));
327  case FieldDescriptor::TYPE_FIXED64: return build_id(declare_atom("FIXED64"));
328  case FieldDescriptor::TYPE_FIXED32: return build_id(declare_atom("FIXED32"));
329  case FieldDescriptor::TYPE_BOOL: return build_id(declare_atom("BOOL"));
330  case FieldDescriptor::TYPE_STRING: return build_id(declare_atom("STRING"));
331  case FieldDescriptor::TYPE_MESSAGE: return build_id(declare_atom("MESSAGE"));
332  case FieldDescriptor::TYPE_BYTES: return build_id(declare_atom("BYTES"));
333  case FieldDescriptor::TYPE_UINT32: return build_id(declare_atom("UINT32"));
334  case FieldDescriptor::TYPE_ENUM: return build_id(declare_atom("ENUM"));
335  case FieldDescriptor::TYPE_SFIXED32: return build_id(declare_atom("SFIXED32"));
336  case FieldDescriptor::TYPE_SFIXED64: return build_id(declare_atom("SFIXED64"));
337  case FieldDescriptor::TYPE_SINT32: return build_id(declare_atom("SINT32"));
338  case FieldDescriptor::TYPE_SINT64: return build_id(declare_atom("SINT64"));
339  default: return build_id(declare_atom("UNKNOWN"));
340  }
341 }
342 
343 /** Check if message has a specific field.
344  * This is relevant in particular for optional fields.
345  * @param msgptr message
346  * @param field_name name of the field
347  * @return true if the field is present, false otherwise
348  */
349 bool
350 OpenPRSProtobuf::oprs_pb_has_field(void *msgptr, std::string field_name)
351 {
352  std::shared_ptr<google::protobuf::Message> *m =
353  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
354  if (!*m)
355  return false;
356 
357  const Descriptor * desc = (*m)->GetDescriptor();
358  const FieldDescriptor *field = desc->FindFieldByName(field_name);
359  if (!field)
360  return false;
361 
362  const Reflection *refl = (*m)->GetReflection();
363 
364  if (field->is_repeated()) {
365  return (refl->FieldSize(**m, field) > 0);
366  } else {
367  return refl->HasField(**m, field);
368  }
369 }
370 
371 /** Get a fields label.
372  * @param msgptr message for which to get the field type
373  * @param field_name name of the field
374  * @return Term with Symbol, one of INVALID-MESSAGE, DOES-NOT-EXIST, OPTIONAL, REPEATED, UNKNOWN
375  */
376 Term *
377 OpenPRSProtobuf::oprs_pb_field_label(void *msgptr, std::string field_name)
378 {
379  std::shared_ptr<google::protobuf::Message> *m =
380  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
381  if (!*m)
382  return build_id(declare_atom("INVALID-MESSAGE"));
383 
384  const Descriptor * desc = (*m)->GetDescriptor();
385  const FieldDescriptor *field = desc->FindFieldByName(field_name);
386  if (!field)
387  return build_id(declare_atom("DOES-NOT-EXIST"));
388  switch (field->label()) {
389  case FieldDescriptor::LABEL_OPTIONAL: return build_id(declare_atom("OPTIONAL"));
390  case FieldDescriptor::LABEL_REQUIRED: return build_id(declare_atom("REQUIRED"));
391  case FieldDescriptor::LABEL_REPEATED: return build_id(declare_atom("REPEATED"));
392  default: return build_id(declare_atom("UNKNOWN"));
393  }
394 }
395 
396 /** Get properly typed field value.
397  * @param msgptr message for which to get the field type
398  * @param field_name name of the field
399  * @return Term with value of proper type
400  */
401 Term *
402 OpenPRSProtobuf::oprs_pb_field_value(void *msgptr, std::string field_name)
403 {
404  std::shared_ptr<google::protobuf::Message> *m =
405  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
406  if (!*m)
407  return build_id(declare_atom("INVALID-MESSAGE"));
408 
409  const Descriptor * desc = (*m)->GetDescriptor();
410  const FieldDescriptor *field = desc->FindFieldByName(field_name);
411  if (!field)
412  return build_id(declare_atom("DOES-NOT-EXIST"));
413  const Reflection *refl = (*m)->GetReflection();
414  if (field->type() != FieldDescriptor::TYPE_MESSAGE && !refl->HasField(**m, field)) {
415  //logger_->log_warn("RefBox", "Field %s of %s not set",
416  // field_name.c_str(), (*m)->GetTypeName().c_str());
417  return build_id(declare_atom("NOT-SET"));
418  }
419  switch (field->type()) {
420  case FieldDescriptor::TYPE_DOUBLE: return build_float(refl->GetDouble(**m, field));
421  case FieldDescriptor::TYPE_FLOAT: return build_float(refl->GetFloat(**m, field));
422  case FieldDescriptor::TYPE_INT64: return build_long_long(refl->GetInt64(**m, field));
423  case FieldDescriptor::TYPE_UINT64: return build_long_long((long int)refl->GetUInt64(**m, field));
424  case FieldDescriptor::TYPE_INT32: return build_integer(refl->GetInt32(**m, field));
425  case FieldDescriptor::TYPE_FIXED64: return build_long_long((long int)refl->GetUInt64(**m, field));
426  case FieldDescriptor::TYPE_FIXED32: return build_long_long(refl->GetUInt32(**m, field));
427  case FieldDescriptor::TYPE_BOOL: return refl->GetBool(**m, field) ? build_t() : build_nil();
428  case FieldDescriptor::TYPE_STRING: return build_string(refl->GetString(**m, field).c_str());
429  case FieldDescriptor::TYPE_MESSAGE: {
430  const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
431  google::protobuf::Message * mcopy = mfield.New();
432  mcopy->CopyFrom(mfield);
433  void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
434  return build_pointer(ptr);
435  }
436  case FieldDescriptor::TYPE_BYTES: return build_string((char *)"bytes");
437  case FieldDescriptor::TYPE_UINT32: return build_long_long(refl->GetUInt32(**m, field));
438  case FieldDescriptor::TYPE_ENUM:
439  return build_id(declare_atom(refl->GetEnum(**m, field)->name().c_str()));
440  case FieldDescriptor::TYPE_SFIXED32: return build_integer(refl->GetInt32(**m, field));
441  case FieldDescriptor::TYPE_SFIXED64: return build_long_long(refl->GetInt64(**m, field));
442  case FieldDescriptor::TYPE_SINT32: return build_integer(refl->GetInt32(**m, field));
443  case FieldDescriptor::TYPE_SINT64: return build_long_long(refl->GetInt64(**m, field));
444  default: throw std::logic_error("Unknown protobuf field type encountered");
445  }
446 }
447 
448 /** Set a field.
449  * @param msgptr message for which to get the field type
450  * @param field_name name of the field
451  * @param value term which must contain a single properly typed value.
452  */
453 void
454 OpenPRSProtobuf::oprs_pb_set_field(void *msgptr, std::string field_name, Term *value)
455 {
456  std::shared_ptr<google::protobuf::Message> *m =
457  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
458  if (!*m)
459  return;
460 
461  const Descriptor * desc = (*m)->GetDescriptor();
462  const FieldDescriptor *field = desc->FindFieldByName(field_name);
463  if (!field) {
464  //logger_->log_warn("RefBox", "Could not find field %s", field_name.c_str());
465  return;
466  }
467  const Reflection *refl = (*m)->GetReflection();
468 
469  try {
470  switch (field->type()) {
471  case FieldDescriptor::TYPE_DOUBLE:
472  if (value->type == TT_FLOAT) {
473  refl->SetDouble(m->get(), field, *(value->u.doubleptr));
474  } else {
475  throw std::logic_error(std::string("Invalid type, required float for ")
476  + (*m)->GetTypeName() + field_name);
477  }
478  break;
479  case FieldDescriptor::TYPE_FLOAT:
480  if (value->type == TT_FLOAT) {
481  refl->SetFloat(m->get(), field, *(value->u.doubleptr));
482  } else {
483  throw std::logic_error(std::string("Invalid type, required float for ")
484  + (*m)->GetTypeName() + field_name);
485  }
486  break;
487  case FieldDescriptor::TYPE_SFIXED64:
488  case FieldDescriptor::TYPE_SINT64:
489  case FieldDescriptor::TYPE_INT64:
490  if (value->type == INTEGER) {
491  refl->SetInt64(m->get(), field, value->u.intval);
492  } else if (value->type == LONG_LONG) {
493  refl->SetInt64(m->get(), field, value->u.llintval);
494  } else {
495  throw std::logic_error(std::string("Invalid type, required integer or long long for ")
496  + (*m)->GetTypeName() + field_name);
497  }
498  break;
499  case FieldDescriptor::TYPE_FIXED64:
500  case FieldDescriptor::TYPE_UINT64:
501  if (value->type == INTEGER) {
502  refl->SetUInt64(m->get(), field, value->u.intval);
503  } else if (value->type == LONG_LONG) {
504  refl->SetUInt64(m->get(), field, value->u.llintval);
505  } else {
506  throw std::logic_error(std::string("Invalid type, required integer or long long for ")
507  + (*m)->GetTypeName() + field_name);
508  }
509  break;
510  case FieldDescriptor::TYPE_SFIXED32:
511  case FieldDescriptor::TYPE_SINT32:
512  case FieldDescriptor::TYPE_INT32:
513  if (value->type == INTEGER) {
514  refl->SetInt32(m->get(), field, value->u.intval);
515  } else {
516  throw std::logic_error(std::string("Invalid type, required integer for ")
517  + (*m)->GetTypeName() + field_name);
518  }
519  break;
520  case FieldDescriptor::TYPE_BOOL:
521  if (value->type == TT_ATOM) {
522  if (value->u.id == lisp_t_sym || value->u.id == nil_sym) {
523  refl->SetBool(m->get(), field, (value->u.id == lisp_t_sym));
524  } else {
525  throw std::logic_error(std::string("Invalid value, allowed are T or NIL for field ")
526  + (*m)->GetTypeName() + field_name);
527  }
528  } else {
529  throw std::logic_error(std::string("Invalid type, required symbol for ")
530  + (*m)->GetTypeName() + field_name);
531  }
532  break;
533  case FieldDescriptor::TYPE_STRING:
534  if (value->type == STRING) {
535  refl->SetString(m->get(), field, value->u.string);
536  } else {
537  throw std::logic_error(std::string("Invalid type, required string for ")
538  + (*m)->GetTypeName() + field_name);
539  }
540  break;
541  case FieldDescriptor::TYPE_MESSAGE:
542  if (value->type == U_POINTER) {
543  std::shared_ptr<google::protobuf::Message> *mfrom =
544  static_cast<std::shared_ptr<google::protobuf::Message> *>(value->u.u_pointer);
545  Message *mut_msg = refl->MutableMessage(m->get(), field);
546  mut_msg->CopyFrom(**mfrom);
547  delete mfrom;
548  } else {
549  throw std::logic_error(std::string("Invalid type, required user pointer for ")
550  + (*m)->GetTypeName() + field_name);
551  }
552  break;
553  case FieldDescriptor::TYPE_BYTES: break;
554  case FieldDescriptor::TYPE_FIXED32:
555  case FieldDescriptor::TYPE_UINT32:
556  if (value->type == INTEGER) {
557  refl->SetUInt32(m->get(), field, value->u.intval);
558  } else if (value->type == LONG_LONG) {
559  refl->SetUInt32(m->get(), field, value->u.llintval);
560  } else {
561  throw std::logic_error(std::string("Invalid type, required integer or long long for ")
562  + (*m)->GetTypeName() + field_name);
563  }
564  break;
565  case FieldDescriptor::TYPE_ENUM: {
566  const char *sym_name = NULL;
567  if (value->type == TT_ATOM) {
568  sym_name = value->u.id;
569  } else if (value->type == STRING) {
570  sym_name = value->u.string;
571  } else {
572  throw std::logic_error(std::string("Invalid type, required symbol or string for ")
573  + (*m)->GetTypeName() + field_name);
574  }
575 
576  const EnumDescriptor * enumdesc = field->enum_type();
577  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_name);
578  if (enumval) {
579  refl->SetEnum(m->get(), field, enumval);
580  } else {
581  std::string sym_str(sym_name);
582  std::transform(sym_str.begin(),
583  sym_str.end(),
584  sym_str.begin(),
585  std::ptr_fun<int, int>(std::toupper));
586 
587  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_str);
588 
589  if (enumval) {
590  refl->SetEnum(m->get(), field, enumval);
591  } else {
592  fprintf(stderr,
593  "%s: cannot set invalid enum value '%s' (neither '%s') on '%s'",
594  (*m)->GetTypeName().c_str(),
595  sym_name,
596  sym_str.c_str(),
597  field_name.c_str());
598  }
599  }
600  } break;
601 
602  default: throw std::logic_error("Unknown protobuf field type encountered");
603  }
604  } catch (std::logic_error &e) {
605  //logger_->log_warn("RefBox", "Failed to set field %s of %s: %s", field_name.c_str(),
606  // (*m)->GetTypeName().c_str(), e.what());
607  }
608 }
609 
610 /** Add value to a repeated field.
611  * @param msgptr message
612  * @param field_name name of the field
613  * @param value term which must contain a single properly typed value.
614  */
615 void
616 OpenPRSProtobuf::oprs_pb_add_list(void *msgptr, std::string field_name, Term *value)
617 {
618  std::shared_ptr<google::protobuf::Message> *m =
619  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
620  if (!(m && *m))
621  return;
622 
623  const Descriptor * desc = (*m)->GetDescriptor();
624  const FieldDescriptor *field = desc->FindFieldByName(field_name);
625  if (!field) {
626  //logger_->log_warn("RefBox", "Could not find field %s", field_name.c_str());
627  return;
628  }
629  const Reflection *refl = (*m)->GetReflection();
630 
631  try {
632  switch (field->type()) {
633  case FieldDescriptor::TYPE_DOUBLE:
634  if (value->type == TT_FLOAT) {
635  refl->AddDouble(m->get(), field, *(value->u.doubleptr));
636  } else {
637  throw std::logic_error(std::string("Invalid type, required float for ")
638  + (*m)->GetTypeName() + field_name);
639  }
640  break;
641  case FieldDescriptor::TYPE_FLOAT:
642  if (value->type == TT_FLOAT) {
643  refl->AddFloat(m->get(), field, *(value->u.doubleptr));
644  } else {
645  throw std::logic_error(std::string("Invalid type, required float for ")
646  + (*m)->GetTypeName() + field_name);
647  }
648  break;
649 
650  case FieldDescriptor::TYPE_SFIXED64:
651  case FieldDescriptor::TYPE_SINT64:
652  case FieldDescriptor::TYPE_INT64:
653  if (value->type == INTEGER) {
654  refl->AddInt64(m->get(), field, value->u.intval);
655  } else if (value->type == LONG_LONG) {
656  refl->AddInt64(m->get(), field, value->u.llintval);
657  } else {
658  throw std::logic_error(std::string("Invalid type, required integer or long long for ")
659  + (*m)->GetTypeName() + field_name);
660  }
661  break;
662 
663  case FieldDescriptor::TYPE_SFIXED32:
664  case FieldDescriptor::TYPE_SINT32:
665  case FieldDescriptor::TYPE_INT32:
666  if (value->type == INTEGER) {
667  refl->AddInt32(m->get(), field, value->u.intval);
668  } else {
669  throw std::logic_error(std::string("Invalid type, required integer for ")
670  + (*m)->GetTypeName() + field_name);
671  }
672  break;
673  case FieldDescriptor::TYPE_BOOL:
674  if (value->type == TT_ATOM) {
675  if (value->u.id == lisp_t_sym || value->u.id == nil_sym) {
676  refl->AddBool(m->get(), field, (value->u.id == lisp_t_sym));
677  } else {
678  throw std::logic_error(std::string("Invalid value, allowed are T or NIL for field ")
679  + (*m)->GetTypeName() + field_name);
680  }
681  } else {
682  throw std::logic_error(std::string("Invalid type, required symbol for ")
683  + (*m)->GetTypeName() + field_name);
684  }
685  break;
686  case FieldDescriptor::TYPE_STRING:
687  if (value->type == STRING) {
688  refl->AddString(m->get(), field, value->u.string);
689  } else {
690  throw std::logic_error(std::string("Invalid type, required string for ")
691  + (*m)->GetTypeName() + field_name);
692  }
693  break;
694  case FieldDescriptor::TYPE_MESSAGE:
695  if (value->type == U_POINTER) {
696  std::shared_ptr<google::protobuf::Message> *mfrom =
697  static_cast<std::shared_ptr<google::protobuf::Message> *>(value->u.u_pointer);
698  Message *mut_msg = refl->AddMessage(m->get(), field);
699  mut_msg->CopyFrom(**mfrom);
700  delete mfrom;
701  } else {
702  throw std::logic_error(std::string("Invalid type, required user pointer for ")
703  + (*m)->GetTypeName() + field_name);
704  }
705  break;
706 
707  case FieldDescriptor::TYPE_BYTES: break;
708 
709  case FieldDescriptor::TYPE_FIXED32:
710  case FieldDescriptor::TYPE_UINT32:
711  if (value->type == INTEGER) {
712  refl->AddUInt32(m->get(), field, value->u.intval);
713  } else if (value->type == LONG_LONG) {
714  refl->AddUInt32(m->get(), field, value->u.llintval);
715  } else {
716  throw std::logic_error(std::string("Invalid type, required integer or long long for ")
717  + (*m)->GetTypeName() + field_name);
718  }
719  break;
720 
721  case FieldDescriptor::TYPE_ENUM: {
722  const char *sym_name = NULL;
723  if (value->type == TT_ATOM) {
724  sym_name = value->u.id;
725  } else if (value->type == STRING) {
726  sym_name = value->u.string;
727  } else {
728  throw std::logic_error(std::string("Invalid type, required symbol or string for ")
729  + (*m)->GetTypeName() + field_name);
730  }
731  const EnumDescriptor * enumdesc = field->enum_type();
732  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_name);
733  if (enumval) {
734  refl->AddEnum(m->get(), field, enumval);
735  } else {
736  //logger_->log_warn("RefBox", "%s: cannot set invalid enum value '%s' on '%s'",
737  // (*m)->GetTypeName().c_str(), value.as_string().c_str(), field_name.c_str());
738  }
739  } break;
740 
741  default: throw std::logic_error("Unknown protobuf field type encountered");
742  }
743  } catch (std::logic_error &e) {
744  //logger_->log_warn("RefBox", "Failed to add field %s of %s: %s", field_name.c_str(),
745  // (*m)->GetTypeName().c_str(), e.what());
746  }
747 }
748 
749 /** Connect as a client to the given server.
750  * Note that this will perform an asynchronous connect. A
751  * (protobuf-client-connected) or (protobuf-client-disconnected) fact
752  * is asserted during (pb-process) in the case of success or failure.
753  * @param host host to connect to
754  * @param port TCP port to connect to
755  * @return Term with a long long of the client ID
756  */
757 Term *
758 OpenPRSProtobuf::oprs_pb_client_connect(std::string host, int port)
759 {
760  if (port <= 0)
761  return build_nil();
762 
763  ProtobufStreamClient *client = new ProtobufStreamClient(&*message_register_);
764 
765  long int client_id;
766  {
767  fawkes::MutexLocker lock(&map_mutex_);
768  client_id = ++next_client_id_;
769  clients_[client_id] = client;
770  }
771 
772  client->signal_connected().connect(
773  boost::bind(&OpenPRSProtobuf::handle_client_connected, this, client_id));
774  client->signal_disconnected().connect(boost::bind(&OpenPRSProtobuf::handle_client_disconnected,
775  this,
776  client_id,
777  boost::asio::placeholders::error));
778  client->signal_received().connect(
779  boost::bind(&OpenPRSProtobuf::handle_client_msg, this, client_id, _1, _2, _3));
780  client->signal_receive_failed().connect(
781  boost::bind(&OpenPRSProtobuf::handle_client_receive_fail, this, client_id, _1, _2, _3));
782 
783  client->async_connect(host.c_str(), port);
784  return build_long_long(client_id);
785 }
786 
787 /** Send message to a specific client.
788  * @param client_id ID of the client, this can be a server client ID, a client
789  * ID, or a peer ID (message will then be broadcasted).
790  * @param msgptr message to send
791  */
792 void
793 OpenPRSProtobuf::oprs_pb_send(long int client_id, void *msgptr)
794 {
795  std::shared_ptr<google::protobuf::Message> *m =
796  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
797  if (!(m && *m)) {
798  //logger_->log_warn("RefBox", "Cannot send to %li: invalid message", client_id);
799  return;
800  }
801 
802  try {
803  fawkes::MutexLocker lock(&map_mutex_);
804 
805  if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
806  //printf("***** SENDING via SERVER\n");
807  server_->send(server_clients_[client_id], *m);
808  sig_server_sent_(server_clients_[client_id], *m);
809  } else if (clients_.find(client_id) != clients_.end()) {
810  //printf("***** SENDING via CLIENT\n");
811  clients_[client_id]->send(*m);
812  std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
813  sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
814  } else if (peers_.find(client_id) != peers_.end()) {
815  //printf("***** SENDING via CLIENT\n");
816  peers_[client_id]->send(*m);
817  sig_peer_sent_(client_id, *m);
818  } else {
819  //printf("Client ID %li is unknown, cannot send message of type %s\n",
820  // client_id, (*m)->GetTypeName().c_str());
821  }
822  } catch (google::protobuf::FatalException &e) {
823  //logger_->log_warn("RefBox", "Failed to send message of type %s: %s",
824  // (*m)->GetTypeName().c_str(), e.what());
825  } catch (std::runtime_error &e) {
826  //logger_->log_warn("RefBox", "Failed to send message of type %s: %s",
827  // (*m)->GetTypeName().c_str(), e.what());
828  }
829 }
830 
831 /** Broadcast a message through a peer.
832  * @param peer_id ID broadcast peer to send through
833  * @param msgptr message to send
834  */
835 void
836 OpenPRSProtobuf::oprs_pb_broadcast(long int peer_id, void *msgptr)
837 {
838  std::shared_ptr<google::protobuf::Message> *m =
839  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
840  if (!(m && *m)) {
841  fprintf(stderr, "Cannot send broadcast: invalid message");
842  return;
843  }
844 
845  fawkes::MutexLocker lock(&map_mutex_);
846  if (peers_.find(peer_id) == peers_.end())
847  return;
848 
849  try {
850  peers_[peer_id]->send(*m);
851  } catch (google::protobuf::FatalException &e) {
852  fprintf(stderr,
853  "pb-broadcast: failed to broadcast message of type %s: %s\n",
854  (*m)->GetTypeName().c_str(),
855  e.what());
856  }
857 
858  sig_peer_sent_(peer_id, *m);
859 }
860 
861 /** Disconnect a given client.
862  * @param client_id ID of client to disconnect, can be a server client ID or a client ID
863  */
864 void
866 {
867  //logger_->log_info("RefBox", "Disconnecting client %li", client_id);
868 
869  try {
870  fawkes::MutexLocker lock(&map_mutex_);
871 
872  if (server_clients_.find(client_id) != server_clients_.end()) {
873  protobuf_comm::ProtobufStreamServer::ClientID srv_client = server_clients_[client_id];
874  server_->disconnect(srv_client);
875  server_clients_.erase(client_id);
876  rev_server_clients_.erase(srv_client);
877  } else if (clients_.find(client_id) != clients_.end()) {
878  delete clients_[client_id];
879  clients_.erase(client_id);
880  }
881  } catch (std::runtime_error &e) {
882  throw fawkes::Exception("Failed to disconnect from client %li: %s", client_id, e.what());
883  }
884 }
885 
886 /** Get list of values of a given message field.
887  * @param msgptr message
888  * @param field_name field to retrieve
889  * @return term which contains a Lisp list with properly typed values, or a symbol in
890  * case of an error
891  */
892 Term *
893 OpenPRSProtobuf::oprs_pb_field_list(void *msgptr, std::string field_name)
894 {
895  std::shared_ptr<google::protobuf::Message> *m =
896  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
897  if (!(m && *m))
898  return build_id(declare_atom("INVALID-MESSAGE"));
899 
900  const Descriptor * desc = (*m)->GetDescriptor();
901  const FieldDescriptor *field = desc->FindFieldByName(field_name);
902  if (!field)
903  return build_id(declare_atom("DOES-NOT-EXIST"));
904 
905  TermList tl = sl_make_slist();
906 
907  if (field->label() == FieldDescriptor::LABEL_REQUIRED
908  || field->label() == FieldDescriptor::LABEL_OPTIONAL) {
909  tl = build_term_list(tl, oprs_pb_field_value(msgptr, field_name));
910  return build_term_l_list_from_c_list(tl);
911  }
912 
913  const Reflection *refl = (*m)->GetReflection();
914  int field_size = refl->FieldSize(**m, field);
915  for (int i = 0; i < field_size; ++i) {
916  switch (field->type()) {
917  case FieldDescriptor::TYPE_DOUBLE:
918  tl = build_term_list(tl, build_float(refl->GetRepeatedDouble(**m, field, i)));
919  break;
920  case FieldDescriptor::TYPE_FLOAT:
921  tl = build_term_list(tl, build_float(refl->GetRepeatedFloat(**m, field, i)));
922  break;
923  case FieldDescriptor::TYPE_UINT64:
924  case FieldDescriptor::TYPE_FIXED64:
925  tl = build_term_list(tl, build_long_long(refl->GetRepeatedUInt64(**m, field, i)));
926  break;
927  case FieldDescriptor::TYPE_UINT32:
928  case FieldDescriptor::TYPE_FIXED32:
929  tl = build_term_list(tl, build_long_long(refl->GetRepeatedUInt32(**m, field, i)));
930  break;
931  case FieldDescriptor::TYPE_BOOL:
932  tl = build_term_list(tl, refl->GetRepeatedBool(**m, field, i) ? build_t() : build_nil());
933  break;
934  case FieldDescriptor::TYPE_STRING:
935  tl = build_term_list(tl, build_string(refl->GetRepeatedString(**m, field, i).c_str()));
936  break;
937  case FieldDescriptor::TYPE_MESSAGE: {
938  const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
939  google::protobuf::Message * mcopy = msg.New();
940  mcopy->CopyFrom(msg);
941  void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
942  tl = build_term_list(tl, build_pointer(ptr));
943  } break;
944  case FieldDescriptor::TYPE_BYTES:
945  tl = build_term_list(tl, build_string((char *)"bytes"));
946  break;
947  case FieldDescriptor::TYPE_ENUM:
948  tl = build_term_list(tl,
949  build_id(
950  declare_atom(refl->GetRepeatedEnum(**m, field, i)->name().c_str())));
951  break;
952  case FieldDescriptor::TYPE_SFIXED32:
953  case FieldDescriptor::TYPE_INT32:
954  case FieldDescriptor::TYPE_SINT32:
955  tl = build_term_list(tl, build_integer(refl->GetRepeatedInt32(**m, field, i)));
956  break;
957  case FieldDescriptor::TYPE_SFIXED64:
958  case FieldDescriptor::TYPE_SINT64:
959  case FieldDescriptor::TYPE_INT64:
960  tl = build_term_list(tl, build_long_long(refl->GetRepeatedInt64(**m, field, i)));
961  break;
962  default: throw std::logic_error("Unknown protobuf field type encountered");
963  }
964  }
965 
966  return build_term_l_list_from_c_list(tl);
967 }
968 
969 /** Check if a given field is a list (repeated field).
970  * @param msgptr message
971  * @param field_name name of the field
972  * @return true if the field is a list, false otherwise
973  */
974 bool
975 OpenPRSProtobuf::oprs_pb_field_is_list(void *msgptr, std::string field_name)
976 {
977  std::shared_ptr<google::protobuf::Message> *m =
978  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
979  if (!(m && *m))
980  return false;
981 
982  const Descriptor * desc = (*m)->GetDescriptor();
983  const FieldDescriptor *field = desc->FindFieldByName(field_name);
984  if (!field) {
985  return false;
986  }
987  return (field->label() == FieldDescriptor::LABEL_REPEATED);
988 }
989 
990 /** Process all pending events.
991  * This will process events and assert appropriate facts.
992  */
993 void
995 {
996  {
997  fawkes::MutexLocker lock(q_server_client_.mutex());
998  while (!q_server_client_.empty()) {
999  auto &sc = q_server_client_.front();
1000  oprs_assert_server_client_event(std::get<0>(sc),
1001  std::get<1>(sc),
1002  std::get<2>(sc),
1003  std::get<3>(sc));
1004  q_server_client_.pop();
1005  }
1006  }
1007 
1008  {
1009  fawkes::MutexLocker lock(q_client_.mutex());
1010  while (!q_client_.empty()) {
1011  auto &c = q_client_.front();
1012  oprs_assert_client_event(std::get<0>(c), std::get<1>(c));
1013  q_client_.pop();
1014  }
1015  }
1016 
1017  {
1018  fawkes::MutexLocker lock(q_msgs_.mutex());
1019  while (!q_msgs_.empty()) {
1020  auto &m = q_msgs_.front();
1021  oprs_assert_message(std::get<0>(m),
1022  std::get<1>(m),
1023  std::get<2>(m),
1024  std::get<3>(m),
1025  std::get<4>(m),
1026  std::get<5>(m),
1027  std::get<6>(m));
1028  q_msgs_.pop();
1029  }
1030  }
1031 }
1032 
1033 /** Check if there are pending events.
1034  * @return true if there are pending events, false otherwise
1035  */
1036 bool
1038 {
1039  fawkes::MutexLocker lock1(q_server_client_.mutex());
1040  fawkes::MutexLocker lock2(q_client_.mutex());
1041  fawkes::MutexLocker lock3(q_msgs_.mutex());
1042 
1043  return (!(q_server_client_.empty() && q_client_.empty() && q_msgs_.empty()));
1044 }
1045 
1046 void
1047 OpenPRSProtobuf::oprs_assert_server_client_event(long int client_id,
1048  std::string & host,
1049  unsigned short port,
1050  bool connect)
1051 {
1052  TermList tl = sl_make_slist();
1053  tl = build_term_list(tl, build_long_long(client_id));
1054  if (connect) {
1055  tl = build_term_list(tl, build_string(host.c_str()));
1056  tl = build_term_list(tl, build_integer(port));
1057  add_external_fact((char *)"protobuf-server-client-connected", tl);
1058  } else {
1059  add_external_fact((char *)"protobuf-server-client-disconnected", tl);
1060  }
1061 }
1062 
1063 void
1064 OpenPRSProtobuf::oprs_assert_client_event(long int client_id, bool connect)
1065 {
1066  TermList tl = sl_make_slist();
1067  tl = build_term_list(tl, build_long_long(client_id));
1068  if (connect) {
1069  add_external_fact((char *)"protobuf-client-connected", tl);
1070  } else {
1071  add_external_fact((char *)"protobuf-client-disconnected", tl);
1072  }
1073 }
1074 
1075 void
1076 OpenPRSProtobuf::oprs_assert_message(std::string & endpoint_host,
1077  unsigned short endpoint_port,
1078  uint16_t comp_id,
1079  uint16_t msg_type,
1080  std::shared_ptr<google::protobuf::Message> &msg,
1081  OpenPRSProtobuf::ClientType ct,
1082  unsigned int client_id)
1083 {
1084  TermList tl = sl_make_slist();
1085 
1086  struct timeval tv;
1087  gettimeofday(&tv, 0);
1088  void *ptr = new std::shared_ptr<google::protobuf::Message>(msg);
1089  //tl = build_term_list(tl, build_string((char *)"type"));
1090  tl = build_term_list(tl, build_string(msg->GetTypeName().c_str()));
1091  //tl = build_term_list(tl, build_string((char *)"comp-id"));
1092  tl = build_term_list(tl, build_integer(comp_id));
1093  //tl = build_term_list(tl, build_string((char *)"msg-type"));
1094  tl = build_term_list(tl, build_integer(msg_type));
1095  //tl = build_term_list(tl, build_string((char *)"rcvd-via"));
1096  tl = build_term_list(tl, build_string((client_id == 0) ? "BROADCAST" : "STREAM"));
1097  //tl = build_term_list(tl, build_string((char *)"rcvd-at"));
1098  tl = build_term_list(tl, build_long_long(tv.tv_sec));
1099  tl = build_term_list(tl, build_long_long(tv.tv_usec));
1100  //tl = build_term_list(tl, build_string((char *)"rcvd-from"));
1101  tl = build_term_list(tl, build_string(endpoint_host.c_str()));
1102  tl = build_term_list(tl, build_integer(endpoint_port));
1103  //tl = build_term_list(tl, build_string((char *)"client-type"));
1104  tl = build_term_list(tl,
1105  build_string(ct == CT_CLIENT ? "CLIENT"
1106  : (ct == CT_SERVER ? "SERVER" : "PEER")));
1107  //tl = build_term_list(tl, build_string((char *)"client-id"));
1108  tl = build_term_list(tl, build_integer(client_id));
1109  //tl = build_term_list(tl, build_string((char *)"ptr"));
1110  tl = build_term_list(tl, build_pointer(ptr));
1111 
1112  add_external_fact((char *)"protobuf-msg", tl);
1113 }
1114 
1115 void
1116 OpenPRSProtobuf::handle_server_client_connected(ProtobufStreamServer::ClientID client,
1117  boost::asio::ip::tcp::endpoint &endpoint)
1118 {
1119  long int client_id = -1;
1120  {
1121  fawkes::MutexLocker lock(&map_mutex_);
1122  client_id = ++next_client_id_;
1123  client_endpoints_[client_id] = std::make_pair(endpoint.address().to_string(), endpoint.port());
1124  server_clients_[client_id] = client;
1125  rev_server_clients_[client] = client_id;
1126  }
1127 
1128  q_server_client_.push_locked(
1129  std::make_tuple(client_id, endpoint.address().to_string(), endpoint.port(), true));
1130 }
1131 
1132 void
1133 OpenPRSProtobuf::handle_server_client_disconnected(ProtobufStreamServer::ClientID client,
1134  const boost::system::error_code &error)
1135 {
1136  long int client_id = -1;
1137  {
1138  fawkes::MutexLocker lock(&map_mutex_);
1139  RevServerClientMap::iterator c;
1140  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1141  client_id = c->second;
1142  rev_server_clients_.erase(c);
1143  server_clients_.erase(client_id);
1144  }
1145  }
1146 
1147  if (client_id >= 0) {
1148  q_server_client_.push_locked(std::make_tuple(client_id, "", 0, false));
1149  }
1150 }
1151 
1152 /** Handle message that came from a client.
1153  * @param client client ID
1154  * @param component_id component the message was addressed to
1155  * @param msg_type type of the message
1156  * @param msg the message
1157  */
1158 void
1159 OpenPRSProtobuf::handle_server_client_msg(ProtobufStreamServer::ClientID client,
1160  uint16_t component_id,
1161  uint16_t msg_type,
1162  std::shared_ptr<google::protobuf::Message> msg)
1163 {
1164  fawkes::MutexLocker lock(&map_mutex_);
1165  RevServerClientMap::iterator c;
1166  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1167  q_msgs_.push_locked(std::make_tuple(client_endpoints_[c->second].first,
1168  client_endpoints_[c->second].second,
1169  component_id,
1170  msg_type,
1171  msg,
1172  CT_SERVER,
1173  c->second));
1174  }
1175 }
1176 
1177 /** Handle server reception failure
1178  * @param client client ID
1179  * @param component_id component the message was addressed to
1180  * @param msg_type type of the message
1181  * @param msg the message string
1182  */
1183 void
1184 OpenPRSProtobuf::handle_server_client_fail(ProtobufStreamServer::ClientID client,
1185  uint16_t component_id,
1186  uint16_t msg_type,
1187  std::string msg)
1188 {
1189  fawkes::MutexLocker lock(&map_mutex_);
1190  RevServerClientMap::iterator c;
1191  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1192  /*
1193  fawkes::MutexLocker lock(&oprs_mutex_);
1194  oprs_->assert_fact_f("(protobuf-server-receive-failed (comp-id %u) (msg-type %u) "
1195  "(rcvd-via STREAM) (client-id %li) (message \"%s\") "
1196  "(rcvd-from (\"%s\" %u)))",
1197  component_id, msg_type, c->second, msg.c_str(),
1198  client_endpoints_[c->second].first.c_str(),
1199  client_endpoints_[c->second].second);
1200  */
1201  }
1202 }
1203 
1204 /** Handle message that came from a peer/robot
1205  * @param endpoint the endpoint from which the message was received
1206  * @param component_id component the message was addressed to
1207  * @param msg_type type of the message
1208  * @param msg the message
1209  */
1210 void
1211 OpenPRSProtobuf::handle_peer_msg(long int peer_id,
1212  boost::asio::ip::udp::endpoint & endpoint,
1213  uint16_t component_id,
1214  uint16_t msg_type,
1215  std::shared_ptr<google::protobuf::Message> msg)
1216 {
1217  q_msgs_.push_locked(std::make_tuple(endpoint.address().to_string(),
1218  endpoint.port(),
1219  component_id,
1220  msg_type,
1221  msg,
1222  CT_PEER,
1223  peer_id));
1224 }
1225 
1226 /** Handle error during peer message processing.
1227  * @param endpoint endpoint of incoming message
1228  * @param msg error message
1229  */
1230 void
1231 OpenPRSProtobuf::handle_peer_recv_error(long int peer_id,
1232  boost::asio::ip::udp::endpoint &endpoint,
1233  std::string msg)
1234 {
1235  fprintf(stderr,
1236  "Failed to receive peer message from %s:%u: %s\n",
1237  endpoint.address().to_string().c_str(),
1238  endpoint.port(),
1239  msg.c_str());
1240 }
1241 
1242 /** Handle error during peer message processing.
1243  * @param msg error message
1244  */
1245 void
1246 OpenPRSProtobuf::handle_peer_send_error(long int peer_id, const std::string &msg)
1247 {
1248  //logger_->log_warn("RefBox", "Failed to send peer message: %s", msg.c_str());
1249 }
1250 
1251 void
1252 OpenPRSProtobuf::handle_client_connected(long int client_id)
1253 {
1254  q_client_.push_locked(std::make_tuple(client_id, true));
1255 }
1256 
1257 void
1258 OpenPRSProtobuf::handle_client_disconnected(long int client_id,
1259  const boost::system::error_code &error)
1260 {
1261  q_client_.push_locked(std::make_tuple(client_id, false));
1262 }
1263 
1264 void
1265 OpenPRSProtobuf::handle_client_msg(long int client_id,
1266  uint16_t comp_id,
1267  uint16_t msg_type,
1268  std::shared_ptr<google::protobuf::Message> msg)
1269 {
1270  q_msgs_.push_locked(std::make_tuple("", 0, comp_id, msg_type, msg, CT_CLIENT, client_id));
1271 }
1272 
1273 void
1274 OpenPRSProtobuf::handle_client_receive_fail(long int client_id,
1275  uint16_t comp_id,
1276  uint16_t msg_type,
1277  const std::string &msg)
1278 {
1279  /*
1280  oprs_->assert_fact_f("(protobuf-receive-failed (client-id %li) (rcvd-via STREAM) "
1281  "(comp-id %u) (msg-type %u) (message \"%s\"))",
1282  client_id, comp_id, msg_type, msg.c_str());
1283  */
1284 }
1285 
1286 } // namespace oprs_protobuf
protobuf_comm::ProtobufStreamClient::signal_disconnected
boost::signals2::signal< void(const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when the connection is closed.
Definition: client.h:130
protobuf_comm::ProtobufStreamClient::signal_receive_failed
boost::signals2::signal< void(uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
Definition: client.h:112
oprs_protobuf::OpenPRSProtobuf::oprs_create_msg
std::shared_ptr< google::protobuf::Message > * oprs_create_msg(std::string full_name)
Create a new message of given type.
Definition: oprs_protobuf.cpp:241
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_create_local_crypto
Term * oprs_pb_peer_create_local_crypto(const std::string &host, int send_port, int recv_port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
Definition: oprs_protobuf.cpp:120
protobuf_comm::ProtobufStreamClient::signal_connected
boost::signals2::signal< void()> & signal_connected()
Signal that is invoked when the connection has been established.
Definition: client.h:121
fawkes::LockQueue::push_locked
void push_locked(const Type &x)
Push element to queue with lock protection.
Definition: lock_queue.h:141
protobuf_comm::MessageRegister
Definition: message_register.h:65
oprs_protobuf::OpenPRSProtobuf::oprs_pb_client_connect
Term * oprs_pb_client_connect(std::string host, int port)
Connect as a client to the given server.
Definition: oprs_protobuf.cpp:758
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_list
Term * oprs_pb_field_list(void *msgptr, std::string field_name)
Get list of values of a given message field.
Definition: oprs_protobuf.cpp:893
protobuf_comm::ProtobufBroadcastPeer::signal_recv_error
signal_recv_error_type & signal_recv_error()
Signal that is invoked when receiving a message failed.
Definition: peer.h:169
protobuf_comm::ProtobufStreamClient::async_connect
void async_connect(const char *host, unsigned short port)
Asynchronous connect.
Definition: client.cpp:154
protobuf_comm::ProtobufStreamServer
Definition: server.h:61
fawkes::MutexLocker
Definition: mutex_locker.h:39
oprs_protobuf::OpenPRSProtobuf::oprs_pb_events_pending
bool oprs_pb_events_pending()
Check if there are pending events.
Definition: oprs_protobuf.cpp:1037
oprs_protobuf::OpenPRSProtobuf::oprs_pb_set_field
void oprs_pb_set_field(void *msgptr, std::string field_name, Term *value)
Set a field.
Definition: oprs_protobuf.cpp:454
protobuf_comm::ProtobufStreamServer::ClientID
unsigned int ClientID
ID to identify connected clients.
Definition: server.h:65
protobuf_comm::ProtobufStreamServer::signal_disconnected
boost::signals2::signal< void(ClientID, const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when a new client has disconnected.
Definition: server.h:131
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_destroy
void oprs_pb_peer_destroy(long int peer_id)
Disable peer.
Definition: oprs_protobuf.cpp:196
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_create_crypto
Term * oprs_pb_peer_create_crypto(const std::string &host, int port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
Definition: oprs_protobuf.cpp:161
oprs_protobuf::OpenPRSProtobuf::oprs_pb_process
void oprs_pb_process()
Process all pending events.
Definition: oprs_protobuf.cpp:994
protobuf_comm::ProtobufStreamClient
Definition: client.h:60
protobuf_comm::ProtobufStreamClient::signal_received
boost::signals2::signal< void(uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
Definition: client.h:103
oprs_protobuf::OpenPRSProtobuf::oprs_pb_has_field
bool oprs_pb_has_field(void *msgptr, std::string field_name)
Check if message has a specific field.
Definition: oprs_protobuf.cpp:350
oprs_protobuf::OpenPRSProtobuf::oprs_pb_register_type
bool oprs_pb_register_type(std::string full_name)
Register a new message type.
Definition: oprs_protobuf.cpp:224
protobuf_comm::ProtobufBroadcastPeer::signal_received
signal_received_type & signal_received()
Signal that is invoked when a message has been received.
Definition: peer.h:149
oprs_protobuf::OpenPRSProtobuf::oprs_pb_ref
Term * oprs_pb_ref(void *msgptr)
Create new reference to message.
Definition: oprs_protobuf.cpp:252
oprs_protobuf::OpenPRSProtobuf::oprs_pb_disable_server
void oprs_pb_disable_server()
Disable protobuf stream server.
Definition: oprs_protobuf.cpp:105
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_setup_crypto
void oprs_pb_peer_setup_crypto(long int peer_id, const std::string &crypto_key, const std::string &cipher)
Setup crypto for peer.
Definition: oprs_protobuf.cpp:210
protobuf_comm::ProtobufStreamServer::signal_connected
boost::signals2::signal< void(ClientID, boost::asio::ip::tcp::endpoint &)> & signal_connected()
Signal that is invoked when a new client has connected.
Definition: server.h:122
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_value
Term * oprs_pb_field_value(void *msgptr, std::string field_name)
Get properly typed field value.
Definition: oprs_protobuf.cpp:402
protobuf_comm::ProtobufBroadcastPeer
Definition: peer.h:61
oprs_protobuf::OpenPRSProtobuf::oprs_pb_add_list
void oprs_pb_add_list(void *msgptr, std::string field_name, Term *value)
Add value to a repeated field.
Definition: oprs_protobuf.cpp:616
protobuf_comm::ProtobufStreamServer::signal_receive_failed
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
Definition: server.h:113
protobuf_comm::ProtobufStreamServer::disconnect
void disconnect(ClientID client)
Disconnect specific client.
Definition: server.cpp:447
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_is_list
bool oprs_pb_field_is_list(void *msgptr, std::string field_name)
Check if a given field is a list (repeated field).
Definition: oprs_protobuf.cpp:975
protobuf_comm::ProtobufStreamServer::signal_received
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
Definition: server.h:104
protobuf_comm::ProtobufBroadcastPeer::signal_send_error
signal_send_error_type & signal_send_error()
Signal that is invoked when sending a message failed.
Definition: peer.h:178
fawkes::LockQueue::mutex
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_queue.h:80
oprs_protobuf::OpenPRSProtobuf::oprs_pb_broadcast
void oprs_pb_broadcast(long int peer_id, void *msgptr)
Broadcast a message through a peer.
Definition: oprs_protobuf.cpp:836
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_type
Term * oprs_pb_field_type(void *msgptr, std::string field_name)
Get type if a specific field.
Definition: oprs_protobuf.cpp:309
oprs_protobuf::OpenPRSProtobuf::oprs_pb_destroy
Term * oprs_pb_destroy(void *msgptr)
Destroy given message (reference).
Definition: oprs_protobuf.cpp:270
oprs_protobuf::OpenPRSProtobuf::oprs_pb_send
void oprs_pb_send(long int client_id, void *msgptr)
Send message to a specific client.
Definition: oprs_protobuf.cpp:793
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_create_local
Term * oprs_pb_peer_create_local(const std::string &host, int send_port, int recv_port)
Enable protobuf peer.
Definition: oprs_protobuf.cpp:187
oprs_protobuf::OpenPRSProtobuf::oprs_pb_disconnect
void oprs_pb_disconnect(long int client_id)
Disconnect a given client.
Definition: oprs_protobuf.cpp:865
oprs_protobuf::OpenPRSProtobuf::~OpenPRSProtobuf
~OpenPRSProtobuf()
Destructor.
Definition: oprs_protobuf.cpp:72
protobuf_comm::ProtobufStreamServer::send
void send(ClientID client, uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the given client.
Definition: server.cpp:319
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_names
Term * oprs_pb_field_names(void *msgptr)
Get field names of message.
Definition: oprs_protobuf.cpp:286
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_create
Term * oprs_pb_peer_create(const std::string &host, int port)
Enable protobuf peer.
Definition: oprs_protobuf.cpp:175
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_label
Term * oprs_pb_field_label(void *msgptr, std::string field_name)
Get a fields label.
Definition: oprs_protobuf.cpp:377
oprs_protobuf::OpenPRSProtobuf::oprs_pb_enable_server
void oprs_pb_enable_server(int port)
Enable protobuf stream server.
Definition: oprs_protobuf.cpp:87
fawkes::Exception
Definition: exception.h:41