Fawkes API Fawkes Development Version
communicator.cpp
1
2/***************************************************************************
3 * communicator.cpp - protobuf network communication for CLIPS
4 *
5 * Created: Tue Apr 16 13:51:14 2013
6 * Copyright 2013 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
11 * are met:
12 *
13 * - Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in
17 * the documentation and/or other materials provided with the
18 * distribution.
19 * - Neither the name of the authors nor the names of its contributors
20 * may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27 * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34 * OF THE POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#include <core/threading/mutex_locker.h>
38#include <google/protobuf/descriptor.h>
39#include <logging/logger.h>
40#include <protobuf_clips/communicator.h>
41#include <protobuf_comm/client.h>
42#include <protobuf_comm/peer.h>
43#include <protobuf_comm/server.h>
44
45#include <boost/format.hpp>
46
47using namespace google::protobuf;
48using namespace protobuf_comm;
49using namespace boost::placeholders;
50
51namespace protobuf_clips {
52
53/** @class ClipsProtobufCommunicator <protobuf_clips/communicator.h>
54 * CLIPS protobuf integration class.
55 * This class adds functionality related to protobuf to a given CLIPS
56 * environment. It supports the creation of communication channels
57 * through protobuf_comm. An instance maintains its own message register
58 * shared among server, peer, and clients.
59 * @author Tim Niemueller
60 */
61
62/** Constructor.
63 * @param env CLIPS environment to which to provide the protobuf functionality
64 * @param env_mutex mutex to lock when operating on the CLIPS environment.
65 * @param logger optional logger for informational output
66 */
68 fawkes::Mutex & env_mutex,
69 fawkes::Logger * logger)
70: clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL), next_client_id_(0)
71{
72 message_register_ = new MessageRegister();
73 setup_clips();
74}
75
76/** Constructor.
77 * @param env CLIPS environment to which to provide the protobuf functionality
78 * @param env_mutex mutex to lock when operating on the CLIPS environment.
79 * @param proto_path proto path passed to a newly instantiated message register
80 * @param logger optional logger for informational output
81 */
83 fawkes::Mutex & env_mutex,
84 std::vector<std::string> &proto_path,
85 fawkes::Logger * logger)
86: clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL), next_client_id_(0)
87{
88 message_register_ = new MessageRegister(proto_path);
89 setup_clips();
90}
91
92/** Destructor. */
94{
95 {
96 fawkes::MutexLocker lock(&clips_mutex_);
97
98 for (auto f : functions_) {
99 clips_->remove_function(f);
100 }
101 functions_.clear();
102 }
103
104 for (auto c : clients_) {
105 delete c.second;
106 }
107 clients_.clear();
108
109 delete message_register_;
110 delete server_;
111}
112
/* Register slot s as CLIPS function n and record the name in functions_ so
 * that the destructor can remove it again. Wrapped in do/while(0) so that
 * the macro expands to exactly one statement (safe inside unbraced if/else).
 */
#define ADD_FUNCTION(n, s)          \
	do {                            \
		clips_->add_function(n, s); \
		functions_.push_back(n);    \
	} while (0)
116
117/** Setup CLIPS environment. */
118void
119ClipsProtobufCommunicator::setup_clips()
120{
121 fawkes::MutexLocker lock(&clips_mutex_);
122
123 ADD_FUNCTION("pb-register-type",
124 (sigc::slot<CLIPS::Value, std::string>(
125 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_register_type))));
126 ADD_FUNCTION("pb-field-names",
127 (sigc::slot<CLIPS::Values, void *>(
128 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_names))));
129 ADD_FUNCTION("pb-field-type",
130 (sigc::slot<CLIPS::Value, void *, std::string>(
131 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_type))));
132 ADD_FUNCTION("pb-has-field",
133 (sigc::slot<CLIPS::Value, void *, std::string>(
134 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_has_field))));
135 ADD_FUNCTION("pb-field-label",
136 (sigc::slot<CLIPS::Value, void *, std::string>(
137 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_label))));
138 ADD_FUNCTION("pb-field-value",
139 (sigc::slot<CLIPS::Value, void *, std::string>(
140 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_value))));
141 ADD_FUNCTION("pb-field-list",
142 (sigc::slot<CLIPS::Values, void *, std::string>(
143 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_list))));
144 ADD_FUNCTION("pb-field-is-list",
145 (sigc::slot<CLIPS::Value, void *, std::string>(
146 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_is_list))));
147 ADD_FUNCTION("pb-create",
148 (sigc::slot<CLIPS::Value, std::string>(
149 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_create))));
150 ADD_FUNCTION("pb-destroy",
151 (sigc::slot<void, void *>(
152 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_destroy))));
153 ADD_FUNCTION("pb-ref",
154 (sigc::slot<CLIPS::Value, void *>(
155 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_ref))));
156 ADD_FUNCTION("pb-set-field",
157 (sigc::slot<void, void *, std::string, CLIPS::Value>(
158 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_set_field))));
159 ADD_FUNCTION("pb-add-list",
160 (sigc::slot<void, void *, std::string, CLIPS::Value>(
161 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_add_list))));
162 ADD_FUNCTION("pb-send",
163 (sigc::slot<void, long int, void *>(
164 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_send))));
165 ADD_FUNCTION("pb-tostring",
166 (sigc::slot<std::string, void *>(
167 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_tostring))));
168 ADD_FUNCTION("pb-server-enable",
169 (sigc::slot<void, int>(
170 sigc::mem_fun(*this, &ClipsProtobufCommunicator::enable_server))));
171 ADD_FUNCTION("pb-server-disable",
172 (sigc::slot<void>(
173 sigc::mem_fun(*this, &ClipsProtobufCommunicator::disable_server))));
174 ADD_FUNCTION("pb-peer-create",
175 (sigc::slot<long int, std::string, int>(
176 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create))));
177 ADD_FUNCTION("pb-peer-create-local",
178 (sigc::slot<long int, std::string, int, int>(
179 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create_local))));
180 ADD_FUNCTION("pb-peer-create-crypto",
181 (sigc::slot<long int, std::string, int, std::string, std::string>(
182 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create_crypto))));
183 ADD_FUNCTION("pb-peer-create-local-crypto",
184 (sigc::slot<long int, std::string, int, int, std::string, std::string>(sigc::mem_fun(
185 *this, &ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto))));
186 ADD_FUNCTION("pb-peer-destroy",
187 (sigc::slot<void, long int>(
188 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_destroy))));
189 ADD_FUNCTION("pb-peer-setup-crypto",
190 (sigc::slot<void, long int, std::string, std::string>(
191 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_setup_crypto))));
192 ADD_FUNCTION("pb-broadcast",
193 (sigc::slot<void, long int, void *>(
194 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_broadcast))));
195 ADD_FUNCTION("pb-connect",
196 (sigc::slot<long int, std::string, int>(
197 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_client_connect))));
198 ADD_FUNCTION("pb-disconnect",
199 (sigc::slot<void, long int>(
200 sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_disconnect))));
201}
202
203/** Enable protobuf stream server.
204 * @param port TCP port to listen on for connections
205 */
206void
208{
209 if ((port > 0) && !server_) {
210 server_ = new protobuf_comm::ProtobufStreamServer(port, message_register_);
211
212 server_->signal_connected().connect(
213 boost::bind(&ClipsProtobufCommunicator::handle_server_client_connected, this, _1, _2));
214 server_->signal_disconnected().connect(
215 boost::bind(&ClipsProtobufCommunicator::handle_server_client_disconnected, this, _1, _2));
216 server_->signal_received().connect(
217 boost::bind(&ClipsProtobufCommunicator::handle_server_client_msg, this, _1, _2, _3, _4));
218 server_->signal_receive_failed().connect(
219 boost::bind(&ClipsProtobufCommunicator::handle_server_client_fail, this, _1, _2, _3, _4));
220 }
221}
222
223/** Disable protobu stream server. */
224void
226{
227 delete server_;
228 server_ = NULL;
229}
230
231/** Enable protobuf peer.
232 * @param address IP address to send messages to
233 * @param send_port UDP port to send messages to
234 * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
235 * @param crypto_key encryption key
236 * @param cipher cipher suite, see BufferEncryptor for supported types
237 * @return peer identifier
238 */
239long int
240ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto(std::string address,
241 int send_port,
242 int recv_port,
243 std::string crypto_key,
244 std::string cipher)
245{
246 if (recv_port <= 0)
247 recv_port = send_port;
248
249 if (send_port > 0) {
250 protobuf_comm::ProtobufBroadcastPeer *peer = new protobuf_comm::ProtobufBroadcastPeer(
251 address, send_port, recv_port, message_register_, crypto_key, cipher);
252
253 long int peer_id;
254 {
255 fawkes::MutexLocker lock(&map_mutex_);
256 peer_id = ++next_client_id_;
257 peers_[peer_id] = peer;
258 }
259
260 peer->signal_received().connect(
261 boost::bind(&ClipsProtobufCommunicator::handle_peer_msg, this, peer_id, _1, _2, _3, _4));
262 peer->signal_recv_error().connect(
263 boost::bind(&ClipsProtobufCommunicator::handle_peer_recv_error, this, peer_id, _1, _2));
264 peer->signal_send_error().connect(
265 boost::bind(&ClipsProtobufCommunicator::handle_peer_send_error, this, peer_id, _1));
266
267 return peer_id;
268 } else {
269 return 0;
270 }
271}
272
273/** Enable protobuf peer.
274 * @param address IP address to send messages to
275 * @param port UDP port to send and receive messages
276 * @param crypto_key encryption key
277 * @param cipher cipher suite, see BufferEncryptor for supported types
278 * @return peer identifier
279 */
280long int
281ClipsProtobufCommunicator::clips_pb_peer_create_crypto(std::string address,
282 int port,
283 std::string crypto_key,
284 std::string cipher)
285{
286 return clips_pb_peer_create_local_crypto(address, port, port, crypto_key, cipher);
287}
288
289/** Enable protobuf peer.
290 * @param address IP address to send messages to
291 * @param port UDP port to send and receive messages
292 * @return peer identifier
293 */
294long int
295ClipsProtobufCommunicator::clips_pb_peer_create(std::string address, int port)
296{
297 return clips_pb_peer_create_local_crypto(address, port, port);
298}
299
300/** Enable protobuf peer.
301 * @param address IP address to send messages to
302 * @param send_port UDP port to send messages to
303 * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
304 * @return peer identifier
305 */
306long int
307ClipsProtobufCommunicator::clips_pb_peer_create_local(std::string address,
308 int send_port,
309 int recv_port)
310{
311 return clips_pb_peer_create_local_crypto(address, send_port, recv_port);
312}
313
314/** Disable peer.
315 * @param peer_id ID of the peer to destroy
316 */
317void
318ClipsProtobufCommunicator::clips_pb_peer_destroy(long int peer_id)
319{
320 if (peers_.find(peer_id) != peers_.end()) {
321 delete peers_[peer_id];
322 peers_.erase(peer_id);
323 }
324}
325
326/** Setup crypto for peer.
327 * @param peer_id ID of the peer to destroy
328 * @param crypto_key encryption key
329 * @param cipher cipher suite, see BufferEncryptor for supported types
330 */
331void
332ClipsProtobufCommunicator::clips_pb_peer_setup_crypto(long int peer_id,
333 std::string crypto_key,
334 std::string cipher)
335{
336 if (peers_.find(peer_id) != peers_.end()) {
337 peers_[peer_id]->setup_crypto(crypto_key, cipher);
338 }
339}
340
341/** Register a new message type.
342 * @param full_name full name of type to register
343 * @return true if the type was successfully registered, false otherwise
344 */
345CLIPS::Value
346ClipsProtobufCommunicator::clips_pb_register_type(std::string full_name)
347{
348 try {
349 message_register_->add_message_type(full_name);
350 return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
351 } catch (std::runtime_error &e) {
352 if (logger_) {
353 logger_->log_error("CLIPS-Protobuf",
354 "Registering type %s failed: %s",
355 full_name.c_str(),
356 e.what());
357 }
358 return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
359 }
360}
361
362CLIPS::Value
363ClipsProtobufCommunicator::clips_pb_create(std::string full_name)
364{
365 try {
366 std::shared_ptr<google::protobuf::Message> m = message_register_->new_message_for(full_name);
367 return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>(m));
368 } catch (std::runtime_error &e) {
369 if (logger_) {
370 logger_->log_warn("CLIPS-Protobuf",
371 "Cannot create message of type %s: %s",
372 full_name.c_str(),
373 e.what());
374 }
375 return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>());
376 }
377}
378
379CLIPS::Value
380ClipsProtobufCommunicator::clips_pb_ref(void *msgptr)
381{
382 std::shared_ptr<google::protobuf::Message> *m =
383 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
384 if (!*m)
385 return new std::shared_ptr<google::protobuf::Message>();
386
387 return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>(*m));
388}
389
390void
391ClipsProtobufCommunicator::clips_pb_destroy(void *msgptr)
392{
393 std::shared_ptr<google::protobuf::Message> *m =
394 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
395 if (!*m)
396 return;
397
398 delete m;
399}
400
401CLIPS::Values
402ClipsProtobufCommunicator::clips_pb_field_names(void *msgptr)
403{
404 std::shared_ptr<google::protobuf::Message> *m =
405 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
406 if (!*m)
407 return CLIPS::Values();
408
409 const Descriptor *desc = (*m)->GetDescriptor();
410 const int field_count = desc->field_count();
411 CLIPS::Values field_names(field_count);
412 for (int i = 0; i < field_count; ++i) {
413 field_names[i].set(desc->field(i)->name(), true);
414 }
415 return field_names;
416}
417
418CLIPS::Value
419ClipsProtobufCommunicator::clips_pb_field_type(void *msgptr, std::string field_name)
420{
421 std::shared_ptr<google::protobuf::Message> *m =
422 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
423 if (!*m)
424 return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
425
426 const Descriptor * desc = (*m)->GetDescriptor();
427 const FieldDescriptor *field = desc->FindFieldByName(field_name);
428 if (!field) {
429 return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
430 }
431 switch (field->type()) {
432 case FieldDescriptor::TYPE_DOUBLE: return CLIPS::Value("DOUBLE", CLIPS::TYPE_SYMBOL);
433 case FieldDescriptor::TYPE_FLOAT: return CLIPS::Value("FLOAT", CLIPS::TYPE_SYMBOL);
434 case FieldDescriptor::TYPE_INT64: return CLIPS::Value("INT64", CLIPS::TYPE_SYMBOL);
435 case FieldDescriptor::TYPE_UINT64: return CLIPS::Value("UINT64", CLIPS::TYPE_SYMBOL);
436 case FieldDescriptor::TYPE_INT32: return CLIPS::Value("INT32", CLIPS::TYPE_SYMBOL);
437 case FieldDescriptor::TYPE_FIXED64: return CLIPS::Value("FIXED64", CLIPS::TYPE_SYMBOL);
438 case FieldDescriptor::TYPE_FIXED32: return CLIPS::Value("FIXED32", CLIPS::TYPE_SYMBOL);
439 case FieldDescriptor::TYPE_BOOL: return CLIPS::Value("BOOL", CLIPS::TYPE_SYMBOL);
440 case FieldDescriptor::TYPE_STRING: return CLIPS::Value("STRING", CLIPS::TYPE_SYMBOL);
441 case FieldDescriptor::TYPE_MESSAGE: return CLIPS::Value("MESSAGE", CLIPS::TYPE_SYMBOL);
442 case FieldDescriptor::TYPE_BYTES: return CLIPS::Value("BYTES", CLIPS::TYPE_SYMBOL);
443 case FieldDescriptor::TYPE_UINT32: return CLIPS::Value("UINT32", CLIPS::TYPE_SYMBOL);
444 case FieldDescriptor::TYPE_ENUM: return CLIPS::Value("ENUM", CLIPS::TYPE_SYMBOL);
445 case FieldDescriptor::TYPE_SFIXED32: return CLIPS::Value("SFIXED32", CLIPS::TYPE_SYMBOL);
446 case FieldDescriptor::TYPE_SFIXED64: return CLIPS::Value("SFIXED64", CLIPS::TYPE_SYMBOL);
447 case FieldDescriptor::TYPE_SINT32: return CLIPS::Value("SINT32", CLIPS::TYPE_SYMBOL);
448 case FieldDescriptor::TYPE_SINT64: return CLIPS::Value("SINT64", CLIPS::TYPE_SYMBOL);
449 default: return CLIPS::Value("UNKNOWN", CLIPS::TYPE_SYMBOL);
450 }
451}
452
453CLIPS::Value
454ClipsProtobufCommunicator::clips_pb_has_field(void *msgptr, std::string field_name)
455{
456 std::shared_ptr<google::protobuf::Message> *m =
457 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
458 if (!*m)
459 return false;
460
461 const Descriptor * desc = (*m)->GetDescriptor();
462 const FieldDescriptor *field = desc->FindFieldByName(field_name);
463 if (!field)
464 return false;
465
466 const Reflection *refl = (*m)->GetReflection();
467
468 if (field->is_repeated()) {
469 return CLIPS::Value((refl->FieldSize(**m, field) > 0) ? "TRUE" : "FALSE", CLIPS::TYPE_SYMBOL);
470 } else if (field->is_optional()) {
471 return CLIPS::Value(refl->HasField(**m, field) ? "TRUE" : "FALSE", CLIPS::TYPE_SYMBOL);
472 } else {
473 return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
474 }
475}
476
477CLIPS::Value
478ClipsProtobufCommunicator::clips_pb_field_label(void *msgptr, std::string field_name)
479{
480 std::shared_ptr<google::protobuf::Message> *m =
481 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
482 if (!*m)
483 return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
484
485 const Descriptor * desc = (*m)->GetDescriptor();
486 const FieldDescriptor *field = desc->FindFieldByName(field_name);
487 if (!field) {
488 return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
489 }
490 switch (field->label()) {
491 case FieldDescriptor::LABEL_OPTIONAL: return CLIPS::Value("OPTIONAL", CLIPS::TYPE_SYMBOL);
492 case FieldDescriptor::LABEL_REQUIRED: return CLIPS::Value("REQUIRED", CLIPS::TYPE_SYMBOL);
493 case FieldDescriptor::LABEL_REPEATED: return CLIPS::Value("REPEATED", CLIPS::TYPE_SYMBOL);
494 default: return CLIPS::Value("UNKNOWN", CLIPS::TYPE_SYMBOL);
495 }
496}
497
498CLIPS::Value
499ClipsProtobufCommunicator::clips_pb_field_value(void *msgptr, std::string field_name)
500{
501 std::shared_ptr<google::protobuf::Message> *m =
502 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
503 if (!(m && *m)) {
504 if (logger_) {
505 logger_->log_warn("CLIPS-Protobuf", "Invalid message when setting %s", field_name.c_str());
506 }
507 return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
508 }
509
510 const Descriptor * desc = (*m)->GetDescriptor();
511 const FieldDescriptor *field = desc->FindFieldByName(field_name);
512 if (!field) {
513 if (logger_) {
514 logger_->log_warn("CLIPS-Protobuf",
515 "Field %s of %s does not exist",
516 field_name.c_str(),
517 (*m)->GetTypeName().c_str());
518 }
519 return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
520 }
521 const Reflection *refl = (*m)->GetReflection();
522 if (field->type() != FieldDescriptor::TYPE_MESSAGE && !refl->HasField(**m, field)) {
523 if (logger_) {
524 logger_->log_warn("CLIPS-Protobuf",
525 "Field %s of %s not set",
526 field_name.c_str(),
527 (*m)->GetTypeName().c_str());
528 }
529 return CLIPS::Value("NOT-SET", CLIPS::TYPE_SYMBOL);
530 }
531 switch (field->type()) {
532 case FieldDescriptor::TYPE_DOUBLE: return CLIPS::Value(refl->GetDouble(**m, field));
533 case FieldDescriptor::TYPE_FLOAT: return CLIPS::Value(refl->GetFloat(**m, field));
534 case FieldDescriptor::TYPE_INT64: return CLIPS::Value(refl->GetInt64(**m, field));
535 case FieldDescriptor::TYPE_UINT64: return CLIPS::Value((long int)refl->GetUInt64(**m, field));
536 case FieldDescriptor::TYPE_INT32: return CLIPS::Value(refl->GetInt32(**m, field));
537 case FieldDescriptor::TYPE_FIXED64: return CLIPS::Value((long int)refl->GetUInt64(**m, field));
538 case FieldDescriptor::TYPE_FIXED32: return CLIPS::Value(refl->GetUInt32(**m, field));
539 case FieldDescriptor::TYPE_BOOL:
540 //Booleans are represented as Symbols in CLIPS
541 if (refl->GetBool(**m, field)) {
542 return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
543 } else {
544 return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
545 }
546 case FieldDescriptor::TYPE_STRING: return CLIPS::Value(refl->GetString(**m, field));
547 case FieldDescriptor::TYPE_MESSAGE: {
548 const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
549 google::protobuf::Message * mcopy = mfield.New();
550 mcopy->CopyFrom(mfield);
551 void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
552 return CLIPS::Value(ptr);
553 }
554 case FieldDescriptor::TYPE_BYTES: return CLIPS::Value((char *)"bytes");
555 case FieldDescriptor::TYPE_UINT32: return CLIPS::Value(refl->GetUInt32(**m, field));
556 case FieldDescriptor::TYPE_ENUM:
557 return CLIPS::Value(refl->GetEnum(**m, field)->name(), CLIPS::TYPE_SYMBOL);
558 case FieldDescriptor::TYPE_SFIXED32: return CLIPS::Value(refl->GetInt32(**m, field));
559 case FieldDescriptor::TYPE_SFIXED64: return CLIPS::Value(refl->GetInt64(**m, field));
560 case FieldDescriptor::TYPE_SINT32: return CLIPS::Value(refl->GetInt32(**m, field));
561 case FieldDescriptor::TYPE_SINT64: return CLIPS::Value(refl->GetInt64(**m, field));
562 default: throw std::logic_error("Unknown protobuf field type encountered");
563 }
564}
565
566void
567ClipsProtobufCommunicator::clips_pb_set_field(void * msgptr,
568 std::string field_name,
569 CLIPS::Value value)
570{
571 std::shared_ptr<google::protobuf::Message> *m =
572 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
573 if (!(m && *m))
574 return;
575
576 const Descriptor * desc = (*m)->GetDescriptor();
577 const FieldDescriptor *field = desc->FindFieldByName(field_name);
578 if (!field) {
579 if (logger_) {
580 logger_->log_warn("CLIPS-Protobuf", "Could not find field %s", field_name.c_str());
581 }
582 return;
583 }
584 const Reflection *refl = (*m)->GetReflection();
585
586 try {
587 switch (field->type()) {
588 case FieldDescriptor::TYPE_DOUBLE: refl->SetDouble(m->get(), field, value.as_float()); break;
589 case FieldDescriptor::TYPE_FLOAT: refl->SetFloat(m->get(), field, value.as_float()); break;
590 case FieldDescriptor::TYPE_SFIXED64:
591 case FieldDescriptor::TYPE_SINT64:
592 case FieldDescriptor::TYPE_INT64: refl->SetInt64(m->get(), field, value.as_integer()); break;
593 case FieldDescriptor::TYPE_FIXED64:
594 case FieldDescriptor::TYPE_UINT64: refl->SetUInt64(m->get(), field, value.as_integer()); break;
595 case FieldDescriptor::TYPE_SFIXED32:
596 case FieldDescriptor::TYPE_SINT32:
597 case FieldDescriptor::TYPE_INT32: refl->SetInt32(m->get(), field, value.as_integer()); break;
598 case FieldDescriptor::TYPE_BOOL: refl->SetBool(m->get(), field, (value == "TRUE")); break;
599 case FieldDescriptor::TYPE_STRING: refl->SetString(m->get(), field, value.as_string()); break;
600 case FieldDescriptor::TYPE_MESSAGE: {
601 std::shared_ptr<google::protobuf::Message> *mfrom =
602 static_cast<std::shared_ptr<google::protobuf::Message> *>(value.as_address());
603 Message *mut_msg = refl->MutableMessage(m->get(), field);
604 mut_msg->CopyFrom(**mfrom);
605 delete mfrom;
606 } break;
607 case FieldDescriptor::TYPE_BYTES: break;
608 case FieldDescriptor::TYPE_FIXED32:
609 case FieldDescriptor::TYPE_UINT32: refl->SetUInt32(m->get(), field, value.as_integer()); break;
610 case FieldDescriptor::TYPE_ENUM: {
611 const EnumDescriptor * enumdesc = field->enum_type();
612 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
613 if (enumval) {
614 refl->SetEnum(m->get(), field, enumval);
615 } else {
616 if (logger_) {
617 logger_->log_warn("CLIPS-Protobuf",
618 "%s: cannot set invalid "
619 "enum value '%s' on '%s'",
620 (*m)->GetTypeName().c_str(),
621 value.as_string().c_str(),
622 field_name.c_str());
623 }
624 }
625 } break;
626 default: throw std::logic_error("Unknown protobuf field type encountered");
627 }
628 } catch (std::logic_error &e) {
629 if (logger_) {
630 logger_->log_warn("CLIPS-Protobuf",
631 "Failed to set field %s of %s: %s "
632 "(type %d, as string %s)",
633 field_name.c_str(),
634 (*m)->GetTypeName().c_str(),
635 e.what(),
636 value.type(),
637 to_string(value).c_str());
638 }
639 }
640}
641
642void
643ClipsProtobufCommunicator::clips_pb_add_list(void * msgptr,
644 std::string field_name,
645 CLIPS::Value value)
646{
647 std::shared_ptr<google::protobuf::Message> *m =
648 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
649 if (!(m && *m))
650 return;
651
652 const Descriptor * desc = (*m)->GetDescriptor();
653 const FieldDescriptor *field = desc->FindFieldByName(field_name);
654 if (!field) {
655 if (logger_) {
656 logger_->log_warn("CLIPS-Protobuf", "Could not find field %s", field_name.c_str());
657 }
658 return;
659 }
660 const Reflection *refl = (*m)->GetReflection();
661
662 try {
663 switch (field->type()) {
664 case FieldDescriptor::TYPE_DOUBLE: refl->AddDouble(m->get(), field, value); break;
665 case FieldDescriptor::TYPE_FLOAT: refl->AddFloat(m->get(), field, value); break;
666 case FieldDescriptor::TYPE_SFIXED64:
667 case FieldDescriptor::TYPE_SINT64:
668 case FieldDescriptor::TYPE_INT64: refl->AddInt64(m->get(), field, value); break;
669 case FieldDescriptor::TYPE_FIXED64:
670 case FieldDescriptor::TYPE_UINT64: refl->AddUInt64(m->get(), field, (long int)value); break;
671 case FieldDescriptor::TYPE_SFIXED32:
672 case FieldDescriptor::TYPE_SINT32:
673 case FieldDescriptor::TYPE_INT32: refl->AddInt32(m->get(), field, value); break;
674 case FieldDescriptor::TYPE_BOOL: refl->AddBool(m->get(), field, (value == "TRUE")); break;
675 case FieldDescriptor::TYPE_STRING: refl->AddString(m->get(), field, value); break;
676 case FieldDescriptor::TYPE_MESSAGE: {
677 std::shared_ptr<google::protobuf::Message> *mfrom =
678 static_cast<std::shared_ptr<google::protobuf::Message> *>(value.as_address());
679 Message *new_msg = refl->AddMessage(m->get(), field);
680 new_msg->CopyFrom(**mfrom);
681 delete mfrom;
682 } break;
683 case FieldDescriptor::TYPE_BYTES: break;
684 case FieldDescriptor::TYPE_FIXED32:
685 case FieldDescriptor::TYPE_UINT32: refl->AddUInt32(m->get(), field, value); break;
686 case FieldDescriptor::TYPE_ENUM: {
687 const EnumDescriptor * enumdesc = field->enum_type();
688 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
689 if (enumval)
690 refl->AddEnum(m->get(), field, enumval);
691 } break;
692 default: throw std::logic_error("Unknown protobuf field type encountered");
693 }
694 } catch (std::logic_error &e) {
695 if (logger_) {
696 logger_->log_warn("CLIPS-Protobuf",
697 "Failed to add field %s of %s: %s",
698 field_name.c_str(),
699 (*m)->GetTypeName().c_str(),
700 e.what());
701 }
702 }
703}
704
705long int
706ClipsProtobufCommunicator::clips_pb_client_connect(std::string host, int port)
707{
708 if (port <= 0)
709 return false;
710
711 ProtobufStreamClient *client = new ProtobufStreamClient(message_register_);
712
713 long int client_id;
714 {
715 fawkes::MutexLocker lock(&map_mutex_);
716 client_id = ++next_client_id_;
717 clients_[client_id] = client;
718 }
719
720 client->signal_connected().connect(
721 boost::bind(&ClipsProtobufCommunicator::handle_client_connected, this, client_id));
722 client->signal_disconnected().connect(
723 boost::bind(&ClipsProtobufCommunicator::handle_client_disconnected,
724 this,
725 client_id,
726 boost::asio::placeholders::error));
727 client->signal_received().connect(
728 boost::bind(&ClipsProtobufCommunicator::handle_client_msg, this, client_id, _1, _2, _3));
729 client->signal_receive_failed().connect(boost::bind(
730 &ClipsProtobufCommunicator::handle_client_receive_fail, this, client_id, _1, _2, _3));
731
732 client->async_connect(host.c_str(), port);
733 return CLIPS::Value(client_id);
734}
735
736void
737ClipsProtobufCommunicator::clips_pb_send(long int client_id, void *msgptr)
738{
739 std::shared_ptr<google::protobuf::Message> *m =
740 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
741 if (!(m && *m)) {
742 if (logger_) {
743 logger_->log_warn("CLIPS-Protobuf", "Cannot send to %li: invalid message", client_id);
744 }
745 return;
746 }
747
748 try {
749 fawkes::MutexLocker lock(&map_mutex_);
750
751 if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
752 //printf("***** SENDING via SERVER\n");
753 server_->send(server_clients_[client_id], *m);
754 sig_server_sent_(server_clients_[client_id], *m);
755 } else if (clients_.find(client_id) != clients_.end()) {
756 //printf("***** SENDING via CLIENT\n");
757 clients_[client_id]->send(*m);
758 std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
759 sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
760 } else if (peers_.find(client_id) != peers_.end()) {
761 //printf("***** SENDING via CLIENT\n");
762 peers_[client_id]->send(*m);
763 sig_peer_sent_(client_id, *m);
764 } else {
765 //printf("Client ID %li is unknown, cannot send message of type %s\n",
766 // client_id, (*m)->GetTypeName().c_str());
767 }
768 } catch (google::protobuf::FatalException &e) {
769 if (logger_) {
770 logger_->log_warn("CLIPS-Profobuf",
771 "Failed to send message of type %s: %s",
772 (*m)->GetTypeName().c_str(),
773 e.what());
774 }
775 } catch (fawkes::Exception &e) {
776 if (logger_) {
777 logger_->log_warn("CLIPS-Protobuf",
778 "Failed to send message of type %s: %s",
779 (*m)->GetTypeName().c_str(),
781 }
782 } catch (std::runtime_error &e) {
783 if (logger_) {
784 logger_->log_warn("CLIPS-Protobuf",
785 "Failed to send message of type %s: %s",
786 (*m)->GetTypeName().c_str(),
787 e.what());
788 }
789 }
790}
791
792std::string
793ClipsProtobufCommunicator::clips_pb_tostring(void *msgptr)
794{
795 std::shared_ptr<google::protobuf::Message> *m =
796 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
797 if (!(m && *m)) {
798 if (logger_) {
799 logger_->log_warn("CLIPS-Protobuf", "Cannot convert message to string: invalid message");
800 }
801 return "";
802 }
803
804 return (*m)->DebugString();
805}
806
807void
808ClipsProtobufCommunicator::clips_pb_broadcast(long int peer_id, void *msgptr)
809{
810 std::shared_ptr<google::protobuf::Message> *m =
811 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
812 if (!(m && *m)) {
813 if (logger_) {
814 logger_->log_warn("CLIPS-Protobuf", "Cannot send broadcast: invalid message");
815 }
816 return;
817 }
818
819 fawkes::MutexLocker lock(&map_mutex_);
820 if (peers_.find(peer_id) == peers_.end())
821 return;
822
823 //logger_->log_info("CLIPS-Protobuf", "Broadcasting %s", (*m)->GetTypeName().c_str());
824 try {
825 peers_[peer_id]->send(*m);
826 } catch (google::protobuf::FatalException &e) {
827 if (logger_) {
828 logger_->log_warn("CLIPS-Protobuf",
829 "Failed to broadcast message of type %s: %s",
830 (*m)->GetTypeName().c_str(),
831 e.what());
832 }
833 } catch (fawkes::Exception &e) {
834 if (logger_) {
835 logger_->log_warn("CLIPS-Protobuf",
836 "Failed to broadcast message of type %s: %s",
837 (*m)->GetTypeName().c_str(),
839 }
840 } catch (std::runtime_error &e) {
841 if (logger_) {
842 logger_->log_warn("CLIPS-Protobuf",
843 "Failed to broadcast message of type %s: %s",
844 (*m)->GetTypeName().c_str(),
845 e.what());
846 }
847 }
848
849 sig_peer_sent_(peer_id, *m);
850}
851
852void
853ClipsProtobufCommunicator::clips_pb_disconnect(long int client_id)
854{
855 //logger_->log_info("CLIPS-Protobuf", "Disconnecting client %li", client_id);
856
857 try {
858 fawkes::MutexLocker lock(&map_mutex_);
859
860 if (server_clients_.find(client_id) != server_clients_.end()) {
861 protobuf_comm::ProtobufStreamServer::ClientID srv_client = server_clients_[client_id];
862 server_->disconnect(srv_client);
863 server_clients_.erase(client_id);
864 rev_server_clients_.erase(srv_client);
865 } else if (clients_.find(client_id) != clients_.end()) {
866 delete clients_[client_id];
867 clients_.erase(client_id);
868 }
869 } catch (std::runtime_error &e) {
870 if (logger_) {
871 logger_->log_warn("CLIPS-Protobuf",
872 "Failed to disconnect from client %li: %s",
873 client_id,
874 e.what());
875 }
876 }
877}
878
879CLIPS::Values
880ClipsProtobufCommunicator::clips_pb_field_list(void *msgptr, std::string field_name)
881{
882 std::shared_ptr<google::protobuf::Message> *m =
883 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
884 if (!(m && *m))
885 return CLIPS::Values(1, CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL));
886
887 const Descriptor * desc = (*m)->GetDescriptor();
888 const FieldDescriptor *field = desc->FindFieldByName(field_name);
889 if (!field) {
890 return CLIPS::Values(1, CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL));
891 }
892 if (field->label() == FieldDescriptor::LABEL_REQUIRED
893 || field->label() == FieldDescriptor::LABEL_OPTIONAL) {
894 CLIPS::Values rv(1, clips_pb_field_value(msgptr, field_name));
895 return rv;
896 }
897
898 const Reflection *refl = (*m)->GetReflection();
899 int field_size = refl->FieldSize(**m, field);
900 CLIPS::Values rv(field_size);
901 for (int i = 0; i < field_size; ++i) {
902 switch (field->type()) {
903 case FieldDescriptor::TYPE_DOUBLE:
904 rv[i] = CLIPS::Value(refl->GetRepeatedDouble(**m, field, i));
905 break;
906 case FieldDescriptor::TYPE_FLOAT:
907 rv[i] = CLIPS::Value(refl->GetRepeatedFloat(**m, field, i));
908 break;
909 break;
910 case FieldDescriptor::TYPE_UINT64:
911 case FieldDescriptor::TYPE_FIXED64:
912 rv[i] = CLIPS::Value((long int)refl->GetRepeatedUInt64(**m, field, i));
913 break;
914 case FieldDescriptor::TYPE_UINT32:
915 case FieldDescriptor::TYPE_FIXED32:
916 rv[i] = CLIPS::Value(refl->GetRepeatedUInt32(**m, field, i));
917 break;
918 case FieldDescriptor::TYPE_BOOL:
919 //Booleans are represented as Symbols in CLIPS
920 if (refl->GetRepeatedBool(**m, field, i)) {
921 rv[i] = CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
922 } else {
923 rv[i] = CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
924 }
925 break;
926 case FieldDescriptor::TYPE_STRING:
927 rv[i] = CLIPS::Value(refl->GetRepeatedString(**m, field, i));
928 break;
929 case FieldDescriptor::TYPE_MESSAGE: {
930 const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
931 google::protobuf::Message * mcopy = msg.New();
932 mcopy->CopyFrom(msg);
933 void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
934 rv[i] = CLIPS::Value(ptr);
935 } break;
936 case FieldDescriptor::TYPE_BYTES:
937 rv[i] = CLIPS::Value((char *)"BYTES", CLIPS::TYPE_SYMBOL);
938 break;
939 case FieldDescriptor::TYPE_ENUM:
940 rv[i] = CLIPS::Value(refl->GetRepeatedEnum(**m, field, i)->name(), CLIPS::TYPE_SYMBOL);
941 break;
942 case FieldDescriptor::TYPE_SFIXED32:
943 case FieldDescriptor::TYPE_INT32:
944 case FieldDescriptor::TYPE_SINT32:
945 rv[i] = CLIPS::Value(refl->GetRepeatedInt32(**m, field, i));
946 break;
947 case FieldDescriptor::TYPE_SFIXED64:
948 case FieldDescriptor::TYPE_SINT64:
949 case FieldDescriptor::TYPE_INT64:
950 rv[i] = CLIPS::Value(refl->GetRepeatedInt64(**m, field, i));
951 break;
952 default: throw std::logic_error("Unknown protobuf field type encountered");
953 }
954 }
955
956 return rv;
957}
958
959CLIPS::Value
960ClipsProtobufCommunicator::clips_pb_field_is_list(void *msgptr, std::string field_name)
961{
962 std::shared_ptr<google::protobuf::Message> *m =
963 static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
964 if (!(m && *m))
965 return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
966
967 const Descriptor * desc = (*m)->GetDescriptor();
968 const FieldDescriptor *field = desc->FindFieldByName(field_name);
969 if (!field)
970 return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
971 return CLIPS::Value(field->is_repeated() ? "TRUE" : "FALSE", CLIPS::TYPE_SYMBOL);
972}
973
974void
975ClipsProtobufCommunicator::clips_assert_message(std::pair<std::string, unsigned short> &endpoint,
976 uint16_t comp_id,
977 uint16_t msg_type,
978 std::shared_ptr<google::protobuf::Message> &msg,
979 ClipsProtobufCommunicator::ClientType ct,
980 long int client_id)
981{
982 CLIPS::Template::pointer temp = clips_->get_template("protobuf-msg");
983 if (temp) {
984 struct timeval tv;
985 gettimeofday(&tv, 0);
986 void * ptr = new std::shared_ptr<google::protobuf::Message>(msg);
987 CLIPS::Fact::pointer fact = CLIPS::Fact::create(*clips_, temp);
988 fact->set_slot("type", msg->GetTypeName());
989 fact->set_slot("comp-id", comp_id);
990 fact->set_slot("msg-type", msg_type);
991 fact->set_slot("rcvd-via",
992 CLIPS::Value((ct == CT_PEER) ? "BROADCAST" : "STREAM", CLIPS::TYPE_SYMBOL));
993 CLIPS::Values rcvd_at(2, CLIPS::Value(CLIPS::TYPE_INTEGER));
994 rcvd_at[0] = tv.tv_sec;
995 rcvd_at[1] = tv.tv_usec;
996 fact->set_slot("rcvd-at", rcvd_at);
997 CLIPS::Values host_port(2, CLIPS::Value(CLIPS::TYPE_STRING));
998 host_port[0] = endpoint.first;
999 host_port[1] = CLIPS::Value(endpoint.second);
1000 fact->set_slot("rcvd-from", host_port);
1001 fact->set_slot("client-type",
1002 CLIPS::Value(ct == CT_CLIENT ? "CLIENT" : (ct == CT_SERVER ? "SERVER" : "PEER"),
1003 CLIPS::TYPE_SYMBOL));
1004 fact->set_slot("client-id", client_id);
1005 fact->set_slot("ptr", CLIPS::Value(ptr));
1006 CLIPS::Fact::pointer new_fact = clips_->assert_fact(fact);
1007
1008 if (!new_fact) {
1009 if (logger_) {
1010 logger_->log_warn("CLIPS-Protobuf", "Asserting protobuf-msg fact failed");
1011 }
1012 delete static_cast<std::shared_ptr<google::protobuf::Message> *>(ptr);
1013 }
1014 } else {
1015 if (logger_) {
1016 logger_->log_warn("CLIPS-Protobuf", "Did not get template, did you load protobuf.clp?");
1017 }
1018 }
1019}
1020
1021void
1022ClipsProtobufCommunicator::handle_server_client_connected(ProtobufStreamServer::ClientID client,
1023 boost::asio::ip::tcp::endpoint &endpoint)
1024{
1025 long int client_id = -1;
1026 {
1027 fawkes::MutexLocker lock(&map_mutex_);
1028 client_id = ++next_client_id_;
1029 client_endpoints_[client_id] = std::make_pair(endpoint.address().to_string(), endpoint.port());
1030 server_clients_[client_id] = client;
1031 rev_server_clients_[client] = client_id;
1032 }
1033
1034 fawkes::MutexLocker lock(&clips_mutex_);
1035 clips_->assert_fact_f("(protobuf-server-client-connected %li %s %u)",
1036 client_id,
1037 endpoint.address().to_string().c_str(),
1038 endpoint.port());
1039}
1040
1041void
1042ClipsProtobufCommunicator::handle_server_client_disconnected(ProtobufStreamServer::ClientID client,
1043 const boost::system::error_code &error)
1044{
1045 long int client_id = -1;
1046 {
1047 fawkes::MutexLocker lock(&map_mutex_);
1048 RevServerClientMap::iterator c;
1049 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1050 client_id = c->second;
1051 rev_server_clients_.erase(c);
1052 server_clients_.erase(client_id);
1053 }
1054 }
1055
1056 if (client_id >= 0) {
1057 fawkes::MutexLocker lock(&clips_mutex_);
1058 clips_->assert_fact_f("(protobuf-server-client-disconnected %li)", client_id);
1059 }
1060}
1061
1062/** Handle message that came from a client.
1063 * @param client client ID
1064 * @param component_id component the message was addressed to
1065 * @param msg_type type of the message
1066 * @param msg the message
1067 */
1068void
1069ClipsProtobufCommunicator::handle_server_client_msg(ProtobufStreamServer::ClientID client,
1070 uint16_t component_id,
1071 uint16_t msg_type,
1072 std::shared_ptr<google::protobuf::Message> msg)
1073{
1074 fawkes::MutexLocker lock(&clips_mutex_);
1075 fawkes::MutexLocker lock2(&map_mutex_);
1076 RevServerClientMap::iterator c;
1077 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1078 clips_assert_message(
1079 client_endpoints_[c->second], component_id, msg_type, msg, CT_SERVER, c->second);
1080 }
1081}
1082
1083/** Handle server reception failure
1084 * @param client client ID
1085 * @param component_id component the message was addressed to
1086 * @param msg_type type of the message
1087 * @param msg the message string
1088 */
1089void
1090ClipsProtobufCommunicator::handle_server_client_fail(ProtobufStreamServer::ClientID client,
1091 uint16_t component_id,
1092 uint16_t msg_type,
1093 std::string msg)
1094{
1095 fawkes::MutexLocker lock(&map_mutex_);
1096 RevServerClientMap::iterator c;
1097 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1098 fawkes::MutexLocker lock(&clips_mutex_);
1099 clips_->assert_fact_f("(protobuf-server-receive-failed (comp-id %u) (msg-type %u) "
1100 "(rcvd-via STREAM) (client-id %li) (message \"%s\") "
1101 "(rcvd-from (\"%s\" %u)))",
1102 component_id,
1103 msg_type,
1104 c->second,
1105 msg.c_str(),
1106 client_endpoints_[c->second].first.c_str(),
1107 client_endpoints_[c->second].second);
1108 }
1109}
1110
1111/** Handle message that came from a peer/robot
1112 * @param endpoint the endpoint from which the message was received
1113 * @param component_id component the message was addressed to
1114 * @param msg_type type of the message
1115 * @param msg the message
1116 */
1117void
1118ClipsProtobufCommunicator::handle_peer_msg(long int peer_id,
1119 boost::asio::ip::udp::endpoint & endpoint,
1120 uint16_t component_id,
1121 uint16_t msg_type,
1122 std::shared_ptr<google::protobuf::Message> msg)
1123{
1124 fawkes::MutexLocker lock(&clips_mutex_);
1125 std::pair<std::string, unsigned short> endpp =
1126 std::make_pair(endpoint.address().to_string(), endpoint.port());
1127 clips_assert_message(endpp, component_id, msg_type, msg, CT_PEER, peer_id);
1128}
1129
1130/** Handle error during peer message processing.
1131 * @param endpoint endpoint of incoming message
1132 * @param msg error message
1133 */
1134void
1135ClipsProtobufCommunicator::handle_peer_recv_error(long int peer_id,
1136 boost::asio::ip::udp::endpoint &endpoint,
1137 std::string msg)
1138{
1139 if (logger_) {
1140 logger_->log_warn("CLIPS-Protobuf",
1141 "Failed to receive peer message from %s:%u: %s",
1142 endpoint.address().to_string().c_str(),
1143 endpoint.port(),
1144 msg.c_str());
1145 }
1146}
1147
1148/** Handle error during peer message processing.
1149 * @param msg error message
1150 */
1151void
1152ClipsProtobufCommunicator::handle_peer_send_error(long int peer_id, std::string msg)
1153{
1154 if (logger_) {
1155 logger_->log_warn("CLIPS-Protobuf", "Failed to send peer message: %s", msg.c_str());
1156 }
1157}
1158
1159void
1160ClipsProtobufCommunicator::handle_client_connected(long int client_id)
1161{
1162 fawkes::MutexLocker lock(&clips_mutex_);
1163 clips_->assert_fact_f("(protobuf-client-connected %li)", client_id);
1164}
1165
1166void
1167ClipsProtobufCommunicator::handle_client_disconnected(long int client_id,
1168 const boost::system::error_code &error)
1169{
1170 fawkes::MutexLocker lock(&clips_mutex_);
1171 clips_->assert_fact_f("(protobuf-client-disconnected %li)", client_id);
1172}
1173
1174void
1175ClipsProtobufCommunicator::handle_client_msg(long int client_id,
1176 uint16_t comp_id,
1177 uint16_t msg_type,
1178 std::shared_ptr<google::protobuf::Message> msg)
1179{
1180 fawkes::MutexLocker lock(&clips_mutex_);
1181 std::pair<std::string, unsigned short> endpp = std::make_pair(std::string(), 0);
1182 clips_assert_message(endpp, comp_id, msg_type, msg, CT_CLIENT, client_id);
1183}
1184
1185void
1186ClipsProtobufCommunicator::handle_client_receive_fail(long int client_id,
1187 uint16_t comp_id,
1188 uint16_t msg_type,
1189 std::string msg)
1190{
1191 fawkes::MutexLocker lock(&clips_mutex_);
1192 clips_->assert_fact_f("(protobuf-receive-failed (client-id %li) (rcvd-via STREAM) "
1193 "(comp-id %u) (msg-type %u) (message \"%s\"))",
1194 client_id,
1195 comp_id,
1196 msg_type,
1197 msg.c_str());
1198}
1199
1200std::string
1201ClipsProtobufCommunicator::to_string(const CLIPS::Value &v)
1202{
1203 switch (v.type()) {
1204 case CLIPS::TYPE_UNKNOWN: return "Unknown Type";
1205 case CLIPS::TYPE_FLOAT: return std::to_string(v.as_float());
1206 case CLIPS::TYPE_INTEGER: return std::to_string(v.as_integer());
1207 case CLIPS::TYPE_SYMBOL:
1208 case CLIPS::TYPE_INSTANCE_NAME:
1209 case CLIPS::TYPE_STRING: return v.as_string();
1210 case CLIPS::TYPE_INSTANCE_ADDRESS:
1211 case CLIPS::TYPE_EXTERNAL_ADDRESS: return boost::str(boost::format("%p") % v.as_address());
1212 }
1213 return "Implicit unknown type";
1214}
1215
1216} // end namespace protobuf_clips
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual const char * what_no_backtrace() const noexcept
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:663
Interface for logging.
Definition: logger.h:42
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
Mutex locking helper.
Definition: mutex_locker.h:34
Mutex mutual exclusion lock.
Definition: mutex.h:33
void enable_server(int port)
Enable protobuf stream server.
ClipsProtobufCommunicator(CLIPS::Environment *env, fawkes::Mutex &env_mutex, fawkes::Logger *logger=NULL)
Constructor.
void disable_server()
Disable protobuf stream server.