TopicManager.hh
Go to the documentation of this file.
1/*
2 * Copyright (C) 2012 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17#ifndef GAZEBO_TRANSPORT_TOPICMANAGER_HH_
18#define GAZEBO_TRANSPORT_TOPICMANAGER_HH_
19
20#include <boost/bind.hpp>
21#include <boost/function.hpp>
22#include <map>
23#include <list>
24#include <string>
25#include <vector>
26#include <boost/unordered/unordered_set.hpp>
27
30#include "gazebo/msgs/msgs.hh"
32
41#include "gazebo/util/system.hh"
42
44GZ_SINGLETON_DECLARE(GZ_TRANSPORT_VISIBLE, gazebo, transport, TopicManager)
45
46namespace gazebo
47{
48 namespace transport
49 {
52
55 class GZ_TRANSPORT_VISIBLE TopicManager : public SingletonT<TopicManager>
56 {
57 private: TopicManager();
58 private: virtual ~TopicManager();
59
61 public: void Init();
62
64 public: void Fini();
65
69 public: PublicationPtr FindPublication(const std::string &_topic);
70
73 public: void AddNode(NodePtr _node);
74
77 public: void RemoveNode(unsigned int _id);
78
82 public: void ProcessNodes(bool _onlyOut = false);
83
87 public: SubscriberPtr Subscribe(const SubscribeOptions &_options);
88
93 public: void Unsubscribe(const std::string &_topic, const NodePtr &_sub);
94
102 public: PublisherPtr Advertise(const std::string &_topic,
103 const std::string &_msgTypeName,
104 unsigned int _queueLimit,
105 double _hzRate)
106 {
107 this->UpdatePublications(_topic, _msgTypeName);
108
109 PublisherPtr pub = PublisherPtr(new Publisher(_topic,
110 _msgTypeName, _queueLimit, _hzRate));
111
112 // Connect all local subscription to the publisher
113 PublicationPtr publication = this->FindPublication(_topic);
114 GZ_ASSERT(publication != nullptr,
115 "FindPublication returned nullptr");
116
117 publication->AddPublisher(pub);
118 if (!publication->GetLocallyAdvertised())
119 {
120 ConnectionManager::Instance()->Advertise(_topic,
121 _msgTypeName);
122 }
123
124 publication->SetLocallyAdvertised(true);
125 pub->SetPublication(publication);
126
127 for (auto &iter2 : this->subscribedNodes)
128 {
129 if (iter2.first == _topic)
130 {
131 for (const auto liter : iter2.second)
132 {
133 publication->AddSubscription(liter);
134 }
135 }
136 }
137
138 return pub;
139 }
140
148 public: template<typename M>
149 PublisherPtr Advertise(const std::string &_topic,
150 unsigned int _queueLimit,
151 double _hzRate)
152 {
153 google::protobuf::Message *msg = nullptr;
154 M msgtype;
155 msg = dynamic_cast<google::protobuf::Message *>(&msgtype);
156 if (!msg)
157 gzthrow("Advertise requires a google protobuf type");
158
159 return this->Advertise(_topic, msg->GetTypeName(), _queueLimit,
160 _hzRate);
161 }
162
165 public: void Unadvertise(const std::string &_topic);
166
169 public: void Unadvertise(PublisherPtr _pub);
170
175 public: void Unadvertise(const std::string &_topic, const uint32_t _id);
176
183 public: void Publish(const std::string &_topic, MessagePtr _message,
184 boost::function<void(uint32_t)> _cb, uint32_t _id);
185
189 public: void ConnectPubToSub(const std::string &_topic,
190 const SubscriptionTransportPtr _sublink);
191
194 public: void ConnectSubToPub(const msgs::Publish &_pub);
195
200 public: void DisconnectPubFromSub(const std::string &_topic,
201 const std::string &_host,
202 unsigned int _port);
203
208 public: void DisconnectSubFromPub(const std::string &_topic,
209 const std::string &_host,
210 unsigned int _port);
211
214 public: void ConnectSubscribers(const std::string &_topic);
215
221 public: PublicationPtr UpdatePublications(const std::string &_topic,
222 const std::string &_msgType);
223
226 public: void RegisterTopicNamespace(const std::string &_name);
227
230 public: void GetTopicNamespaces(std::list<std::string> &_namespaces);
231
233 public: void ClearBuffers();
234
237 public: void PauseIncoming(bool _pause);
238
241 public: void AddNodeToProcess(NodePtr _ptr);
242
244 typedef std::map<std::string, std::list<NodePtr> > SubNodeMap;
245
246 private: typedef std::map<std::string, PublicationPtr> PublicationPtr_M;
247 private: PublicationPtr_M advertisedTopics;
248 private: PublicationPtr_M::iterator advertisedTopicsEnd;
249 private: SubNodeMap subscribedNodes;
250 private: std::vector<NodePtr> nodes;
251
253 private: boost::unordered_set<NodePtr> nodesToProcess;
254
255 private: boost::recursive_mutex nodeMutex;
256
258 private: boost::mutex subscriberMutex;
259
261 private: boost::mutex processNodesMutex;
262
263 private: bool pauseIncoming;
264
265 // Singleton implementation
266 private: friend class SingletonT<TopicManager>;
267 };
269 }
270}
271#endif
#define GZ_ASSERT(_expr, _msg)
This macro define the standard way of launching an exception inside gazebo.
Definition Assert.hh:24
transport
Definition ConnectionManager.hh:35
gazebo
Definition TopicManager.hh:44
transport
Definition TopicManager.hh:44
Forward declarations for transport.
Singleton template class.
Definition SingletonT.hh:34
A publisher of messages on a topic.
Definition Publisher.hh:46
Options for a subscription.
Definition SubscribeOptions.hh:36
Manages topics and their subscriptions.
Definition TopicManager.hh:56
void Fini()
Finalize the manager.
void ClearBuffers()
Clear all buffers.
void Unadvertise(PublisherPtr _pub)
Unadvertise a publisher.
void PauseIncoming(bool _pause)
Pause or unpause processing of incoming messages.
std::map< std::string, std::list< NodePtr > > SubNodeMap
A map of string->list of Node pointers.
Definition TopicManager.hh:244
void Init()
Initialize the manager.
SubscriberPtr Subscribe(const SubscribeOptions &_options)
Subscribe to a topic.
void RemoveNode(unsigned int _id)
Remove a node by its id.
void Publish(const std::string &_topic, MessagePtr _message, boost::function< void(uint32_t)> _cb, uint32_t _id)
Send a message.
void Unadvertise(const std::string &_topic, const uint32_t _id)
Unadvertise a publisher, based on a publisher id.
void AddNode(NodePtr _node)
Add a node to the manager.
void RegisterTopicNamespace(const std::string &_name)
Register a new topic namespace.
PublicationPtr UpdatePublications(const std::string &_topic, const std::string &_msgType)
Update our list of advertised topics.
void GetTopicNamespaces(std::list< std::string > &_namespaces)
Get all the topic namespaces.
void ProcessNodes(bool _onlyOut=false)
Process all nodes under management.
void ConnectSubToPub(const msgs::Publish &_pub)
Connect a local Subscriber to a remote Publisher.
void AddNodeToProcess(NodePtr _ptr)
Add a node to the list of nodes that requires processing.
void DisconnectSubFromPub(const std::string &_topic, const std::string &_host, unsigned int _port)
Disconnect all local subscribers from a remote publisher.
void ConnectSubscribers(const std::string &_topic)
Connect all subscribers on a topic to known publishers.
void Unadvertise(const std::string &_topic)
Unadvertise a topic.
PublicationPtr FindPublication(const std::string &_topic)
Find a publication object by topic.
void ConnectPubToSub(const std::string &_topic, const SubscriptionTransportPtr _sublink)
Connection a local Publisher to a remote Subscriber.
void Unsubscribe(const std::string &_topic, const NodePtr &_sub)
Unsubscribe from a topic.
void DisconnectPubFromSub(const std::string &_topic, const std::string &_host, unsigned int _port)
Disconnect a local publisher from a remote subscriber.
PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit, double _hzRate)
Advertise on a topic.
Definition TopicManager.hh:149
PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit, double _hzRate)
Advertise on a topic.
Definition TopicManager.hh:102
#define GZ_SINGLETON_DECLARE(visibility, n1, n2, singletonType)
Helper to declare typed SingletonT.
Definition SingletonT.hh:58
#define gzthrow(msg)
This macro logs an error to the throw stream and throws an exception that contains the file name and ...
Definition Exception.hh:39
boost::shared_ptr< Publication > PublicationPtr
Definition TransportTypes.hh:61
boost::shared_ptr< Subscriber > SubscriberPtr
Definition TransportTypes.hh:53
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition TransportTypes.hh:45
boost::shared_ptr< SubscriptionTransport > SubscriptionTransportPtr
Definition TransportTypes.hh:69
boost::shared_ptr< Publisher > PublisherPtr
Definition TransportTypes.hh:49
boost::shared_ptr< Node > NodePtr
Definition TransportTypes.hh:57
Forward declarations for the common classes.
Definition Animation.hh:27