Gazebo Transport

API Reference

13.1.0
Discovery.hh
Go to the documentation of this file.
1/*
2 * Copyright (C) 2014 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18#ifndef GZ_TRANSPORT_DISCOVERY_HH_
19#define GZ_TRANSPORT_DISCOVERY_HH_
20#include <errno.h>
21#include <string.h>
22
23#ifdef _WIN32
24 // For socket(), connect(), send(), and recv().
25 #include <Winsock2.h>
26 #include <Ws2def.h>
27 #include <Ws2ipdef.h>
28 #include <Ws2tcpip.h>
29 // Type used for raw data on this platform.
30 using raw_type = char;
31#else
32 // For data types
33 #include <sys/types.h>
34 // For socket(), connect(), send(), and recv()
35 #include <sys/socket.h>
36 // For gethostbyname()
37 #include <netdb.h>
38 // For inet_addr()
39 #include <arpa/inet.h>
40 // For close()
41 #include <unistd.h>
42 // For sockaddr_in
43 #include <netinet/in.h>
44 // Type used for raw data on this platform
45 using raw_type = void;
46#endif
47
48#ifdef _WIN32
49 #pragma warning(push, 0)
50#endif
51#ifdef _WIN32
52 #pragma warning(pop)
53 // Suppress "decorated name length exceed" warning in STL.
54 #pragma warning(disable: 4503)
55 // Suppress "depreted API warnings" in WINSOCK.
56 #pragma warning(disable: 4996)
57#endif
58
59#include <gz/msgs/discovery.pb.h>
60
61#include <algorithm>
62#include <condition_variable>
63#include <limits>
64#include <map>
65#include <memory>
66#include <mutex>
67#include <string>
68#include <thread>
69#include <vector>
70
71#include <gz/msgs/Utility.hh>
72
73#include "gz/transport/config.hh"
74#include "gz/transport/Export.hh"
80
81namespace gz
82{
83 namespace transport
84 {
85 // Inline bracket to help doxygen filtering.
86 inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
88 enum class DestinationType
89 {
91 UNICAST,
95 ALL
96 };
97
98 //
104 bool GZ_TRANSPORT_VISIBLE pollSockets(
105 const std::vector<int> &_sockets,
106 const int _timeout);
107
116 template<typename Pub>
118 {
125 public: Discovery(const std::string &_pUuid,
126 const std::string &_ip,
127 const int _port,
128 const bool _verbose = false)
129 : multicastGroup(_ip),
130 port(_port),
131 hostAddr(determineHost()),
132 pUuid(_pUuid),
133 silenceInterval(kDefSilenceInterval),
134 activityInterval(kDefActivityInterval),
135 heartbeatInterval(kDefHeartbeatInterval),
136 connectionCb(nullptr),
137 disconnectionCb(nullptr),
138 verbose(_verbose),
139 initialized(false),
140 numHeartbeatsUninitialized(0),
141 exit(false),
142 enabled(false)
143 {
144 std::string gzIp;
145 if (env("GZ_IP", gzIp) && !gzIp.empty())
146 {
147 this->hostInterfaces = {gzIp};
148 }
149 else
150 {
151 // Get the list of network interfaces in this host.
152 this->hostInterfaces = determineInterfaces();
153 }
154
155#ifdef _WIN32
156 WORD wVersionRequested;
157 WSADATA wsaData;
158
159 // Request WinSock v2.2.
160 wVersionRequested = MAKEWORD(2, 2);
161 // Load WinSock DLL.
162 if (WSAStartup(wVersionRequested, &wsaData) != 0)
163 {
164 std::cerr << "Unable to load WinSock DLL" << std::endl;
165 return;
166 }
167#endif
168 for (const auto &netIface : this->hostInterfaces)
169 {
170 auto succeed = this->RegisterNetIface(netIface);
171
172 // If the IP address that we're selecting as the main IP address of
173 // the host is invalid, we change it to 127.0.0.1 .
174 // This is probably because IGN_IP is set to a wrong value.
175 if (netIface == this->hostAddr && !succeed)
176 {
177 this->RegisterNetIface("127.0.0.1");
178 std::cerr << "Did you set the environment variable IGN_IP with a "
179 << "correct IP address? " << std::endl
180 << " [" << netIface << "] seems an invalid local IP "
181 << "address." << std::endl
182 << " Using 127.0.0.1 as hostname." << std::endl;
183 this->hostAddr = "127.0.0.1";
184 }
185 }
186
187 // Socket option: SO_REUSEADDR. This options is used only for receiving
188 // data. We can reuse the same socket for receiving multicast data from
189 // multiple interfaces. We will use the socket at position 0 for
190 // receiving data.
191 int reuseAddr = 1;
192 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
193 reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
194 {
195 std::cerr << "Error setting socket option (SO_REUSEADDR)."
196 << std::endl;
197 return;
198 }
199
200#ifdef SO_REUSEPORT
201 // Socket option: SO_REUSEPORT. This options is used only for receiving
202 // data. We can reuse the same socket for receiving multicast data from
203 // multiple interfaces. We will use the socket at position 0 for
204 // receiving data.
205 int reusePort = 1;
206 // cppcheck-suppress ConfigurationNotChecked
207 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
208 reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
209 {
210 std::cerr << "Error setting socket option (SO_REUSEPORT)."
211 << std::endl;
212 return;
213 }
214#endif
215 // Bind the first socket to the discovery port.
216 sockaddr_in localAddr;
217 memset(&localAddr, 0, sizeof(localAddr));
218 localAddr.sin_family = AF_INET;
219 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
220 localAddr.sin_port = htons(static_cast<u_short>(this->port));
221
222 if (bind(this->sockets.at(0),
223 reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
224 {
225 std::cerr << "Binding to a local port failed." << std::endl;
226 return;
227 }
228
229 // Set 'mcastAddr' to the multicast discovery group.
230 memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
231 this->mcastAddr.sin_family = AF_INET;
232 this->mcastAddr.sin_addr.s_addr =
233 inet_addr(this->multicastGroup.c_str());
234 this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
235
237 std::string gzRelay = "";
238 if (env("GZ_RELAY", gzRelay) && !gzRelay.empty())
239 {
240 relays = transport::split(gzRelay, ':');
241 }
242
243 // Register all unicast relays.
244 for (auto const &relayAddr : relays)
245 this->AddRelayAddress(relayAddr);
246
247 if (this->verbose)
248 this->PrintCurrentState();
249 }
250
252 public: virtual ~Discovery()
253 {
254 // Tell the service thread to terminate.
255 this->exitMutex.lock();
256 this->exit = true;
257 this->exitMutex.unlock();
258
259 // Wait for the service threads to finish before exit.
260 if (this->threadReception.joinable())
261 this->threadReception.join();
262
263 // Broadcast a BYE message to trigger the remote cancellation of
264 // all our advertised topics.
265 this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
266 Publisher("", "", this->pUuid, "", AdvertiseOptions()));
267
268 // Close sockets.
269 for (const auto &sock : this->sockets)
270 {
271#ifdef _WIN32
272 closesocket(sock);
273 WSACleanup();
274#else
275 close(sock);
276#endif
277 }
278 }
279
283 public: void Start()
284 {
285 {
286 std::lock_guard<std::mutex> lock(this->mutex);
287
288 // The service is already running.
289 if (this->enabled)
290 return;
291
292 this->enabled = true;
293 }
294
296 this->timeNextHeartbeat = now;
297 this->timeNextActivity = now;
298
299 // Start the thread that receives discovery information.
300 this->threadReception = std::thread(&Discovery::RecvMessages, this);
301 }
302
307 public: bool Advertise(const Pub &_publisher)
308 {
310
311 {
312 std::lock_guard<std::mutex> lock(this->mutex);
313
314 if (!this->enabled)
315 return false;
316
317 // Add the addressing information (local publisher).
318 if (!this->info.AddPublisher(_publisher))
319 return false;
320
321 cb = this->connectionCb;
322 }
323
324 if (cb)
325 cb(_publisher);
326
327 // Only advertise a message outside this process if the scope
328 // is not 'Process'
329 if (_publisher.Options().Scope() != Scope_t::PROCESS)
330 this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
331 _publisher);
332
333 return true;
334 }
335
346 public: bool Discover(const std::string &_topic) const
347 {
349 bool found;
350 Addresses_M<Pub> addresses;
351
352 {
353 std::lock_guard<std::mutex> lock(this->mutex);
354
355 if (!this->enabled)
356 return false;
357
358 cb = this->connectionCb;
359 }
360
361 Pub pub;
362 pub.SetTopic(_topic);
363 pub.SetPUuid(this->pUuid);
364
365 // Send a discovery request.
366 this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
367
368 {
369 std::lock_guard<std::mutex> lock(this->mutex);
370 found = this->info.Publishers(_topic, addresses);
371 }
372
373 if (found)
374 {
375 // I already have information about this topic.
376 for (const auto &proc : addresses)
377 {
378 for (const auto &node : proc.second)
379 {
380 if (cb)
381 {
382 // Execute the user's callback for a service request. Notice
383 // that we only execute one callback for preventing receive
384 // multiple service responses for a single request.
385 cb(node);
386 }
387 }
388 }
389 }
390
391 return true;
392 }
393
396 public: void SendSubscribersRep(const MessagePublisher &_pub) const
397 {
398 this->SendMsg(
399 DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REP, _pub);
400 }
401
404 public: void Register(const MessagePublisher &_pub) const
405 {
406 this->SendMsg(
407 DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
408 }
409
412 public: void Unregister(const MessagePublisher &_pub) const
413 {
414 this->SendMsg(
415 DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
416 }
417
420 public: const TopicStorage<Pub> &Info() const
421 {
422 std::lock_guard<std::mutex> lock(this->mutex);
423 return this->info;
424 }
425
430 public: bool Publishers(const std::string &_topic,
431 Addresses_M<Pub> &_publishers) const
432 {
433 std::lock_guard<std::mutex> lock(this->mutex);
434 return this->info.Publishers(_topic, _publishers);
435 }
436
441 public: bool RemoteSubscribers(const std::string &_topic,
442 Addresses_M<Pub> &_subscribers) const
443 {
444 std::lock_guard<std::mutex> lock(this->mutex);
445 return this->remoteSubscribers.Publishers(_topic, _subscribers);
446 }
447
455 public: bool Unadvertise(const std::string &_topic,
456 const std::string &_nUuid)
457 {
458 Pub inf;
459 {
460 std::lock_guard<std::mutex> lock(this->mutex);
461
462 if (!this->enabled)
463 return false;
464
465 // Don't do anything if the topic is not advertised by any of my nodes
466 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
467 return true;
468
469 // Remove the topic information.
470 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
471 }
472
473 // Only unadvertise a message outside this process if the scope
474 // is not 'Process'.
475 if (inf.Options().Scope() != Scope_t::PROCESS)
476 {
477 this->SendMsg(DestinationType::ALL,
478 msgs::Discovery::UNADVERTISE, inf);
479 }
480
481 return true;
482 }
483
486 public: std::string HostAddr() const
487 {
488 std::lock_guard<std::mutex> lock(this->mutex);
489 return this->hostAddr;
490 }
491
496 public: unsigned int ActivityInterval() const
497 {
498 std::lock_guard<std::mutex> lock(this->mutex);
499 return this->activityInterval;
500 }
501
507 public: unsigned int HeartbeatInterval() const
508 {
509 std::lock_guard<std::mutex> lock(this->mutex);
510 return this->heartbeatInterval;
511 }
512
517 public: unsigned int SilenceInterval() const
518 {
519 std::lock_guard<std::mutex> lock(this->mutex);
520 return this->silenceInterval;
521 }
522
526 public: void SetActivityInterval(const unsigned int _ms)
527 {
528 std::lock_guard<std::mutex> lock(this->mutex);
529 this->activityInterval = _ms;
530 }
531
535 public: void SetHeartbeatInterval(const unsigned int _ms)
536 {
537 std::lock_guard<std::mutex> lock(this->mutex);
538 this->heartbeatInterval = _ms;
539 }
540
544 public: void SetSilenceInterval(const unsigned int _ms)
545 {
546 std::lock_guard<std::mutex> lock(this->mutex);
547 this->silenceInterval = _ms;
548 }
549
554 public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
555 {
556 std::lock_guard<std::mutex> lock(this->mutex);
557 this->connectionCb = _cb;
558 }
559
565 {
566 std::lock_guard<std::mutex> lock(this->mutex);
567 this->disconnectionCb = _cb;
568 }
569
574 {
575 std::lock_guard<std::mutex> lock(this->mutex);
576 this->registrationCb = _cb;
577 }
578
583 {
584 std::lock_guard<std::mutex> lock(this->mutex);
585 this->unregistrationCb = _cb;
586 }
587
591 public: void SubscribersCb(const std::function<void()> &_cb)
592 {
593 std::lock_guard<std::mutex> lock(this->mutex);
594 this->subscribersCb = _cb;
595 }
596
598 public: void PrintCurrentState() const
599 {
600 std::lock_guard<std::mutex> lock(this->mutex);
601
602 std::cout << "---------------" << std::endl;
603 std::cout << std::boolalpha << "Enabled: "
604 << this->enabled << std::endl;
605 std::cout << "Discovery state" << std::endl;
606 std::cout << "\tUUID: " << this->pUuid << std::endl;
607 std::cout << "Settings" << std::endl;
608 std::cout << "\tActivity: " << this->activityInterval
609 << " ms." << std::endl;
610 std::cout << "\tHeartbeat: " << this->heartbeatInterval
611 << "ms." << std::endl;
612 std::cout << "\tSilence: " << this->silenceInterval
613 << " ms." << std::endl;
614 std::cout << "Known information:" << std::endl;
615 this->info.Print();
616
617 // Used to calculate the elapsed time.
619
620 std::cout << "Activity" << std::endl;
621 if (this->activity.empty())
622 std::cout << "\t<empty>" << std::endl;
623 else
624 {
625 for (auto &proc : this->activity)
626 {
627 // Elapsed time since the last update from this publisher.
628 std::chrono::duration<double> elapsed = now - proc.second;
629
630 std::cout << "\t" << proc.first << std::endl;
631 std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
632 std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
633 << std::endl;
634 }
635 }
636 std::cout << "---------------" << std::endl;
637 }
638
642 public: void TopicList(std::vector<std::string> &_topics)
643 {
644 {
645 std::lock_guard<std::mutex> lock(this->mutex);
646 this->remoteSubscribers.Clear();
647 }
648
649 // Request the list of subscribers.
650 Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
651 this->SendMsg(
652 DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REQ, pub);
653
654 this->WaitForInit();
655 std::lock_guard<std::mutex> lock(this->mutex);
656 this->info.TopicList(_topics);
657
658 std::vector<std::string> remoteSubs;
659 this->remoteSubscribers.TopicList(remoteSubs);
660
661 // Add the remote subscribers
662 for (auto const &t : remoteSubs)
663 {
664 if (std::find(_topics.begin(), _topics.end(), t) == _topics.end())
665 {
666 _topics.push_back(t);
667 }
668 }
669 }
670
673 public: void WaitForInit() const
674 {
675 std::unique_lock<std::mutex> lk(this->mutex);
676
677 if (!this->initialized)
678 {
679 this->initializedCv.wait(lk, [this]{return this->initialized;});
680 }
681 }
682
686 private: void UpdateActivity()
687 {
688 // The UUIDs of the processes that have expired.
690
691 // A copy of the disconnection callback.
692 DiscoveryCallback<Pub> disconnectCb;
693
695
696 {
697 std::lock_guard<std::mutex> lock(this->mutex);
698
699 if (now < this->timeNextActivity)
700 return;
701
702 disconnectCb = this->disconnectionCb;
703
704 for (auto it = this->activity.cbegin(); it != this->activity.cend();)
705 {
706 // Elapsed time since the last update from this publisher.
707 auto elapsed = now - it->second;
708
709 // This publisher has expired.
710 if (std::chrono::duration_cast<std::chrono::milliseconds>
711 (elapsed).count() > this->silenceInterval)
712 {
713 // Remove all the info entries for this process UUID.
714 this->info.DelPublishersByProc(it->first);
715
716 uuids.push_back(it->first);
717
718 // Remove the activity entry.
719 this->activity.erase(it++);
720 }
721 else
722 ++it;
723 }
724
725 this->timeNextActivity = std::chrono::steady_clock::now() +
726 std::chrono::milliseconds(this->activityInterval);
727 }
728
729 if (!disconnectCb)
730 return;
731
732 // Notify without topic information. This is useful to inform the
733 // client that a remote node is gone, even if we were not
734 // interested in its topics.
735 for (auto const &uuid : uuids)
736 {
737 Pub publisher;
738 publisher.SetPUuid(uuid);
739 disconnectCb(publisher);
740 }
741 }
742
744 private: void UpdateHeartbeat()
745 {
747
748 {
750
751 if (now < this->timeNextHeartbeat)
752 return;
753 }
754
755 Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
756 this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
757
759 {
761
762 // Re-advertise topics that are advertised inside this process.
763 this->info.PublishersByProc(this->pUuid, nodes);
764 }
765
766 for (const auto &topic : nodes)
767 {
768 for (const auto &node : topic.second)
769 {
770 this->SendMsg(DestinationType::ALL,
771 msgs::Discovery::ADVERTISE, node);
772 }
773 }
774
775 {
777 if (!this->initialized)
778 {
779 if (this->numHeartbeatsUninitialized == 2u)
780 {
781 // We consider discovery initialized after two heartbeat cycles.
782 this->initialized = true;
783
784 // Notify anyone waiting for the initialization phase to finish.
785 this->initializedCv.notify_all();
786 }
787 ++this->numHeartbeatsUninitialized;
788 }
789
790 this->timeNextHeartbeat = std::chrono::steady_clock::now() +
791 std::chrono::milliseconds(this->heartbeatInterval);
792 }
793 }
794
804 private: int NextTimeout() const
805 {
807 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
808 auto timeUntilNextActivity = this->timeNextActivity - now;
809
810 int t = static_cast<int>(
811 std::chrono::duration_cast<std::chrono::milliseconds>
812 (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
813 int t2 = std::min(t, this->kTimeout);
814 return std::max(t2, 0);
815 }
816
818 private: void RecvMessages()
819 {
820 bool timeToExit = false;
821 while (!timeToExit)
822 {
823 // Calculate the timeout.
824 int timeout = this->NextTimeout();
825
826 if (pollSockets(this->sockets, timeout))
827 {
828 this->RecvDiscoveryUpdate();
829
830 if (this->verbose)
831 this->PrintCurrentState();
832 }
833
834 this->UpdateHeartbeat();
835 this->UpdateActivity();
836
837 // Is it time to exit?
838 {
839 std::lock_guard<std::mutex> lock(this->exitMutex);
840 if (this->exit)
841 timeToExit = true;
842 }
843 }
844 }
845
847 private: void RecvDiscoveryUpdate()
848 {
849 char rcvStr[Discovery::kMaxRcvStr];
850 sockaddr_in clntAddr;
851 socklen_t addrLen = sizeof(clntAddr);
852
853 int64_t received = recvfrom(this->sockets.at(0),
854 reinterpret_cast<raw_type *>(rcvStr),
855 this->kMaxRcvStr, 0,
856 reinterpret_cast<sockaddr *>(&clntAddr),
857 reinterpret_cast<socklen_t *>(&addrLen));
858 if (received > 0)
859 {
860 uint16_t len = 0;
861 memcpy(&len, &rcvStr[0], sizeof(len));
862
863 // Gazebo Transport delimits each discovery message with a
864 // frame_delimiter that contains byte size information.
865 // A discovery message has the form:
866 //
867 // <frame_delimiter><frame_body>
868 //
869 // Gazebo Transport version < 8 sends a frame delimiter that
870 // contains the value of sizeof(frame_delimiter)
871 // + sizeof(frame_body). In other words, the frame_delimiter
872 // contains a value that represents the total size of the
873 // frame_body and frame_delimiter in bytes.
874 //
875 // Gazebo Transport version >= 8 sends a frame_delimiter
876 // that contains the value of sizeof(frame_body). In other
877 // words, the frame_delimiter contains a value that represents
878 // the total size of only the frame_body.
879 //
880 // It is possible that two incompatible versions of Gazebo
881 // Transport exist on the same network. If we receive an
882 // unexpected size, then we ignore the message.
883
884 // If-condition for version 8+
885 if (len + sizeof(len) == static_cast<uint16_t>(received))
886 {
887 std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
888 uint16_t srcPort = ntohs(clntAddr.sin_port);
889
890 if (this->verbose)
891 {
892 std::cout << "\nReceived discovery update from "
893 << srcAddr << ": " << srcPort << std::endl;
894 }
895
896 this->DispatchDiscoveryMsg(srcAddr, rcvStr + sizeof(len), len);
897 }
898 }
899 else if (received < 0)
900 {
901 std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
902 << std::endl;
903 }
904 }
905
910 private: void DispatchDiscoveryMsg(const std::string &_fromIp,
911 char *_msg, uint16_t _len)
912 {
913 gz::msgs::Discovery msg;
914
915 // Parse the message, and return if parsing failed. Parsing could
916 // fail when another discovery node is publishing messages using an
917 // older (or newer) format.
918 if (!msg.ParseFromArray(_msg, _len))
919 return;
920
921 // Discard the message if the wire protocol is different than mine.
922 if (this->Version() != msg.version())
923 return;
924
925 std::string recvPUuid = msg.process_uuid();
926
927 // Discard our own discovery messages.
928 if (recvPUuid == this->pUuid)
929 return;
930
931 // Forwarding summary:
932 // - From a unicast peer -> to multicast group (with NO_RELAY flag).
933 // - From multicast group -> to unicast peers (with RELAY flag).
934
935 // If the RELAY flag is set, this discovery message is coming via a
936 // unicast transmission. In this case, we don't process it, we just
937 // forward it to the multicast group, and it will be dispatched once
938 // received there. Note that we also unset the RELAY flag and set the
939 // NO_RELAY flag, to avoid forwarding the message anymore.
940 if (msg.has_flags() && msg.flags().relay())
941 {
942 // Unset the RELAY flag in the header and set the NO_RELAY.
943 msg.mutable_flags()->set_relay(false);
944 msg.mutable_flags()->set_no_relay(true);
945 this->SendMulticast(msg);
946
947 // A unicast peer contacted me. I need to save its address for
948 // sending future messages in the future.
949 this->AddRelayAddress(_fromIp);
950 return;
951 }
952 // If we are receiving this discovery message via the multicast channel
953 // and the NO_RELAY flag is not set, we forward this message via unicast
954 // to all our relays. Note that this is the most common case, where we
955 // receive a regular multicast message and we forward it to any remote
956 // relays.
957 else if (!msg.has_flags() || !msg.flags().no_relay())
958 {
959 msg.mutable_flags()->set_relay(true);
960 this->SendUnicast(msg);
961 }
962
963 bool isSenderLocal = (std::find(this->hostInterfaces.begin(),
964 this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
965 (_fromIp.find("127.") == 0);
966
967 // Update timestamp and cache the callbacks.
968 DiscoveryCallback<Pub> connectCb;
969 DiscoveryCallback<Pub> disconnectCb;
970 DiscoveryCallback<Pub> registerCb;
971 DiscoveryCallback<Pub> unregisterCb;
972 std::function<void()> subscribersReqCb;
973 {
975 this->activity[recvPUuid] = std::chrono::steady_clock::now();
976 connectCb = this->connectionCb;
977 disconnectCb = this->disconnectionCb;
978 registerCb = this->registrationCb;
979 unregisterCb = this->unregistrationCb;
980 subscribersReqCb = this->subscribersCb;
981 }
982
983 switch (msg.type())
984 {
985 case msgs::Discovery::ADVERTISE:
986 {
987 // Read the rest of the fields.
988 Pub publisher;
989 publisher.SetFromDiscovery(msg);
990
991 // Check scope of the topic.
992 if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
993 (publisher.Options().Scope() == Scope_t::HOST &&
994 !isSenderLocal))
995 {
996 return;
997 }
998
999 // Register an advertised address for the topic.
1000 bool added;
1001 {
1002 std::lock_guard<std::mutex> lock(this->mutex);
1003 added = this->info.AddPublisher(publisher);
1004 }
1005
1006 if (added && connectCb)
1007 {
1008 // Execute the client's callback.
1009 connectCb(publisher);
1010 }
1011
1012 break;
1013 }
1014 case msgs::Discovery::SUBSCRIBE:
1015 {
1016 std::string recvTopic;
1017 // Read the topic information.
1018 if (msg.has_sub())
1019 {
1020 recvTopic = msg.sub().topic();
1021 }
1022 else
1023 {
1024 std::cerr << "Subscription discovery message is missing "
1025 << "Subscriber information.\n";
1026 break;
1027 }
1028
1029 // Check if at least one of my nodes advertises the topic requested.
1030 Addresses_M<Pub> addresses;
1031 {
1032 std::lock_guard<std::mutex> lock(this->mutex);
1033 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
1034 {
1035 break;
1036 }
1037
1038 if (!this->info.Publishers(recvTopic, addresses))
1039 break;
1040 }
1041
1042 for (const auto &nodeInfo : addresses[this->pUuid])
1043 {
1044 // Check scope of the topic.
1045 if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
1046 (nodeInfo.Options().Scope() == Scope_t::HOST &&
1047 !isSenderLocal))
1048 {
1049 continue;
1050 }
1051
1052 // Answer an ADVERTISE message.
1053 this->SendMsg(DestinationType::ALL,
1054 msgs::Discovery::ADVERTISE, nodeInfo);
1055 }
1056
1057 break;
1058 }
1059 case msgs::Discovery::SUBSCRIBERS_REQ:
1060 {
1061 if (subscribersReqCb)
1062 subscribersReqCb();
1063
1064 break;
1065 }
1066 case msgs::Discovery::SUBSCRIBERS_REP:
1067 {
1068 // Save the remote subscriber.
1069 Pub publisher;
1070 publisher.SetFromDiscovery(msg);
1071
1072 {
1073 std::lock_guard<std::mutex> lock(this->mutex);
1074 this->remoteSubscribers.AddPublisher(publisher);
1075 }
1076 break;
1077 }
1078 case msgs::Discovery::NEW_CONNECTION:
1079 {
1080 // Read the rest of the fields.
1081 Pub publisher;
1082 publisher.SetFromDiscovery(msg);
1083
1084 if (registerCb)
1085 registerCb(publisher);
1086
1087 break;
1088 }
1089 case msgs::Discovery::END_CONNECTION:
1090 {
1091 // Read the rest of the fields.
1092 Pub publisher;
1093 publisher.SetFromDiscovery(msg);
1094
1095 if (unregisterCb)
1096 unregisterCb(publisher);
1097
1098 break;
1099 }
1100 case msgs::Discovery::HEARTBEAT:
1101 {
1102 // The timestamp has already been updated.
1103 break;
1104 }
1105 case msgs::Discovery::BYE:
1106 {
1107 // Remove the activity entry for this publisher.
1108 {
1109 std::lock_guard<std::mutex> lock(this->mutex);
1110 this->activity.erase(recvPUuid);
1111 }
1112
1113 if (disconnectCb)
1114 {
1115 Pub pub;
1116 pub.SetPUuid(recvPUuid);
1117 // Notify the new disconnection.
1118 disconnectCb(pub);
1119 }
1120
1121 // Remove the address entry for this topic.
1122 {
1123 std::lock_guard<std::mutex> lock(this->mutex);
1124 this->info.DelPublishersByProc(recvPUuid);
1125 }
1126
1127 break;
1128 }
1129 case msgs::Discovery::UNADVERTISE:
1130 {
1131 // Read the address.
1132 Pub publisher;
1133 publisher.SetFromDiscovery(msg);
1134
1135 // Check scope of the topic.
1136 if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1137 (publisher.Options().Scope() == Scope_t::HOST &&
1138 !isSenderLocal))
1139 {
1140 return;
1141 }
1142
1143 if (disconnectCb)
1144 {
1145 // Notify the new disconnection.
1146 disconnectCb(publisher);
1147 }
1148
1149 // Remove the address entry for this topic.
1150 {
1151 std::lock_guard<std::mutex> lock(this->mutex);
1152 this->info.DelPublisherByNode(publisher.Topic(),
1153 publisher.PUuid(), publisher.NUuid());
1154 }
1155
1156 break;
1157 }
1158 default:
1159 {
1160 std::cerr << "Unknown message type [" << msg.type() << "].\n";
1161 break;
1162 }
1163 }
1164 }
1165
1172 private: template<typename T>
1173 void SendMsg(const DestinationType &_destType,
1174 const msgs::Discovery::Type _type,
1175 const T &_pub) const
1176 {
1177 gz::msgs::Discovery discoveryMsg;
1178 discoveryMsg.set_version(this->Version());
1179 discoveryMsg.set_type(_type);
1180 discoveryMsg.set_process_uuid(this->pUuid);
1181 _pub.FillDiscovery(discoveryMsg);
1182
1183 switch (_type)
1184 {
1185 case msgs::Discovery::ADVERTISE:
1186 case msgs::Discovery::UNADVERTISE:
1187 case msgs::Discovery::NEW_CONNECTION:
1188 case msgs::Discovery::END_CONNECTION:
1189 {
1190 _pub.FillDiscovery(discoveryMsg);
1191 break;
1192 }
1193 case msgs::Discovery::SUBSCRIBE:
1194 {
1195 discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1196 break;
1197 }
1198 case msgs::Discovery::HEARTBEAT:
1199 case msgs::Discovery::BYE:
1200 case msgs::Discovery::SUBSCRIBERS_REQ:
1201 case msgs::Discovery::SUBSCRIBERS_REP:
1202 break;
1203 default:
1204 std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1205 << " type [" << _type << "]" << std::endl;
1206 return;
1207 }
1208
1209 if (_destType == DestinationType::MULTICAST ||
1210 _destType == DestinationType::ALL)
1211 {
1212 this->SendMulticast(discoveryMsg);
1213 }
1214
1215 // Send the discovery message to the unicast relays.
1216 if (_destType == DestinationType::UNICAST ||
1217 _destType == DestinationType::ALL)
1218 {
1219 // Set the RELAY flag in the header.
1220 discoveryMsg.mutable_flags()->set_relay(true);
1221 this->SendUnicast(discoveryMsg);
1222 }
1223
1224 if (this->verbose)
1225 {
1226 std::cout << "\t* Sending " << msgs::ToString(_type)
1227 << " msg [" << _pub.Topic() << "]" << std::endl;
1228 }
1229 }
1230
1233 private: void SendUnicast(const msgs::Discovery &_msg) const
1234 {
1235 uint16_t msgSize;
1236
1237#if GOOGLE_PROTOBUF_VERSION >= 3004000
1238 size_t msgSizeFull = _msg.ByteSizeLong();
1239#else
1240 int msgSizeFull = _msg.ByteSize();
1241#endif
1242 if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1243 {
1244 std::cerr << "Discovery message too large to send. Discovery won't "
1245 << "work. This shouldn't happen.\n";
1246 return;
1247 }
1248 msgSize = msgSizeFull;
1249
1250 uint16_t totalSize = sizeof(msgSize) + msgSize;
1251 char *buffer = static_cast<char *>(new char[totalSize]);
1252 memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1253
1254 if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1255 {
1256 // Send the discovery message to the unicast relays.
1257 for (const auto &sockAddr : this->relayAddrs)
1258 {
1259 errno = 0;
1260 auto sent = sendto(this->sockets.at(0),
1261 reinterpret_cast<const raw_type *>(
1262 reinterpret_cast<const unsigned char*>(buffer)),
1263 totalSize, 0,
1264 reinterpret_cast<const sockaddr *>(&sockAddr),
1265 sizeof(sockAddr));
1266
1267 if (sent != totalSize)
1268 {
1269 std::cerr << "Exception sending a unicast message:" << std::endl;
1270 std::cerr << " Return value: " << sent << std::endl;
1271 std::cerr << " Error code: " << strerror(errno) << std::endl;
1272 break;
1273 }
1274 }
1275 }
1276 else
1277 {
1278 std::cerr << "Discovery::SendUnicast: Error serializing data."
1279 << std::endl;
1280 }
1281
1282 delete [] buffer;
1283 }
1284
1287 private: void SendMulticast(const msgs::Discovery &_msg) const
1288 {
1289 uint16_t msgSize;
1290
1291#if GOOGLE_PROTOBUF_VERSION >= 3004000
1292 size_t msgSizeFull = _msg.ByteSizeLong();
1293#else
1294 int msgSizeFull = _msg.ByteSize();
1295#endif
1296 if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1297 {
1298 std::cerr << "Discovery message too large to send. Discovery won't "
1299 << "work. This shouldn't happen.\n";
1300 return;
1301 }
1302
1303 msgSize = msgSizeFull;
1304 uint16_t totalSize = sizeof(msgSize) + msgSize;
1305 char *buffer = static_cast<char *>(new char[totalSize]);
1306 memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1307
1308 if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1309 {
1310 // Send the discovery message to the multicast group through all the
1311 // sockets.
1312 for (const auto &sock : this->Sockets())
1313 {
1314 errno = 0;
1315 if (sendto(sock, reinterpret_cast<const raw_type *>(
1316 reinterpret_cast<const unsigned char*>(buffer)),
1317 totalSize, 0,
1318 reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1319 sizeof(*(this->MulticastAddr()))) != totalSize)
1320 {
1321 // Ignore EPERM and ENOBUFS errors.
1322 //
1323 // See issue #106
1324 //
1325 // Rationale drawn from:
1326 //
1327 // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1328 // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1329 if (errno != EPERM && errno != ENOBUFS)
1330 {
1331 std::cerr << "Exception sending a multicast message:"
1332 << strerror(errno) << std::endl;
1333 }
1334 break;
1335 }
1336 }
1337 }
1338 else
1339 {
1340 std::cerr << "Discovery::SendMulticast: Error serializing data."
1341 << std::endl;
1342 }
1343
1344 delete [] buffer;
1345 }
1346
1349 private: const std::vector<int> &Sockets() const
1350 {
1351 return this->sockets;
1352 }
1353
1356 private: const sockaddr_in *MulticastAddr() const
1357 {
1358 return &this->mcastAddr;
1359 }
1360
1363 private: uint8_t Version() const
1364 {
1365 static std::string gzStats;
1366 static int topicStats;
1367
1368 if (env("GZ_TRANSPORT_TOPIC_STATISTICS", gzStats) && !gzStats.empty())
1369 {
1370 topicStats = (gzStats == "1");
1371 }
1372
1373 return this->kWireVersion + (topicStats * 100);
1374 }
1375
1380 private: bool RegisterNetIface(const std::string &_ip)
1381 {
1382 // Make a new socket for sending discovery information.
1383 int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1384 if (sock < 0)
1385 {
1386 std::cerr << "Socket creation failed." << std::endl;
1387 return false;
1388 }
1389
1390 // Socket option: IP_MULTICAST_IF.
1391 // This socket option needs to be applied to each socket used to send
1392 // data. This option selects the source interface for outgoing messages.
1393 struct in_addr ifAddr;
1394 ifAddr.s_addr = inet_addr(_ip.c_str());
1395 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1396 reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1397 {
1398 std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1399 << std::endl;
1400 return false;
1401 }
1402
1403 this->sockets.push_back(sock);
1404
1405 // Join the multicast group. We have to do it for each network interface
1406 // but we can do it on the same socket. We will use the socket at
1407 // position 0 for receiving multicast information.
1408 struct ip_mreq group;
1409 group.imr_multiaddr.s_addr =
1410 inet_addr(this->multicastGroup.c_str());
1411 group.imr_interface.s_addr = inet_addr(_ip.c_str());
1412 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1413 reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1414 {
1415 std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1416 << std::endl;
1417 return false;
1418 }
1419
1420 return true;
1421 }
1422
1425 private: void AddRelayAddress(const std::string &_ip)
1426 {
1427 // Sanity check: Make sure that this IP address is not already saved.
1428 for (auto const &addr : this->relayAddrs)
1429 {
1430 if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
1431 return;
1432 }
1433
1434 sockaddr_in addr;
1435 memset(&addr, 0, sizeof(addr));
1436 addr.sin_family = AF_INET;
1437 addr.sin_addr.s_addr = inet_addr(_ip.c_str());
1438 addr.sin_port = htons(static_cast<u_short>(this->port));
1439
1440 this->relayAddrs.push_back(addr);
1441 }
1442
1446 private: static const unsigned int kDefActivityInterval = 100;
1447
1451 private: static const unsigned int kDefHeartbeatInterval = 1000;
1452
1456 private: static const unsigned int kDefSilenceInterval = 3000;
1457
1459 private: std::string multicastGroup;
1460
1462 private: const int kTimeout = 250;
1463
1465 private: static const uint16_t kMaxRcvStr =
1467
1470 private: static const uint8_t kWireVersion = 10;
1471
1473 private: int port;
1474
1476 private: std::string hostAddr;
1477
1479 private: std::vector<std::string> hostInterfaces;
1480
1482 private: std::string pUuid;
1483
1487 private: unsigned int silenceInterval;
1488
1492 private: unsigned int activityInterval;
1493
1497 private: unsigned int heartbeatInterval;
1498
1500 private: DiscoveryCallback<Pub> connectionCb;
1501
1503 private: DiscoveryCallback<Pub> disconnectionCb;
1504
1506 private: DiscoveryCallback<Pub> registrationCb;
1507
1509 private: DiscoveryCallback<Pub> unregistrationCb;
1510
1512 private: std::function<void()> subscribersCb;
1513
1515 private: TopicStorage<Pub> info;
1516
1518 private: TopicStorage<Pub> remoteSubscribers;
1519
1525
1527 private: bool verbose;
1528
1530 private: std::vector<int> sockets;
1531
1533 private: sockaddr_in mcastAddr;
1534
1536 private: std::vector<sockaddr_in> relayAddrs;
1537
1539 private: mutable std::mutex mutex;
1540
1542 private: std::thread threadReception;
1543
1545 private: Timestamp timeNextHeartbeat;
1546
1548 private: Timestamp timeNextActivity;
1549
1551 private: std::mutex exitMutex;
1552
1557 private: bool initialized;
1558
1560 private: unsigned int numHeartbeatsUninitialized;
1561
1563 private: mutable std::condition_variable initializedCv;
1564
1566 private: bool exit;
1567
1569 private: bool enabled;
1570 };
1571
1575
1579 }
1580 }
1581}
1582
1583#endif