Node.hh
Go to the documentation of this file.
1/*
2 * Copyright (C) 2012 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18#ifndef GAZEBO_TRANSPORT_NODE_HH_
19#define GAZEBO_TRANSPORT_NODE_HH_
20
21#ifndef Q_MOC_RUN
22#include <tbb/task.h>
23#endif
24#include <boost/bind.hpp>
25#include <boost/enable_shared_from_this.hpp>
26#include <map>
27#include <list>
28#include <string>
29#include <vector>
30
33#include "gazebo/util/system.hh"
34
35namespace gazebo
36{
37 namespace transport
38 {
41 class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
42 {
46 public: PublishTask(transport::PublisherPtr _pub,
47 const google::protobuf::Message &_message)
48 : pub(_pub)
49 {
50 this->msg = _message.New();
51 this->msg->CopyFrom(_message);
52 }
53
56 public: tbb::task *execute()
57 {
58 this->pub->WaitForConnection();
59 this->pub->Publish(*this->msg, true);
60 this->pub->SendMessage();
61 delete this->msg;
62 this->pub.reset();
63 return NULL;
64 }
65
67 private: transport::PublisherPtr pub;
68
70 private: google::protobuf::Message *msg;
71 };
73
76
80 class GZ_TRANSPORT_VISIBLE Node :
81 public boost::enable_shared_from_this<Node>
82 {
84 public: Node();
85
87 public: virtual ~Node();
88
97 public: void Init(const std::string &_space ="");
98
110 public: bool TryInit(
111 const common::Time &_maxWait = common::Time(1, 0));
112
117 public: bool IsInitialized() const;
118
120 public: void Fini();
121
124 public: std::string GetTopicNamespace() const;
125
129 public: std::string DecodeTopicName(const std::string &_topic);
130
134 public: std::string EncodeTopicName(const std::string &_topic);
135
138 public: unsigned int GetId() const;
139
142 public: void ProcessPublishers();
143
145 public: void ProcessIncoming();
146
150 public: bool HasLatchedSubscriber(const std::string &_topic) const;
151
152
159 public: template<typename M>
160 void Publish(const std::string &_topic,
161 const google::protobuf::Message &_message)
162 {
163 transport::PublisherPtr pub = this->Advertise<M>(_topic);
164 PublishTask *task = new(tbb::task::allocate_root())
165 PublishTask(pub, _message);
166
167 tbb::task::enqueue(*task);
168 return;
169 }
170
178 public: template<typename M>
179 transport::PublisherPtr Advertise(const std::string &_topic,
180 unsigned int _queueLimit = 1000,
181 double _hzRate = 0)
182 {
183 std::string decodedTopic = this->DecodeTopicName(_topic);
184 PublisherPtr publisher =
185 transport::TopicManager::Instance()->Advertise<M>(
186 decodedTopic, _queueLimit, _hzRate);
187
188 boost::mutex::scoped_lock lock(this->publisherMutex);
189 publisher->SetNode(shared_from_this());
190 this->publishers.push_back(publisher);
191
192 return publisher;
193 }
194
201 public: void Publish(const std::string &_topic,
202 const google::protobuf::Message &_message)
203 {
204 transport::PublisherPtr pub = this->Advertise(_topic,
205 _message.GetTypeName());
206 pub->WaitForConnection();
207
208 pub->Publish(_message, true);
209 }
210
218 public: transport::PublisherPtr Advertise(const std::string &_topic,
219 const std::string &_msgTypeName,
220 unsigned int _queueLimit = 1000,
221 double _hzRate = 0)
222 {
223 std::string decodedTopic = this->DecodeTopicName(_topic);
224 PublisherPtr publisher =
225 transport::TopicManager::Instance()->Advertise(
226 decodedTopic, _msgTypeName, _queueLimit, _hzRate);
227
228 boost::mutex::scoped_lock lock(this->publisherMutex);
229 publisher->SetNode(shared_from_this());
230 this->publishers.push_back(publisher);
231
232 return publisher;
233 }
234
242 public: template<typename M, typename T>
243 SubscriberPtr Subscribe(const std::string &_topic,
244 void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
245 bool _latching = false)
246 {
248 std::string decodedTopic = this->DecodeTopicName(_topic);
249 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
250
251 {
252 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
253 this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
254 new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
255 }
256
257 SubscriberPtr result =
258 transport::TopicManager::Instance()->Subscribe(ops);
259
260 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
261
262 return result;
263 }
264
271 public: template<typename M>
272 SubscriberPtr Subscribe(const std::string &_topic,
273 void(*_fp)(const boost::shared_ptr<M const> &),
274 bool _latching = false)
275 {
277 std::string decodedTopic = this->DecodeTopicName(_topic);
278 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
279
280 {
281 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
282 this->callbacks[decodedTopic].push_back(
283 CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
284 }
285
286 SubscriberPtr result =
287 transport::TopicManager::Instance()->Subscribe(ops);
288
289 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
290
291 return result;
292 }
293
301 template<typename T>
302 SubscriberPtr Subscribe(const std::string &_topic,
303 void(T::*_fp)(const std::string &), T *_obj,
304 bool _latching = false)
305 {
307 std::string decodedTopic = this->DecodeTopicName(_topic);
308 ops.Init(decodedTopic, shared_from_this(), _latching);
309
310 {
311 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
312 this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
313 new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
314 }
315
316 SubscriberPtr result =
317 transport::TopicManager::Instance()->Subscribe(ops);
318
319 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
320
321 return result;
322 }
323
324
331 SubscriberPtr Subscribe(const std::string &_topic,
332 void(*_fp)(const std::string &), bool _latching = false)
333 {
335 std::string decodedTopic = this->DecodeTopicName(_topic);
336 ops.Init(decodedTopic, shared_from_this(), _latching);
337
338 {
339 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
340 this->callbacks[decodedTopic].push_back(
342 }
343
344 SubscriberPtr result =
345 transport::TopicManager::Instance()->Subscribe(ops);
346
347 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
348
349 return result;
350 }
351
356 public: bool HandleData(const std::string &_topic,
357 const std::string &_msg);
358
363 public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
364
371 public: void InsertLatchedMsg(const std::string &_topic,
372 const std::string &_msg);
373
380 public: void InsertLatchedMsg(const std::string &_topic,
381 MessagePtr _msg);
382
386 public: std::string GetMsgType(const std::string &_topic) const;
387
393 public: void RemoveCallback(const std::string &_topic, unsigned int _id);
394
406 private: bool PrivateInit(const std::string &_space,
407 const common::Time &_maxWait,
408 const bool _fallbackToDefault);
409
410 private: std::string topicNamespace;
411 private: std::vector<PublisherPtr> publishers;
412 private: std::vector<PublisherPtr>::iterator publishersIter;
413 private: static unsigned int idCounter;
414 private: unsigned int id;
415
416 private: typedef std::list<CallbackHelperPtr> Callback_L;
417 private: typedef std::map<std::string, Callback_L> Callback_M;
418 private: Callback_M callbacks;
419 private: std::map<std::string, std::list<std::string> > incomingMsgs;
420
422 private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
423
424 private: boost::mutex publisherMutex;
425 private: boost::mutex publisherDeleteMutex;
426 private: boost::recursive_mutex incomingMutex;
427
430 private: boost::recursive_mutex processIncomingMutex;
431
432 private: bool initialized;
433 };
435 }
436}
437#endif
#define NULL
Definition CommonTypes.hh:31
transport
Definition ConnectionManager.hh:35
Forward declarations for transport.
static TopicManager * Instance()
Get an instance of the singleton.
Definition SingletonT.hh:36
A Time class, can be used to hold wall- or sim-time.
Definition Time.hh:48
Callback helper Template.
Definition CallbackHelper.hh:112
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topics.
Definition Node.hh:82
void Fini()
Finalize the node.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition Node.hh:272
std::string GetTopicNamespace() const
Get the topic namespace for this node.
void InsertLatchedMsg(const std::string &_topic, MessagePtr _msg)
Add a latched message to the node for publication.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition Node.hh:302
transport::PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition Node.hh:218
bool IsInitialized() const
Check if this Node has been initialized.
unsigned int GetId() const
Get the unique ID of the node.
bool HandleMessage(const std::string &_topic, MessagePtr _msg)
Handle incoming msg.
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition Node.hh:179
void ProcessIncoming()
Process incoming messages.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition Node.hh:201
std::string DecodeTopicName(const std::string &_topic)
Decode a topic name.
bool HasLatchedSubscriber(const std::string &_topic) const
Return true if a subscriber on a specific topic is latched.
void Init(const std::string &_space="")
Init the node.
void InsertLatchedMsg(const std::string &_topic, const std::string &_msg)
Add a latched message to the node for publication.
std::string GetMsgType(const std::string &_topic) const
Get the message type for a topic.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition Node.hh:243
virtual ~Node()
Destructor.
bool HandleData(const std::string &_topic, const std::string &_msg)
Handle incoming data.
void RemoveCallback(const std::string &_topic, unsigned int _id)
void ProcessPublishers()
Process all publishers, which has each publisher send its most recent message over the wire.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition Node.hh:160
bool TryInit(const common::Time &_maxWait=common::Time(1, 0))
Try to initialize the node to use the global namespace, and specify the maximum wait time.
std::string EncodeTopicName(const std::string &_topic)
Encode a topic name.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition Node.hh:331
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition CallbackHelper.hh:178
Options for a subscription.
Definition SubscribeOptions.hh:36
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition SubscribeOptions.hh:48
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition CallbackHelper.hh:105
Definition JointMaker.hh:45
boost::shared_ptr< Subscriber > SubscriberPtr
Definition TransportTypes.hh:53
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition TransportTypes.hh:45
boost::shared_ptr< Publisher > PublisherPtr
Definition TransportTypes.hh:49
Forward declarations for the common classes.
Definition Animation.hh:27
STL namespace.