Fawkes API Fawkes Development Version
oprs_protobuf.cpp
1
2/***************************************************************************
3 * oprs_protobuf.cpp - protobuf network communication for OpenPRS
4 *
5 * Created: Tue Sep 02 16:53:26 2014 (based on CLIPS version)
6 * Copyright 2013-2014 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
11 * are met:
12 *
13 * - Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in
17 * the documentation and/or other materials provided with the
18 * distribution.
19 * - Neither the name of the authors nor the names of its contributors
20 * may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27 * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34 * OF THE POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#include "oprs_protobuf.h"
38
39#include <core/exception.h>
40#include <core/threading/mutex_locker.h>
41#include <google/protobuf/descriptor.h>
42#include <protobuf_comm/client.h>
43#include <protobuf_comm/peer.h>
44#include <protobuf_comm/server.h>
45
46#include <algorithm>
47#include <oprs_f-pub.h>
48
49using namespace google::protobuf;
50using namespace protobuf_comm;
51
52namespace oprs_protobuf {
53
54/** @class OpenPRSProtobuf "oprs_protobuf.h"
55 * OpenPRS protobuf integration class.
56 * This class adds functionality related to protobuf to OpenPRS.
57 * It supports the creation of communication channels through protobuf_comm.
58 * An instance maintains its own message register shared among server, peer,
59 * and clients.
60 * @author Tim Niemueller
61 */
62
63/** Constructor.
64 * @param proto_path proto path passed to a newly instantiated message register
65 */
66OpenPRSProtobuf::OpenPRSProtobuf(std::vector<std::string> &proto_path)
67: message_register_(new MessageRegister(proto_path)), server_(NULL), next_client_id_(0)
68{
69}
70
71/** Destructor. */
73{
74 for (auto c : clients_) {
75 delete c.second;
76 }
77 clients_.clear();
78
79 delete server_;
80 message_register_.reset();
81}
82
83/** Enable protobuf stream server.
84 * @param port TCP port to listen on for connections
85 */
86void
88{
89 if ((port > 0) && !server_) {
90 server_ = new protobuf_comm::ProtobufStreamServer(port, &*message_register_);
91
92 server_->signal_connected().connect(
93 boost::bind(&OpenPRSProtobuf::handle_server_client_connected, this, _1, _2));
94 server_->signal_disconnected().connect(
95 boost::bind(&OpenPRSProtobuf::handle_server_client_disconnected, this, _1, _2));
96 server_->signal_received().connect(
97 boost::bind(&OpenPRSProtobuf::handle_server_client_msg, this, _1, _2, _3, _4));
98 server_->signal_receive_failed().connect(
99 boost::bind(&OpenPRSProtobuf::handle_server_client_fail, this, _1, _2, _3, _4));
100 }
101}
102
103/** Disable protobuf stream server. */
104void
106{
107 delete server_;
108 server_ = NULL;
109}
110
111/** Enable protobuf peer.
112 * @param address IP address to send messages to
113 * @param send_port UDP port to send messages to
114 * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
115 * @param crypto_key encryption key
116 * @param cipher cipher suite, see BufferEncryptor for supported types
117 * @return peer identifier
118 */
119Term *
121 int send_port,
122 int recv_port,
123 const std::string &crypto_key,
124 const std::string &cipher)
125{
126 if (recv_port <= 0)
127 recv_port = send_port;
128
129 if (send_port > 0) {
130 protobuf_comm::ProtobufBroadcastPeer *peer = new protobuf_comm::ProtobufBroadcastPeer(
131 address, send_port, recv_port, &*message_register_, crypto_key, cipher);
132
133 long int peer_id;
134 {
135 fawkes::MutexLocker lock(&map_mutex_);
136 peer_id = ++next_client_id_;
137 peers_[peer_id] = peer;
138 }
139
140 peer->signal_received().connect(
141 boost::bind(&OpenPRSProtobuf::handle_peer_msg, this, peer_id, _1, _2, _3, _4));
142 peer->signal_recv_error().connect(
143 boost::bind(&OpenPRSProtobuf::handle_peer_recv_error, this, peer_id, _1, _2));
144 peer->signal_send_error().connect(
145 boost::bind(&OpenPRSProtobuf::handle_peer_send_error, this, peer_id, _1));
146
147 return build_long_long(peer_id);
148 } else {
149 return build_long_long(0);
150 }
151}
152
153/** Enable protobuf peer.
154 * @param address IP address to send messages to
155 * @param port UDP port to send and receive messages
156 * @param crypto_key encryption key
157 * @param cipher cipher suite, see BufferEncryptor for supported types
158 * @return peer identifier
159 */
160Term *
162 int port,
163 const std::string &crypto_key,
164 const std::string &cipher)
165{
166 return oprs_pb_peer_create_local_crypto(address, port, port, crypto_key, cipher);
167}
168
169/** Enable protobuf peer.
170 * @param address IP address to send messages to
171 * @param port UDP port to send and receive messages
172 * @return peer identifier
173 */
174Term *
175OpenPRSProtobuf::oprs_pb_peer_create(const std::string &address, int port)
176{
177 return oprs_pb_peer_create_local_crypto(address, port, port);
178}
179
180/** Enable protobuf peer.
181 * @param address IP address to send messages to
182 * @param send_port UDP port to send messages to
183 * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
184 * @return peer identifier
185 */
186Term *
187OpenPRSProtobuf::oprs_pb_peer_create_local(const std::string &address, int send_port, int recv_port)
188{
189 return oprs_pb_peer_create_local_crypto(address, send_port, recv_port);
190}
191
192/** Disable peer.
193 * @param peer_id ID of the peer to destroy
194 */
195void
197{
198 if (peers_.find(peer_id) != peers_.end()) {
199 delete peers_[peer_id];
200 peers_.erase(peer_id);
201 }
202}
203
204/** Setup crypto for peer.
205 * @param peer_id ID of the peer to destroy
206 * @param crypto_key encryption key
207 * @param cipher cipher suite, see BufferEncryptor for supported types
208 */
209void
211 const std::string &crypto_key,
212 const std::string &cipher)
213{
214 if (peers_.find(peer_id) != peers_.end()) {
215 peers_[peer_id]->setup_crypto(crypto_key, cipher);
216 }
217}
218
219/** Register a new message type.
220 * @param full_name full name of type to register
221 * @return true if the type was successfully registered, false otherwise
222 */
223bool
225{
226 try {
227 message_register_->add_message_type(full_name);
228 return true;
229 } catch (std::runtime_error &e) {
230 //logger_->log_error("RefBox", "Registering type %s failed: %s", full_name.c_str(), e.what());
231 return false;
232 }
233}
234
235/** Create a new message of given type.
236 * @param full_name name of message type (fully qualified, i.e. including package name)
237 * @return shared pointer to new mesage
238 * @exception std::runtime_error thrown if creating the message failed
239 */
240std::shared_ptr<google::protobuf::Message> *
242{
243 std::shared_ptr<google::protobuf::Message> m = message_register_->new_message_for(full_name);
244 return new std::shared_ptr<google::protobuf::Message>(m);
245}
246
247/** Create new reference to message.
248 * @param msgptr message to create reference for
249 * @return new message reference pointing to the very same message as @p msgptr
250 */
251Term *
253{
254 std::shared_ptr<google::protobuf::Message> *m =
255 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
256 if (!*m)
257 return build_pointer(new std::shared_ptr<google::protobuf::Message>());
258
259 return build_pointer(new std::shared_ptr<google::protobuf::Message>(*m));
260}
261
262/** Destroy given message (reference).
263 * This will decrement the reference count to the message and delete it.
264 * The message itself is deleted if the reference counter reaches zero.
265 * @param msgptr message (reference) to delete, any access to this message
266 * afterwards is illegal.
267 * @return T
268 */
269Term *
271{
272 std::shared_ptr<google::protobuf::Message> *m =
273 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
274 if (!*m)
275 return build_nil();
276
277 delete m;
278 return build_t();
279}
280
281/** Get field names of message.
282 * @param msgptr user pointer to message
283 * @return term containing lisp list of field names
284 */
285Term *
287{
288 TermList tl = sl_make_slist();
289
290 std::shared_ptr<google::protobuf::Message> *m =
291 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
292 if (!*m)
293 return build_term_l_list_from_c_list(tl);
294
295 const Descriptor *desc = (*m)->GetDescriptor();
296 const int field_count = desc->field_count();
297 for (int i = 0; i < field_count; ++i) {
298 tl = build_term_list(tl, build_string(desc->field(i)->name().c_str()));
299 }
300 return build_term_l_list_from_c_list(tl);
301}
302
303/** Get type if a specific field.
304 * @param msgptr message for which to get the field type
305 * @param field_name name of the field
306 * @return term with a symbol for the type
307 */
308Term *
309OpenPRSProtobuf::oprs_pb_field_type(void *msgptr, std::string field_name)
310{
311 std::shared_ptr<google::protobuf::Message> *m =
312 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
313 if (!*m)
314 return build_id(declare_atom("INVALID-MESSAGE"));
315
316 const Descriptor * desc = (*m)->GetDescriptor();
317 const FieldDescriptor *field = desc->FindFieldByName(field_name);
318 if (!field) {
319 return build_id(declare_atom("DOES-NOT-EXIST"));
320 }
321 switch (field->type()) {
322 case FieldDescriptor::TYPE_DOUBLE: return build_id(declare_atom("DOUBLE"));
323 case FieldDescriptor::TYPE_FLOAT: return build_id(declare_atom("FLOAT"));
324 case FieldDescriptor::TYPE_INT64: return build_id(declare_atom("INT64"));
325 case FieldDescriptor::TYPE_UINT64: return build_id(declare_atom("UINT64"));
326 case FieldDescriptor::TYPE_INT32: return build_id(declare_atom("INT32"));
327 case FieldDescriptor::TYPE_FIXED64: return build_id(declare_atom("FIXED64"));
328 case FieldDescriptor::TYPE_FIXED32: return build_id(declare_atom("FIXED32"));
329 case FieldDescriptor::TYPE_BOOL: return build_id(declare_atom("BOOL"));
330 case FieldDescriptor::TYPE_STRING: return build_id(declare_atom("STRING"));
331 case FieldDescriptor::TYPE_MESSAGE: return build_id(declare_atom("MESSAGE"));
332 case FieldDescriptor::TYPE_BYTES: return build_id(declare_atom("BYTES"));
333 case FieldDescriptor::TYPE_UINT32: return build_id(declare_atom("UINT32"));
334 case FieldDescriptor::TYPE_ENUM: return build_id(declare_atom("ENUM"));
335 case FieldDescriptor::TYPE_SFIXED32: return build_id(declare_atom("SFIXED32"));
336 case FieldDescriptor::TYPE_SFIXED64: return build_id(declare_atom("SFIXED64"));
337 case FieldDescriptor::TYPE_SINT32: return build_id(declare_atom("SINT32"));
338 case FieldDescriptor::TYPE_SINT64: return build_id(declare_atom("SINT64"));
339 default: return build_id(declare_atom("UNKNOWN"));
340 }
341}
342
343/** Check if message has a specific field.
344 * This is relevant in particular for optional fields.
345 * @param msgptr message
346 * @param field_name name of the field
347 * @return true if the field is present, false otherwise
348 */
349bool
350OpenPRSProtobuf::oprs_pb_has_field(void *msgptr, std::string field_name)
351{
352 std::shared_ptr<google::protobuf::Message> *m =
353 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
354 if (!*m)
355 return false;
356
357 const Descriptor * desc = (*m)->GetDescriptor();
358 const FieldDescriptor *field = desc->FindFieldByName(field_name);
359 if (!field)
360 return false;
361
362 const Reflection *refl = (*m)->GetReflection();
363
364 if (field->is_repeated()) {
365 return (refl->FieldSize(**m, field) > 0);
366 } else {
367 return refl->HasField(**m, field);
368 }
369}
370
371/** Get a fields label.
372 * @param msgptr message for which to get the field type
373 * @param field_name name of the field
374 * @return Term with Symbol, one of INVALID-MESSAGE, DOES-NOT-EXIST, OPTIONAL, REPEATED, UNKNOWN
375 */
376Term *
377OpenPRSProtobuf::oprs_pb_field_label(void *msgptr, std::string field_name)
378{
379 std::shared_ptr<google::protobuf::Message> *m =
380 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
381 if (!*m)
382 return build_id(declare_atom("INVALID-MESSAGE"));
383
384 const Descriptor * desc = (*m)->GetDescriptor();
385 const FieldDescriptor *field = desc->FindFieldByName(field_name);
386 if (!field)
387 return build_id(declare_atom("DOES-NOT-EXIST"));
388 switch (field->label()) {
389 case FieldDescriptor::LABEL_OPTIONAL: return build_id(declare_atom("OPTIONAL"));
390 case FieldDescriptor::LABEL_REQUIRED: return build_id(declare_atom("REQUIRED"));
391 case FieldDescriptor::LABEL_REPEATED: return build_id(declare_atom("REPEATED"));
392 default: return build_id(declare_atom("UNKNOWN"));
393 }
394}
395
396/** Get properly typed field value.
397 * @param msgptr message for which to get the field type
398 * @param field_name name of the field
399 * @return Term with value of proper type
400 */
401Term *
402OpenPRSProtobuf::oprs_pb_field_value(void *msgptr, std::string field_name)
403{
404 std::shared_ptr<google::protobuf::Message> *m =
405 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
406 if (!*m)
407 return build_id(declare_atom("INVALID-MESSAGE"));
408
409 const Descriptor * desc = (*m)->GetDescriptor();
410 const FieldDescriptor *field = desc->FindFieldByName(field_name);
411 if (!field)
412 return build_id(declare_atom("DOES-NOT-EXIST"));
413 const Reflection *refl = (*m)->GetReflection();
414 if (field->type() != FieldDescriptor::TYPE_MESSAGE && !refl->HasField(**m, field)) {
415 //logger_->log_warn("RefBox", "Field %s of %s not set",
416 // field_name.c_str(), (*m)->GetTypeName().c_str());
417 return build_id(declare_atom("NOT-SET"));
418 }
419 switch (field->type()) {
420 case FieldDescriptor::TYPE_DOUBLE: return build_float(refl->GetDouble(**m, field));
421 case FieldDescriptor::TYPE_FLOAT: return build_float(refl->GetFloat(**m, field));
422 case FieldDescriptor::TYPE_INT64: return build_long_long(refl->GetInt64(**m, field));
423 case FieldDescriptor::TYPE_UINT64: return build_long_long((long int)refl->GetUInt64(**m, field));
424 case FieldDescriptor::TYPE_INT32: return build_integer(refl->GetInt32(**m, field));
425 case FieldDescriptor::TYPE_FIXED64: return build_long_long((long int)refl->GetUInt64(**m, field));
426 case FieldDescriptor::TYPE_FIXED32: return build_long_long(refl->GetUInt32(**m, field));
427 case FieldDescriptor::TYPE_BOOL: return refl->GetBool(**m, field) ? build_t() : build_nil();
428 case FieldDescriptor::TYPE_STRING: return build_string(refl->GetString(**m, field).c_str());
429 case FieldDescriptor::TYPE_MESSAGE: {
430 const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
431 google::protobuf::Message * mcopy = mfield.New();
432 mcopy->CopyFrom(mfield);
433 void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
434 return build_pointer(ptr);
435 }
436 case FieldDescriptor::TYPE_BYTES: return build_string((char *)"bytes");
437 case FieldDescriptor::TYPE_UINT32: return build_long_long(refl->GetUInt32(**m, field));
438 case FieldDescriptor::TYPE_ENUM:
439 return build_id(declare_atom(refl->GetEnum(**m, field)->name().c_str()));
440 case FieldDescriptor::TYPE_SFIXED32: return build_integer(refl->GetInt32(**m, field));
441 case FieldDescriptor::TYPE_SFIXED64: return build_long_long(refl->GetInt64(**m, field));
442 case FieldDescriptor::TYPE_SINT32: return build_integer(refl->GetInt32(**m, field));
443 case FieldDescriptor::TYPE_SINT64: return build_long_long(refl->GetInt64(**m, field));
444 default: throw std::logic_error("Unknown protobuf field type encountered");
445 }
446}
447
448/** Set a field.
449 * @param msgptr message for which to get the field type
450 * @param field_name name of the field
451 * @param value term which must contain a single properly typed value.
452 */
453void
454OpenPRSProtobuf::oprs_pb_set_field(void *msgptr, std::string field_name, Term *value)
455{
456 std::shared_ptr<google::protobuf::Message> *m =
457 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
458 if (!*m)
459 return;
460
461 const Descriptor * desc = (*m)->GetDescriptor();
462 const FieldDescriptor *field = desc->FindFieldByName(field_name);
463 if (!field) {
464 //logger_->log_warn("RefBox", "Could not find field %s", field_name.c_str());
465 return;
466 }
467 const Reflection *refl = (*m)->GetReflection();
468
469 try {
470 switch (field->type()) {
471 case FieldDescriptor::TYPE_DOUBLE:
472 if (value->type == TT_FLOAT) {
473 refl->SetDouble(m->get(), field, *(value->u.doubleptr));
474 } else {
475 throw std::logic_error(std::string("Invalid type, required float for ")
476 + (*m)->GetTypeName() + field_name);
477 }
478 break;
479 case FieldDescriptor::TYPE_FLOAT:
480 if (value->type == TT_FLOAT) {
481 refl->SetFloat(m->get(), field, *(value->u.doubleptr));
482 } else {
483 throw std::logic_error(std::string("Invalid type, required float for ")
484 + (*m)->GetTypeName() + field_name);
485 }
486 break;
487 case FieldDescriptor::TYPE_SFIXED64:
488 case FieldDescriptor::TYPE_SINT64:
489 case FieldDescriptor::TYPE_INT64:
490 if (value->type == INTEGER) {
491 refl->SetInt64(m->get(), field, value->u.intval);
492 } else if (value->type == LONG_LONG) {
493 refl->SetInt64(m->get(), field, value->u.llintval);
494 } else {
495 throw std::logic_error(std::string("Invalid type, required integer or long long for ")
496 + (*m)->GetTypeName() + field_name);
497 }
498 break;
499 case FieldDescriptor::TYPE_FIXED64:
500 case FieldDescriptor::TYPE_UINT64:
501 if (value->type == INTEGER) {
502 refl->SetUInt64(m->get(), field, value->u.intval);
503 } else if (value->type == LONG_LONG) {
504 refl->SetUInt64(m->get(), field, value->u.llintval);
505 } else {
506 throw std::logic_error(std::string("Invalid type, required integer or long long for ")
507 + (*m)->GetTypeName() + field_name);
508 }
509 break;
510 case FieldDescriptor::TYPE_SFIXED32:
511 case FieldDescriptor::TYPE_SINT32:
512 case FieldDescriptor::TYPE_INT32:
513 if (value->type == INTEGER) {
514 refl->SetInt32(m->get(), field, value->u.intval);
515 } else {
516 throw std::logic_error(std::string("Invalid type, required integer for ")
517 + (*m)->GetTypeName() + field_name);
518 }
519 break;
520 case FieldDescriptor::TYPE_BOOL:
521 if (value->type == TT_ATOM) {
522 if (value->u.id == lisp_t_sym || value->u.id == nil_sym) {
523 refl->SetBool(m->get(), field, (value->u.id == lisp_t_sym));
524 } else {
525 throw std::logic_error(std::string("Invalid value, allowed are T or NIL for field ")
526 + (*m)->GetTypeName() + field_name);
527 }
528 } else {
529 throw std::logic_error(std::string("Invalid type, required symbol for ")
530 + (*m)->GetTypeName() + field_name);
531 }
532 break;
533 case FieldDescriptor::TYPE_STRING:
534 if (value->type == STRING) {
535 refl->SetString(m->get(), field, value->u.string);
536 } else {
537 throw std::logic_error(std::string("Invalid type, required string for ")
538 + (*m)->GetTypeName() + field_name);
539 }
540 break;
541 case FieldDescriptor::TYPE_MESSAGE:
542 if (value->type == U_POINTER) {
543 std::shared_ptr<google::protobuf::Message> *mfrom =
544 static_cast<std::shared_ptr<google::protobuf::Message> *>(value->u.u_pointer);
545 Message *mut_msg = refl->MutableMessage(m->get(), field);
546 mut_msg->CopyFrom(**mfrom);
547 delete mfrom;
548 } else {
549 throw std::logic_error(std::string("Invalid type, required user pointer for ")
550 + (*m)->GetTypeName() + field_name);
551 }
552 break;
553 case FieldDescriptor::TYPE_BYTES: break;
554 case FieldDescriptor::TYPE_FIXED32:
555 case FieldDescriptor::TYPE_UINT32:
556 if (value->type == INTEGER) {
557 refl->SetUInt32(m->get(), field, value->u.intval);
558 } else if (value->type == LONG_LONG) {
559 refl->SetUInt32(m->get(), field, value->u.llintval);
560 } else {
561 throw std::logic_error(std::string("Invalid type, required integer or long long for ")
562 + (*m)->GetTypeName() + field_name);
563 }
564 break;
565 case FieldDescriptor::TYPE_ENUM: {
566 const char *sym_name = NULL;
567 if (value->type == TT_ATOM) {
568 sym_name = value->u.id;
569 } else if (value->type == STRING) {
570 sym_name = value->u.string;
571 } else {
572 throw std::logic_error(std::string("Invalid type, required symbol or string for ")
573 + (*m)->GetTypeName() + field_name);
574 }
575
576 const EnumDescriptor * enumdesc = field->enum_type();
577 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_name);
578 if (enumval) {
579 refl->SetEnum(m->get(), field, enumval);
580 } else {
581 std::string sym_str(sym_name);
582 std::transform(sym_str.begin(),
583 sym_str.end(),
584 sym_str.begin(),
585 std::ptr_fun<int, int>(std::toupper));
586
587 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_str);
588
589 if (enumval) {
590 refl->SetEnum(m->get(), field, enumval);
591 } else {
592 fprintf(stderr,
593 "%s: cannot set invalid enum value '%s' (neither '%s') on '%s'",
594 (*m)->GetTypeName().c_str(),
595 sym_name,
596 sym_str.c_str(),
597 field_name.c_str());
598 }
599 }
600 } break;
601
602 default: throw std::logic_error("Unknown protobuf field type encountered");
603 }
604 } catch (std::logic_error &e) {
605 //logger_->log_warn("RefBox", "Failed to set field %s of %s: %s", field_name.c_str(),
606 // (*m)->GetTypeName().c_str(), e.what());
607 }
608}
609
610/** Add value to a repeated field.
611 * @param msgptr message
612 * @param field_name name of the field
613 * @param value term which must contain a single properly typed value.
614 */
615void
616OpenPRSProtobuf::oprs_pb_add_list(void *msgptr, std::string field_name, Term *value)
617{
618 std::shared_ptr<google::protobuf::Message> *m =
619 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
620 if (!(m && *m))
621 return;
622
623 const Descriptor * desc = (*m)->GetDescriptor();
624 const FieldDescriptor *field = desc->FindFieldByName(field_name);
625 if (!field) {
626 //logger_->log_warn("RefBox", "Could not find field %s", field_name.c_str());
627 return;
628 }
629 const Reflection *refl = (*m)->GetReflection();
630
631 try {
632 switch (field->type()) {
633 case FieldDescriptor::TYPE_DOUBLE:
634 if (value->type == TT_FLOAT) {
635 refl->AddDouble(m->get(), field, *(value->u.doubleptr));
636 } else {
637 throw std::logic_error(std::string("Invalid type, required float for ")
638 + (*m)->GetTypeName() + field_name);
639 }
640 break;
641 case FieldDescriptor::TYPE_FLOAT:
642 if (value->type == TT_FLOAT) {
643 refl->AddFloat(m->get(), field, *(value->u.doubleptr));
644 } else {
645 throw std::logic_error(std::string("Invalid type, required float for ")
646 + (*m)->GetTypeName() + field_name);
647 }
648 break;
649
650 case FieldDescriptor::TYPE_SFIXED64:
651 case FieldDescriptor::TYPE_SINT64:
652 case FieldDescriptor::TYPE_INT64:
653 if (value->type == INTEGER) {
654 refl->AddInt64(m->get(), field, value->u.intval);
655 } else if (value->type == LONG_LONG) {
656 refl->AddInt64(m->get(), field, value->u.llintval);
657 } else {
658 throw std::logic_error(std::string("Invalid type, required integer or long long for ")
659 + (*m)->GetTypeName() + field_name);
660 }
661 break;
662
663 case FieldDescriptor::TYPE_SFIXED32:
664 case FieldDescriptor::TYPE_SINT32:
665 case FieldDescriptor::TYPE_INT32:
666 if (value->type == INTEGER) {
667 refl->AddInt32(m->get(), field, value->u.intval);
668 } else {
669 throw std::logic_error(std::string("Invalid type, required integer for ")
670 + (*m)->GetTypeName() + field_name);
671 }
672 break;
673 case FieldDescriptor::TYPE_BOOL:
674 if (value->type == TT_ATOM) {
675 if (value->u.id == lisp_t_sym || value->u.id == nil_sym) {
676 refl->AddBool(m->get(), field, (value->u.id == lisp_t_sym));
677 } else {
678 throw std::logic_error(std::string("Invalid value, allowed are T or NIL for field ")
679 + (*m)->GetTypeName() + field_name);
680 }
681 } else {
682 throw std::logic_error(std::string("Invalid type, required symbol for ")
683 + (*m)->GetTypeName() + field_name);
684 }
685 break;
686 case FieldDescriptor::TYPE_STRING:
687 if (value->type == STRING) {
688 refl->AddString(m->get(), field, value->u.string);
689 } else {
690 throw std::logic_error(std::string("Invalid type, required string for ")
691 + (*m)->GetTypeName() + field_name);
692 }
693 break;
694 case FieldDescriptor::TYPE_MESSAGE:
695 if (value->type == U_POINTER) {
696 std::shared_ptr<google::protobuf::Message> *mfrom =
697 static_cast<std::shared_ptr<google::protobuf::Message> *>(value->u.u_pointer);
698 Message *mut_msg = refl->AddMessage(m->get(), field);
699 mut_msg->CopyFrom(**mfrom);
700 delete mfrom;
701 } else {
702 throw std::logic_error(std::string("Invalid type, required user pointer for ")
703 + (*m)->GetTypeName() + field_name);
704 }
705 break;
706
707 case FieldDescriptor::TYPE_BYTES: break;
708
709 case FieldDescriptor::TYPE_FIXED32:
710 case FieldDescriptor::TYPE_UINT32:
711 if (value->type == INTEGER) {
712 refl->AddUInt32(m->get(), field, value->u.intval);
713 } else if (value->type == LONG_LONG) {
714 refl->AddUInt32(m->get(), field, value->u.llintval);
715 } else {
716 throw std::logic_error(std::string("Invalid type, required integer or long long for ")
717 + (*m)->GetTypeName() + field_name);
718 }
719 break;
720
721 case FieldDescriptor::TYPE_ENUM: {
722 const char *sym_name = NULL;
723 if (value->type == TT_ATOM) {
724 sym_name = value->u.id;
725 } else if (value->type == STRING) {
726 sym_name = value->u.string;
727 } else {
728 throw std::logic_error(std::string("Invalid type, required symbol or string for ")
729 + (*m)->GetTypeName() + field_name);
730 }
731 const EnumDescriptor * enumdesc = field->enum_type();
732 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_name);
733 if (enumval) {
734 refl->AddEnum(m->get(), field, enumval);
735 } else {
736 //logger_->log_warn("RefBox", "%s: cannot set invalid enum value '%s' on '%s'",
737 // (*m)->GetTypeName().c_str(), value.as_string().c_str(), field_name.c_str());
738 }
739 } break;
740
741 default: throw std::logic_error("Unknown protobuf field type encountered");
742 }
743 } catch (std::logic_error &e) {
744 //logger_->log_warn("RefBox", "Failed to add field %s of %s: %s", field_name.c_str(),
745 // (*m)->GetTypeName().c_str(), e.what());
746 }
747}
748
749/** Connect as a client to the given server.
750 * Note that this will perform an asynchronous connect. A
751 * (protobuf-client-connected) or (protobuf-client-disconnected) fact
752 * is asserted during (pb-process) in the case of success or failure.
753 * @param host host to connect to
754 * @param port TCP port to connect to
755 * @return Term with a long long of the client ID
756 */
757Term *
759{
760 if (port <= 0)
761 return build_nil();
762
763 ProtobufStreamClient *client = new ProtobufStreamClient(&*message_register_);
764
765 long int client_id;
766 {
767 fawkes::MutexLocker lock(&map_mutex_);
768 client_id = ++next_client_id_;
769 clients_[client_id] = client;
770 }
771
772 client->signal_connected().connect(
773 boost::bind(&OpenPRSProtobuf::handle_client_connected, this, client_id));
774 client->signal_disconnected().connect(boost::bind(&OpenPRSProtobuf::handle_client_disconnected,
775 this,
776 client_id,
777 boost::asio::placeholders::error));
778 client->signal_received().connect(
779 boost::bind(&OpenPRSProtobuf::handle_client_msg, this, client_id, _1, _2, _3));
780 client->signal_receive_failed().connect(
781 boost::bind(&OpenPRSProtobuf::handle_client_receive_fail, this, client_id, _1, _2, _3));
782
783 client->async_connect(host.c_str(), port);
784 return build_long_long(client_id);
785}
786
787/** Send message to a specific client.
788 * @param client_id ID of the client, this can be a server client ID, a client
789 * ID, or a peer ID (message will then be broadcasted).
790 * @param msgptr message to send
791 */
792void
793OpenPRSProtobuf::oprs_pb_send(long int client_id, void *msgptr)
794{
795 std::shared_ptr<google::protobuf::Message> *m =
796 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
797 if (!(m && *m)) {
798 //logger_->log_warn("RefBox", "Cannot send to %li: invalid message", client_id);
799 return;
800 }
801
802 try {
803 fawkes::MutexLocker lock(&map_mutex_);
804
805 if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
806 //printf("***** SENDING via SERVER\n");
807 server_->send(server_clients_[client_id], *m);
808 sig_server_sent_(server_clients_[client_id], *m);
809 } else if (clients_.find(client_id) != clients_.end()) {
810 //printf("***** SENDING via CLIENT\n");
811 clients_[client_id]->send(*m);
812 std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
813 sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
814 } else if (peers_.find(client_id) != peers_.end()) {
815 //printf("***** SENDING via CLIENT\n");
816 peers_[client_id]->send(*m);
817 sig_peer_sent_(client_id, *m);
818 } else {
819 //printf("Client ID %li is unknown, cannot send message of type %s\n",
820 // client_id, (*m)->GetTypeName().c_str());
821 }
822 } catch (google::protobuf::FatalException &e) {
823 //logger_->log_warn("RefBox", "Failed to send message of type %s: %s",
824 // (*m)->GetTypeName().c_str(), e.what());
825 } catch (std::runtime_error &e) {
826 //logger_->log_warn("RefBox", "Failed to send message of type %s: %s",
827 // (*m)->GetTypeName().c_str(), e.what());
828 }
829}
830
831/** Broadcast a message through a peer.
832 * @param peer_id ID broadcast peer to send through
833 * @param msgptr message to send
834 */
835void
836OpenPRSProtobuf::oprs_pb_broadcast(long int peer_id, void *msgptr)
837{
838 std::shared_ptr<google::protobuf::Message> *m =
839 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
840 if (!(m && *m)) {
841 fprintf(stderr, "Cannot send broadcast: invalid message");
842 return;
843 }
844
845 fawkes::MutexLocker lock(&map_mutex_);
846 if (peers_.find(peer_id) == peers_.end())
847 return;
848
849 try {
850 peers_[peer_id]->send(*m);
851 } catch (google::protobuf::FatalException &e) {
852 fprintf(stderr,
853 "pb-broadcast: failed to broadcast message of type %s: %s\n",
854 (*m)->GetTypeName().c_str(),
855 e.what());
856 }
857
858 sig_peer_sent_(peer_id, *m);
859}
860
861/** Disconnect a given client.
862 * @param client_id ID of client to disconnect, can be a server client ID or a client ID
863 */
864void
866{
867 //logger_->log_info("RefBox", "Disconnecting client %li", client_id);
868
869 try {
870 fawkes::MutexLocker lock(&map_mutex_);
871
872 if (server_clients_.find(client_id) != server_clients_.end()) {
873 protobuf_comm::ProtobufStreamServer::ClientID srv_client = server_clients_[client_id];
874 server_->disconnect(srv_client);
875 server_clients_.erase(client_id);
876 rev_server_clients_.erase(srv_client);
877 } else if (clients_.find(client_id) != clients_.end()) {
878 delete clients_[client_id];
879 clients_.erase(client_id);
880 }
881 } catch (std::runtime_error &e) {
882 throw fawkes::Exception("Failed to disconnect from client %li: %s", client_id, e.what());
883 }
884}
885
886/** Get list of values of a given message field.
887 * @param msgptr message
888 * @param field_name field to retrieve
889 * @return term which contains a Lisp list with properly typed values, or a symbol in
890 * case of an error
891 */
892Term *
893OpenPRSProtobuf::oprs_pb_field_list(void *msgptr, std::string field_name)
894{
895 std::shared_ptr<google::protobuf::Message> *m =
896 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
897 if (!(m && *m))
898 return build_id(declare_atom("INVALID-MESSAGE"));
899
900 const Descriptor * desc = (*m)->GetDescriptor();
901 const FieldDescriptor *field = desc->FindFieldByName(field_name);
902 if (!field)
903 return build_id(declare_atom("DOES-NOT-EXIST"));
904
905 TermList tl = sl_make_slist();
906
907 if (field->label() == FieldDescriptor::LABEL_REQUIRED
908 || field->label() == FieldDescriptor::LABEL_OPTIONAL) {
909 tl = build_term_list(tl, oprs_pb_field_value(msgptr, field_name));
910 return build_term_l_list_from_c_list(tl);
911 }
912
913 const Reflection *refl = (*m)->GetReflection();
914 int field_size = refl->FieldSize(**m, field);
915 for (int i = 0; i < field_size; ++i) {
916 switch (field->type()) {
917 case FieldDescriptor::TYPE_DOUBLE:
918 tl = build_term_list(tl, build_float(refl->GetRepeatedDouble(**m, field, i)));
919 break;
920 case FieldDescriptor::TYPE_FLOAT:
921 tl = build_term_list(tl, build_float(refl->GetRepeatedFloat(**m, field, i)));
922 break;
923 case FieldDescriptor::TYPE_UINT64:
924 case FieldDescriptor::TYPE_FIXED64:
925 tl = build_term_list(tl, build_long_long(refl->GetRepeatedUInt64(**m, field, i)));
926 break;
927 case FieldDescriptor::TYPE_UINT32:
928 case FieldDescriptor::TYPE_FIXED32:
929 tl = build_term_list(tl, build_long_long(refl->GetRepeatedUInt32(**m, field, i)));
930 break;
931 case FieldDescriptor::TYPE_BOOL:
932 tl = build_term_list(tl, refl->GetRepeatedBool(**m, field, i) ? build_t() : build_nil());
933 break;
934 case FieldDescriptor::TYPE_STRING:
935 tl = build_term_list(tl, build_string(refl->GetRepeatedString(**m, field, i).c_str()));
936 break;
937 case FieldDescriptor::TYPE_MESSAGE: {
938 const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
939 google::protobuf::Message * mcopy = msg.New();
940 mcopy->CopyFrom(msg);
941 void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
942 tl = build_term_list(tl, build_pointer(ptr));
943 } break;
944 case FieldDescriptor::TYPE_BYTES:
945 tl = build_term_list(tl, build_string((char *)"bytes"));
946 break;
947 case FieldDescriptor::TYPE_ENUM:
948 tl = build_term_list(tl,
949 build_id(
950 declare_atom(refl->GetRepeatedEnum(**m, field, i)->name().c_str())));
951 break;
952 case FieldDescriptor::TYPE_SFIXED32:
953 case FieldDescriptor::TYPE_INT32:
954 case FieldDescriptor::TYPE_SINT32:
955 tl = build_term_list(tl, build_integer(refl->GetRepeatedInt32(**m, field, i)));
956 break;
957 case FieldDescriptor::TYPE_SFIXED64:
958 case FieldDescriptor::TYPE_SINT64:
959 case FieldDescriptor::TYPE_INT64:
960 tl = build_term_list(tl, build_long_long(refl->GetRepeatedInt64(**m, field, i)));
961 break;
962 default: throw std::logic_error("Unknown protobuf field type encountered");
963 }
964 }
965
966 return build_term_l_list_from_c_list(tl);
967}
968
969/** Check if a given field is a list (repeated field).
970 * @param msgptr message
971 * @param field_name name of the field
972 * @return true if the field is a list, false otherwise
973 */
974bool
975OpenPRSProtobuf::oprs_pb_field_is_list(void *msgptr, std::string field_name)
976{
977 std::shared_ptr<google::protobuf::Message> *m =
978 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
979 if (!(m && *m))
980 return false;
981
982 const Descriptor * desc = (*m)->GetDescriptor();
983 const FieldDescriptor *field = desc->FindFieldByName(field_name);
984 if (!field) {
985 return false;
986 }
987 return (field->label() == FieldDescriptor::LABEL_REPEATED);
988}
989
990/** Process all pending events.
991 * This will process events and assert appropriate facts.
992 */
993void
995{
996 {
997 fawkes::MutexLocker lock(q_server_client_.mutex());
998 while (!q_server_client_.empty()) {
999 auto &sc = q_server_client_.front();
1000 oprs_assert_server_client_event(std::get<0>(sc),
1001 std::get<1>(sc),
1002 std::get<2>(sc),
1003 std::get<3>(sc));
1004 q_server_client_.pop();
1005 }
1006 }
1007
1008 {
1009 fawkes::MutexLocker lock(q_client_.mutex());
1010 while (!q_client_.empty()) {
1011 auto &c = q_client_.front();
1012 oprs_assert_client_event(std::get<0>(c), std::get<1>(c));
1013 q_client_.pop();
1014 }
1015 }
1016
1017 {
1018 fawkes::MutexLocker lock(q_msgs_.mutex());
1019 while (!q_msgs_.empty()) {
1020 auto &m = q_msgs_.front();
1021 oprs_assert_message(std::get<0>(m),
1022 std::get<1>(m),
1023 std::get<2>(m),
1024 std::get<3>(m),
1025 std::get<4>(m),
1026 std::get<5>(m),
1027 std::get<6>(m));
1028 q_msgs_.pop();
1029 }
1030 }
1031}
1032
1033/** Check if there are pending events.
1034 * @return true if there are pending events, false otherwise
1035 */
1036bool
1038{
1039 fawkes::MutexLocker lock1(q_server_client_.mutex());
1040 fawkes::MutexLocker lock2(q_client_.mutex());
1041 fawkes::MutexLocker lock3(q_msgs_.mutex());
1042
1043 return (!(q_server_client_.empty() && q_client_.empty() && q_msgs_.empty()));
1044}
1045
1046void
1047OpenPRSProtobuf::oprs_assert_server_client_event(long int client_id,
1048 std::string & host,
1049 unsigned short port,
1050 bool connect)
1051{
1052 TermList tl = sl_make_slist();
1053 tl = build_term_list(tl, build_long_long(client_id));
1054 if (connect) {
1055 tl = build_term_list(tl, build_string(host.c_str()));
1056 tl = build_term_list(tl, build_integer(port));
1057 add_external_fact((char *)"protobuf-server-client-connected", tl);
1058 } else {
1059 add_external_fact((char *)"protobuf-server-client-disconnected", tl);
1060 }
1061}
1062
1063void
1064OpenPRSProtobuf::oprs_assert_client_event(long int client_id, bool connect)
1065{
1066 TermList tl = sl_make_slist();
1067 tl = build_term_list(tl, build_long_long(client_id));
1068 if (connect) {
1069 add_external_fact((char *)"protobuf-client-connected", tl);
1070 } else {
1071 add_external_fact((char *)"protobuf-client-disconnected", tl);
1072 }
1073}
1074
1075void
1076OpenPRSProtobuf::oprs_assert_message(std::string & endpoint_host,
1077 unsigned short endpoint_port,
1078 uint16_t comp_id,
1079 uint16_t msg_type,
1080 std::shared_ptr<google::protobuf::Message> &msg,
1081 OpenPRSProtobuf::ClientType ct,
1082 unsigned int client_id)
1083{
1084 TermList tl = sl_make_slist();
1085
1086 struct timeval tv;
1087 gettimeofday(&tv, 0);
1088 void *ptr = new std::shared_ptr<google::protobuf::Message>(msg);
1089 //tl = build_term_list(tl, build_string((char *)"type"));
1090 tl = build_term_list(tl, build_string(msg->GetTypeName().c_str()));
1091 //tl = build_term_list(tl, build_string((char *)"comp-id"));
1092 tl = build_term_list(tl, build_integer(comp_id));
1093 //tl = build_term_list(tl, build_string((char *)"msg-type"));
1094 tl = build_term_list(tl, build_integer(msg_type));
1095 //tl = build_term_list(tl, build_string((char *)"rcvd-via"));
1096 tl = build_term_list(tl, build_string((client_id == 0) ? "BROADCAST" : "STREAM"));
1097 //tl = build_term_list(tl, build_string((char *)"rcvd-at"));
1098 tl = build_term_list(tl, build_long_long(tv.tv_sec));
1099 tl = build_term_list(tl, build_long_long(tv.tv_usec));
1100 //tl = build_term_list(tl, build_string((char *)"rcvd-from"));
1101 tl = build_term_list(tl, build_string(endpoint_host.c_str()));
1102 tl = build_term_list(tl, build_integer(endpoint_port));
1103 //tl = build_term_list(tl, build_string((char *)"client-type"));
1104 tl = build_term_list(tl,
1105 build_string(ct == CT_CLIENT ? "CLIENT"
1106 : (ct == CT_SERVER ? "SERVER" : "PEER")));
1107 //tl = build_term_list(tl, build_string((char *)"client-id"));
1108 tl = build_term_list(tl, build_integer(client_id));
1109 //tl = build_term_list(tl, build_string((char *)"ptr"));
1110 tl = build_term_list(tl, build_pointer(ptr));
1111
1112 add_external_fact((char *)"protobuf-msg", tl);
1113}
1114
1115void
1116OpenPRSProtobuf::handle_server_client_connected(ProtobufStreamServer::ClientID client,
1117 boost::asio::ip::tcp::endpoint &endpoint)
1118{
1119 long int client_id = -1;
1120 {
1121 fawkes::MutexLocker lock(&map_mutex_);
1122 client_id = ++next_client_id_;
1123 client_endpoints_[client_id] = std::make_pair(endpoint.address().to_string(), endpoint.port());
1124 server_clients_[client_id] = client;
1125 rev_server_clients_[client] = client_id;
1126 }
1127
1128 q_server_client_.push_locked(
1129 std::make_tuple(client_id, endpoint.address().to_string(), endpoint.port(), true));
1130}
1131
1132void
1133OpenPRSProtobuf::handle_server_client_disconnected(ProtobufStreamServer::ClientID client,
1134 const boost::system::error_code &error)
1135{
1136 long int client_id = -1;
1137 {
1138 fawkes::MutexLocker lock(&map_mutex_);
1139 RevServerClientMap::iterator c;
1140 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1141 client_id = c->second;
1142 rev_server_clients_.erase(c);
1143 server_clients_.erase(client_id);
1144 }
1145 }
1146
1147 if (client_id >= 0) {
1148 q_server_client_.push_locked(std::make_tuple(client_id, "", 0, false));
1149 }
1150}
1151
1152/** Handle message that came from a client.
1153 * @param client client ID
1154 * @param component_id component the message was addressed to
1155 * @param msg_type type of the message
1156 * @param msg the message
1157 */
1158void
1159OpenPRSProtobuf::handle_server_client_msg(ProtobufStreamServer::ClientID client,
1160 uint16_t component_id,
1161 uint16_t msg_type,
1162 std::shared_ptr<google::protobuf::Message> msg)
1163{
1164 fawkes::MutexLocker lock(&map_mutex_);
1165 RevServerClientMap::iterator c;
1166 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1167 q_msgs_.push_locked(std::make_tuple(client_endpoints_[c->second].first,
1168 client_endpoints_[c->second].second,
1169 component_id,
1170 msg_type,
1171 msg,
1172 CT_SERVER,
1173 c->second));
1174 }
1175}
1176
1177/** Handle server reception failure
1178 * @param client client ID
1179 * @param component_id component the message was addressed to
1180 * @param msg_type type of the message
1181 * @param msg the message string
1182 */
1183void
1184OpenPRSProtobuf::handle_server_client_fail(ProtobufStreamServer::ClientID client,
1185 uint16_t component_id,
1186 uint16_t msg_type,
1187 std::string msg)
1188{
1189 fawkes::MutexLocker lock(&map_mutex_);
1190 RevServerClientMap::iterator c;
1191 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1192 /*
1193 fawkes::MutexLocker lock(&oprs_mutex_);
1194 oprs_->assert_fact_f("(protobuf-server-receive-failed (comp-id %u) (msg-type %u) "
1195 "(rcvd-via STREAM) (client-id %li) (message \"%s\") "
1196 "(rcvd-from (\"%s\" %u)))",
1197 component_id, msg_type, c->second, msg.c_str(),
1198 client_endpoints_[c->second].first.c_str(),
1199 client_endpoints_[c->second].second);
1200 */
1201 }
1202}
1203
1204/** Handle message that came from a peer/robot
1205 * @param endpoint the endpoint from which the message was received
1206 * @param component_id component the message was addressed to
1207 * @param msg_type type of the message
1208 * @param msg the message
1209 */
1210void
1211OpenPRSProtobuf::handle_peer_msg(long int peer_id,
1212 boost::asio::ip::udp::endpoint & endpoint,
1213 uint16_t component_id,
1214 uint16_t msg_type,
1215 std::shared_ptr<google::protobuf::Message> msg)
1216{
1217 q_msgs_.push_locked(std::make_tuple(endpoint.address().to_string(),
1218 endpoint.port(),
1219 component_id,
1220 msg_type,
1221 msg,
1222 CT_PEER,
1223 peer_id));
1224}
1225
1226/** Handle error during peer message processing.
1227 * @param endpoint endpoint of incoming message
1228 * @param msg error message
1229 */
1230void
1231OpenPRSProtobuf::handle_peer_recv_error(long int peer_id,
1232 boost::asio::ip::udp::endpoint &endpoint,
1233 std::string msg)
1234{
1235 fprintf(stderr,
1236 "Failed to receive peer message from %s:%u: %s\n",
1237 endpoint.address().to_string().c_str(),
1238 endpoint.port(),
1239 msg.c_str());
1240}
1241
1242/** Handle error during peer message processing.
1243 * @param msg error message
1244 */
1245void
1246OpenPRSProtobuf::handle_peer_send_error(long int peer_id, const std::string &msg)
1247{
1248 //logger_->log_warn("RefBox", "Failed to send peer message: %s", msg.c_str());
1249}
1250
1251void
1252OpenPRSProtobuf::handle_client_connected(long int client_id)
1253{
1254 q_client_.push_locked(std::make_tuple(client_id, true));
1255}
1256
1257void
1258OpenPRSProtobuf::handle_client_disconnected(long int client_id,
1259 const boost::system::error_code &error)
1260{
1261 q_client_.push_locked(std::make_tuple(client_id, false));
1262}
1263
1264void
1265OpenPRSProtobuf::handle_client_msg(long int client_id,
1266 uint16_t comp_id,
1267 uint16_t msg_type,
1268 std::shared_ptr<google::protobuf::Message> msg)
1269{
1270 q_msgs_.push_locked(std::make_tuple("", 0, comp_id, msg_type, msg, CT_CLIENT, client_id));
1271}
1272
1273void
1274OpenPRSProtobuf::handle_client_receive_fail(long int client_id,
1275 uint16_t comp_id,
1276 uint16_t msg_type,
1277 const std::string &msg)
1278{
1279 /*
1280 oprs_->assert_fact_f("(protobuf-receive-failed (client-id %li) (rcvd-via STREAM) "
1281 "(comp-id %u) (msg-type %u) (message \"%s\"))",
1282 client_id, comp_id, msg_type, msg.c_str());
1283 */
1284}
1285
1286} // namespace oprs_protobuf
Base class for exceptions in Fawkes.
Definition: exception.h:36
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_queue.h:74
void push_locked(const Type &x)
Push element to queue with lock protection.
Definition: lock_queue.h:135
Mutex locking helper.
Definition: mutex_locker.h:34
Term * oprs_pb_peer_create_local_crypto(const std::string &host, int send_port, int recv_port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
Term * oprs_pb_destroy(void *msgptr)
Destroy given message (reference).
bool oprs_pb_register_type(std::string full_name)
Register a new message type.
void oprs_pb_broadcast(long int peer_id, void *msgptr)
Broadcast a message through a peer.
Term * oprs_pb_ref(void *msgptr)
Create new reference to message.
Term * oprs_pb_peer_create_local(const std::string &host, int send_port, int recv_port)
Enable protobuf peer.
Term * oprs_pb_peer_create_crypto(const std::string &host, int port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
Term * oprs_pb_field_label(void *msgptr, std::string field_name)
Get a field's label.
void oprs_pb_set_field(void *msgptr, std::string field_name, Term *value)
Set a field.
std::shared_ptr< google::protobuf::Message > * oprs_create_msg(std::string full_name)
Create a new message of given type.
bool oprs_pb_field_is_list(void *msgptr, std::string field_name)
Check if a given field is a list (repeated field).
OpenPRSProtobuf(std::vector< std::string > &proto_path)
Constructor.
void oprs_pb_process()
Process all pending events.
bool oprs_pb_events_pending()
Check if there are pending events.
void oprs_pb_disable_server()
Disable protobuf stream server.
void oprs_pb_send(long int client_id, void *msgptr)
Send message to a specific client.
void oprs_pb_enable_server(int port)
Enable protobuf stream server.
Term * oprs_pb_client_connect(std::string host, int port)
Connect as a client to the given server.
Term * oprs_pb_peer_create(const std::string &host, int port)
Enable protobuf peer.
bool oprs_pb_has_field(void *msgptr, std::string field_name)
Check if message has a specific field.
void oprs_pb_peer_setup_crypto(long int peer_id, const std::string &crypto_key, const std::string &cipher)
Setup crypto for peer.
Term * oprs_pb_field_value(void *msgptr, std::string field_name)
Get properly typed field value.
Term * oprs_pb_field_names(void *msgptr)
Get field names of message.
void oprs_pb_add_list(void *msgptr, std::string field_name, Term *value)
Add value to a repeated field.
Term * oprs_pb_field_type(void *msgptr, std::string field_name)
Get type of a specific field.
void oprs_pb_disconnect(long int client_id)
Disconnect a given client.
Term * oprs_pb_field_list(void *msgptr, std::string field_name)
Get list of values of a given message field.
void oprs_pb_peer_destroy(long int peer_id)
Disable peer.