Connection.hh
Go to the documentation of this file.
1/*
2 * Copyright (C) 2012 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17#ifndef _CONNECTION_HH_
18#define _CONNECTION_HH_
19
20#ifndef Q_MOC_RUN
21#include <tbb/task.h>
22#endif
23#include <google/protobuf/message.h>
24
25#include <boost/asio.hpp>
26#include <boost/bind.hpp>
27#include <boost/function.hpp>
28#include <boost/thread.hpp>
29#include <boost/tuple/tuple.hpp>
30
31#include <string>
32#include <vector>
33#include <iostream>
34#include <iomanip>
35#include <deque>
36#include <utility>
37
42#include "gazebo/util/system.hh"
43
44#define HEADER_LENGTH 8
45
46namespace gazebo
47{
48 namespace transport
49 {
50 extern GZ_TRANSPORT_VISIBLE bool is_stopped();
51
52 class IOManager;
53 class Connection;
54 typedef boost::shared_ptr<Connection> ConnectionPtr;
55
59 class GZ_TRANSPORT_VISIBLE ConnectionReadTask : public tbb::task
60 {
65 public: ConnectionReadTask(
66 boost::function<void (const std::string &)> _func,
67 const std::string &_data) :
68 func(_func),
69 data(_data)
70 {
71 }
72
75 public: tbb::task *execute()
76 {
77 this->func(this->data);
78 return NULL;
79 }
80
82 private: boost::function<void (const std::string &)> func;
83
85 private: std::string data;
86 };
88
103 class GZ_TRANSPORT_VISIBLE Connection :
104 public boost::enable_shared_from_this<Connection>
105 {
107 public: Connection();
108
110 public: virtual ~Connection();
111
116 public: bool Connect(const std::string &_host, unsigned int _port);
117
119 typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
120
125 public: void Listen(unsigned int _port, const AcceptCallback &_acceptCB);
126
128 typedef boost::function<void(const std::string &_data)> ReadCallback;
129
133 public: void StartRead(const ReadCallback &_cb);
134
136 public: void StopRead();
137
139 public: void Shutdown();
140
143 public: bool IsOpen() const;
144
146 private: void Close();
147
149 public: void Cancel();
150
154 public: bool Read(std::string &_data);
155
163 public: void EnqueueMsg(const std::string &_buffer,
164 boost::function<void(uint32_t)> _cb, uint32_t _id,
165 bool _force = false);
166
171 public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
172
175 public: std::string GetLocalURI() const;
176
179 public: std::string GetRemoteURI() const;
180
183 public: std::string GetLocalAddress() const;
184
187 public: unsigned int GetLocalPort() const;
188
191 public: std::string GetRemoteAddress() const;
192
195 public: unsigned int GetRemotePort() const;
196
199 public: std::string GetRemoteHostname() const;
200
203 public: static std::string GetLocalHostname();
204
207 public: template<typename Handler>
208 void AsyncRead(Handler _handler)
209 {
210 boost::mutex::scoped_lock lock(this->socketMutex);
211 if (!this->IsOpen())
212 {
213 gzerr << "AsyncRead on a closed socket\n";
214 return;
215 }
216
217 void (Connection::*f)(const boost::system::error_code &,
218 boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
219
220 this->inboundHeader.resize(HEADER_LENGTH);
221 boost::asio::async_read(*this->socket,
222 boost::asio::buffer(this->inboundHeader),
223 common::weakBind(f, this->shared_from_this(),
224 boost::asio::placeholders::error,
225 boost::make_tuple(_handler)));
226 }
227
235 private: template<typename Handler>
236 void OnReadHeader(const boost::system::error_code &_e,
237 boost::tuple<Handler> _handler)
238 {
239 if (_e)
240 {
241 if (_e.value() == boost::asio::error::eof)
242 this->isOpen = false;
243 }
244 else
245 {
246 std::size_t inboundData_size = 0;
247 std::string header(&this->inboundHeader[0],
248 this->inboundHeader.size());
249 this->inboundHeader.clear();
250
251 inboundData_size = this->ParseHeader(header);
252
253 if (inboundData_size > 0)
254 {
255 // Start the asynchronous call to receive data
256 this->inboundData.resize(inboundData_size);
257
258 void (Connection::*f)(const boost::system::error_code &e,
259 boost::tuple<Handler>) =
260 &Connection::OnReadData<Handler>;
261
262 boost::asio::async_read(*this->socket,
263 boost::asio::buffer(this->inboundData),
264 common::weakBind(f, this->shared_from_this(),
265 boost::asio::placeholders::error,
266 _handler));
267 }
268 else
269 {
270 gzerr << "Header is empty\n";
271 boost::get<0>(_handler)("");
272 // This code tries to read the header again. We should
273 // never get here.
274 // this->inboundHeader.resize(HEADER_LENGTH);
275
276 // void (Connection::*f)(const boost::system::error_code &,
277 // boost::tuple<Handler>) =
278 // &Connection::OnReadHeader<Handler>;
279
280 // boost::asio::async_read(*this->socket,
281 // boost::asio::buffer(this->inboundHeader),
282 // common::weakBind(f, this->shared_from_this(),
283 // boost::asio::placeholders::error, _handler));
284 }
285 }
286 }
287
295 private: template<typename Handler>
296 void OnReadData(const boost::system::error_code &_e,
297 boost::tuple<Handler> _handler)
298 {
299 if (_e)
300 {
301 if (_e.value() == boost::asio::error::eof)
302 this->isOpen = false;
303 }
304
305 // Inform caller that data has been received
306 std::string data(&this->inboundData[0],
307 this->inboundData.size());
308 this->inboundData.clear();
309
310 if (data.empty())
311 gzerr << "OnReadData got empty data!!!\n";
312
313 if (!_e && !transport::is_stopped())
314 {
315 ConnectionReadTask *task = new(tbb::task::allocate_root())
316 ConnectionReadTask(boost::get<0>(_handler), data);
317 tbb::task::enqueue(*task);
318
319 // Non-tbb version:
320 // boost::get<0>(_handler)(data);
321 }
322 }
323
327 public: event::ConnectionPtr ConnectToShutdown(boost::function<void()>
328 _subscriber)
329 { return this->shutdown.Connect(_subscriber); }
330
332 public: void ProcessWriteQueue(bool _blocking = false);
333
336 public: unsigned int GetId() const;
337
341 public: static bool ValidateIP(const std::string &_ip);
342
346 public: std::string GetIPWhiteList() const;
347
350 private: void PostWrite();
351
355 private: void OnWrite(const boost::system::error_code &_e);
356
359 private: void OnAccept(const boost::system::error_code &_e);
360
363 private: std::size_t ParseHeader(const std::string &_header);
364
366 private: void ReadLoop(const ReadCallback &_cb);
367
370 private: static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
371
374 private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
375
378 private: static std::string GetHostname(
379 boost::asio::ip::tcp::endpoint _ep);
380
384 private: void OnConnect(const boost::system::error_code &_error,
385 boost::asio::ip::tcp::resolver::iterator _endPointIter);
386
388 private: boost::asio::ip::tcp::socket *socket;
389
391 private: boost::asio::ip::tcp::acceptor *acceptor;
392
394 private: std::deque<std::string> writeQueue;
395
398 private: std::deque< std::vector<
399 std::pair<boost::function<void(uint32_t)>, uint32_t> > >
400 callbacks;
401
403 private: boost::mutex connectMutex;
404
406 private: boost::recursive_mutex writeMutex;
407
409 private: boost::recursive_mutex readMutex;
410
412 private: mutable boost::mutex socketMutex;
413
415 private: boost::condition_variable connectCondition;
416
418 private: AcceptCallback acceptCB;
419
421 private: std::vector<char> inboundHeader;
422
424 private: std::vector<char> inboundData;
425
427 private: bool readQuit;
428
430 private: unsigned int id;
431
433 private: static unsigned int idCounter;
434
436 private: ConnectionPtr acceptConn;
437
439 private: event::EventT<void()> shutdown;
440
442 private: static IOManager *iomanager;
443
445 private: unsigned int writeCount;
446
448 private: std::string localURI;
449
451 private: std::string localAddress;
452
454 private: std::string remoteURI;
455
457 private: std::string remoteAddress;
458
460 private: bool connectError;
461
463 private: std::string ipWhiteList;
464
466 private: bool dropMsgLogged;
467
469 private: bool isOpen;
470 };
472 }
473}
474#endif
#define NULL
Definition CommonTypes.hh:31
transport
Definition ConnectionManager.hh:35
#define HEADER_LENGTH
Definition Connection.hh:44
A class for event processing.
Definition Event.hh:98
Single TCP/IP connection manager.
Definition Connection.hh:105
bool IsOpen() const
Is the connection open?
void ProcessWriteQueue(bool _blocking=false)
Handle on-write callbacks.
boost::function< void(const std::string &_data) ReadCallback)
The signature of a connection read callback.
Definition Connection.hh:128
bool Connect(const std::string &_host, unsigned int _port)
Connect to a remote host.
void EnqueueMsg(const std::string &_buffer, boost::function< void(uint32_t)> _cb, uint32_t _id, bool _force=false)
Write data to the socket.
std::string GetLocalURI() const
Get the local URI.
unsigned int GetId() const
Get the ID of the connection.
void Listen(unsigned int _port, const AcceptCallback &_acceptCB)
Start a server that listens on a port.
void StartRead(const ReadCallback &_cb)
Start a thread that reads from the connection and passes new message to the ReadCallback.
boost::function< void(const ConnectionPtr &) AcceptCallback)
The signature of a connection accept callback.
Definition Connection.hh:119
std::string GetRemoteURI() const
Get the remote URI.
std::string GetRemoteHostname() const
Get the remote hostname.
std::string GetIPWhiteList() const
Get the IP white list, from GAZEBO_IP_WHITE_LIST environment variable.
static std::string GetLocalHostname()
Get the local hostname.
unsigned int GetLocalPort() const
Get the port of this connection.
void Cancel()
Cancel all async operations on an open socket.
std::string GetRemoteAddress() const
Get the remote address.
void EnqueueMsg(const std::string &_buffer, bool _force=false)
Write data to the socket.
unsigned int GetRemotePort() const
Get the remote port number.
event::ConnectionPtr ConnectToShutdown(boost::function< void()> _subscriber)
Register a function to be called when the connection is shut down.
Definition Connection.hh:327
void Shutdown()
Shutdown the socket.
void StopRead()
Stop the read loop.
bool Read(std::string &_data)
Read data from the socket.
std::string GetLocalAddress() const
Get the local address of this connection.
virtual ~Connection()
Destructor.
static bool ValidateIP(const std::string &_ip)
Return true if the _ip is a valid.
void AsyncRead(Handler _handler)
Peform an asyncronous read param[in] _handler Callback to invoke on received data.
Definition Connection.hh:208
Manages boost::asio IO.
Definition IOManager.hh:34
auto weakBind(Func _func, boost::shared_ptr< T > _ptr, Args... _args) -> decltype(details::makeWeakBinder(boost::bind(_func, _ptr.get(), _args...), boost::weak_ptr< T >(_ptr)))
Definition WeakBind.hh:110
#define gzerr
Output an error message.
Definition Console.hh:50
ConnectionPtr Connect(const std::function< T > &_subscriber)
Connect a callback to this event.
Definition Event.hh:558
bool is_stopped()
Is the transport system stopped?
Definition JointMaker.hh:45
boost::shared_ptr< Connection > ConnectionPtr
Definition CommonTypes.hh:134
boost::shared_ptr< Connection > ConnectionPtr
Definition Connection.hh:54
Forward declarations for the common classes.
Definition Animation.hh:27
GAZEBO_VISIBLE bool shutdown()
Stop and cleanup simulation.