25#include <boost/bind/placeholders.hpp>
27#include <boost/bind/protect.hpp>
28#include <boost/shared_array.hpp>
29#include <boost/atomic.hpp>
32#ifndef ROBOTRACONTEUR_EMSCRIPTEN
33#include <boost/asio.hpp>
34#include <boost/asio/steady_timer.hpp>
39namespace RobotRaconteur
46class ASIOStreamBaseTransport :
public ITransportConnection,
public RR_ENABLE_SHARED_FROM_THIS<ASIOStreamBaseTransport>
50 struct message_queue_entry
52 RR_INTRUSIVE_PTR<Message> message;
53 boost::function<void(
const RR_SHARED_PTR<RobotRaconteurException>&)> callback;
56 RR_BOOST_ASIO_IO_CONTEXT& _io_context;
58 boost::atomic<bool> connected;
60 boost::shared_array<uint8_t> sendbuf;
63 boost::mutex send_lock;
64 boost::mutex sync_send_lock;
66 bool send_pause_request;
68 boost::function<void(
const boost::system::error_code&)> send_pause_request_handler;
70 std::list<message_queue_entry> send_queue;
71 size_t send_message_size;
72 boost::condition_variable send_event;
74 boost::atomic<boost::posix_time::ptime> tlastsend;
75 boost::atomic<boost::posix_time::ptime> tlastrecv;
77 boost::array<uint8_t, 8> streammagic;
78 uint32_t recv_message_size;
79 boost::shared_array<uint8_t> recvbuf;
83 bool recv_pause_request;
86 boost::function<void(
const boost::system::error_code&)> recv_pause_request_handler;
88 boost::mutex recv_lock;
90 RR_SHARED_PTR<boost::asio::deadline_timer> heartbeat_timer;
92 uint32_t ReceiveTimeout;
93 uint32_t HeartbeatPeriod;
97 bool CheckStreamCapability_closed;
98 bool CheckStreamCapability_waiting;
99 boost::mutex CheckStreamCapability_lock;
100 boost::function<void(uint32_t,
const RR_SHARED_PTR<RobotRaconteurException>&)> CheckStreamCapability_callback;
102 boost::tuple<std::string, boost::function<void(uint32_t,
const RR_SHARED_PTR<RobotRaconteurException>&)> > >
103 CheckStreamCapability_queue;
105 RR_SHARED_PTR<boost::asio::deadline_timer> CheckStreamCapability_timer;
107 bool streamop_closed;
108 bool streamop_waiting;
109 boost::mutex streamop_lock;
110 boost::function<void(
const RR_SHARED_PTR<RRObject>&,
const RR_SHARED_PTR<RobotRaconteurException>&)>
112 std::queue<boost::tuple<
113 std::string,
const RR_SHARED_PTR<RRObject>&,
114 boost::function<void(
const RR_SHARED_PTR<RRObject>&,
const RR_SHARED_PTR<RobotRaconteurException>&)> > >
117 RR_SHARED_PTR<boost::asio::deadline_timer> streamop_timer;
119 boost::shared_mutex RemoteNodeID_lock;
121 NodeID target_nodeid;
122 std::string target_nodename;
124 int32_t max_message_size;
126 bool send_large_transfer_authorized;
127 bool recv_large_transfer_authorized;
129 boost::atomic<bool> send_version4;
130 boost::atomic<bool> use_string_table4;
132 RR_SHARED_PTR<detail::StringTable> string_table4;
133 boost::mutex string_table4_lock;
134 std::list<uint32_t> string_table_4_confirming;
136 RR_UNORDERED_MAP<uint32_t, boost::tuple<std::vector<uint32_t>, boost::posix_time::ptime> > string_table_4_requests;
137 uint32_t string_table_4_requestid;
139 std::list<RR_SHARED_PTR<boost::asio::deadline_timer> > string_table_4_timers;
140 bool string_table_4_pause_updates;
142 bool string_table_4_closed;
146 bool disable_message4;
147 bool disable_string_table;
148 bool disable_async_io;
150 mutable_buffers active_recv_bufs;
152 RR_SHARED_PTR<AsyncMessageReader> async_reader;
154 size_t async_recv_size;
155 size_t async_recv_pos;
156 uint16_t async_recv_version;
157 size_t async_recv_continue_buf_count;
159 RR_SHARED_PTR<AsyncMessageWriter> async_writer;
161 uint16_t async_send_version;
162 const_buffers async_send_bufs;
164 uint32_t active_capabilities_message2_basic;
165 uint32_t active_capabilities_message4_basic;
166 uint32_t active_capabilities_message4_stringtable;
168 ASIOStreamBaseTransport(
const RR_SHARED_PTR<RobotRaconteurNode>& node);
170 RR_WEAK_PTR<RobotRaconteurNode> node;
173 RR_OVIRTUAL RR_SHARED_PTR<RobotRaconteurNode> GetNode() RR_OVERRIDE;
176 ASIOStreamBaseTransport(const ASIOStreamBaseTransport& that);
179 class AsyncAttachStream_args : public RRObject
182 RobotRaconteur::NodeID nodeid;
183 std::string nodename;
186 AsyncAttachStream_args(
const RobotRaconteur::NodeID& nodeid_, boost::string_ref nodename_)
187 : nodeid(nodeid_), nodename(RR_MOVE(nodename_.to_string()))
191 RR_OVIRTUAL std::string RRType() RR_OVERRIDE
193 return "RobotRaconteur::ASIOStreamBaseTransport::AsyncAttachStream_args";
197 virtual void AsyncAttachStream(
198 bool server,
const NodeID& target_nodeid, boost::string_ref target_nodename,
199 const boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
202 RR_OVIRTUAL
void AsyncSendMessage(
203 const RR_INTRUSIVE_PTR<Message>& m,
204 const boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>& callback) RR_OVERRIDE;
205 RR_OVIRTUAL
void SendMessage(
const RR_INTRUSIVE_PTR<Message>& m) RR_OVERRIDE;
208 void SimpleAsyncSendMessage(
const RR_INTRUSIVE_PTR<Message>& m,
209 const boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
211 void SimpleAsyncEndSendMessage(
const RR_SHARED_PTR<RobotRaconteurException>& err);
213 virtual void AsyncAttachStream1(
214 const RR_SHARED_PTR<RRObject>& parameter,
const RR_SHARED_PTR<RobotRaconteurException>& err,
215 const boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
217 virtual void BeginSendMessage(
const RR_INTRUSIVE_PTR<Message>& m,
218 const boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
219 virtual void BeginSendMessage1(
220 const RR_INTRUSIVE_PTR<Message>& m,
221 const boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
223 virtual void EndSendMessage(
size_t startpos,
const boost::system::error_code& error,
size_t bytes_transferred,
224 const RR_INTRUSIVE_PTR<Message>& m,
size_t m_len,
225 const boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>& callback,
226 const boost::shared_array<uint8_t>& buf);
227 virtual void EndSendMessage1();
229 virtual void EndSendMessage2(
const boost::system::error_code& error,
size_t bytes_transferred,
230 const boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
232 virtual void AsyncPauseSend(
const boost::function<
void(
const boost::system::error_code&)>& handler);
233 virtual void AsyncResumeSend();
235 virtual void BeginReceiveMessage1();
237 virtual void EndReceiveMessage1(
size_t startpos,
const boost::system::error_code& error,
size_t bytes_transferred);
239 virtual void EndReceiveMessage2(
size_t startpos,
const boost::system::error_code& error,
size_t bytes_transferred,
240 size_t message_size,
const boost::shared_array<uint8_t>& buf);
242 virtual void EndReceiveMessage3(
const RR_INTRUSIVE_PTR<Message>& message);
243 virtual void EndReceiveMessage4();
245 virtual void EndReceiveMessage5(
const boost::system::error_code& error,
size_t bytes_transferred);
247 virtual void AsyncPauseReceive(
const boost::function<
void(
const boost::system::error_code&)>& handler);
248 virtual void AsyncResumeReceive();
250 RR_OVIRTUAL
void Close() RR_OVERRIDE;
252 virtual
void heartbeat_timer_func(const boost::system::error_code& e);
254 boost::mutex heartbeat_timer_lock;
257 virtual
void MessageReceived(const RR_INTRUSIVE_PTR<Message>& m) = 0;
259 virtual
bool IsConnected();
261 virtual uint32_t StreamCapabilities(boost::string_ref name);
263 virtual
void AsyncCheckStreamCapability(
264 boost::string_ref name,
265 const boost::function<
void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
268 virtual
void BeginCheckStreamCapability(
269 boost::string_ref name,
270 const boost::function<
void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
272 void CheckStreamCapability_EndSendMessage(const RR_SHARED_PTR<RobotRaconteurException>& err);
274 static
void CheckStreamCapability_timercallback(RR_WEAK_PTR<ASIOStreamBaseTransport> t,
275 const boost::system::error_code& e);
277 void CheckStreamCapability_MessageReceived(const RR_INTRUSIVE_PTR<Message>& m);
281 boost::string_ref command, const RR_SHARED_PTR<RRObject>& args,
282 boost::function<
void(const RR_SHARED_PTR<RRObject>&, const RR_SHARED_PTR<RobotRaconteurException>&)>&);
284 virtual
void PeriodicCleanupTask();
286 RR_OVIRTUAL NodeID GetRemoteNodeID() RR_OVERRIDE;
289 virtual
void BeginStreamOp(
290 boost::string_ref command, const RR_SHARED_PTR<RRObject>& args,
291 boost::function<
void(const RR_SHARED_PTR<RRObject>&, const RR_SHARED_PTR<RobotRaconteurException>&)>&);
293 virtual RR_INTRUSIVE_PTR<MessageEntry> PackStreamOpRequest(boost::string_ref command,
294 const RR_SHARED_PTR<RRObject>& args);
296 virtual
void StreamOp_EndSendMessage(const RR_SHARED_PTR<RobotRaconteurException>& err);
298 static
void StreamOp_timercallback(RR_WEAK_PTR<ASIOStreamBaseTransport> t, const boost::system::error_code& e);
300 virtual
void StreamOpMessageReceived(const RR_INTRUSIVE_PTR<Message>& m);
302 virtual RR_INTRUSIVE_PTR<MessageEntry> ProcessStreamOpRequest(const RR_INTRUSIVE_PTR<MessageEntry>& request,
303 const RR_INTRUSIVE_PTR<MessageHeader>& header);
305 virtual RR_SHARED_PTR<RRObject> UnpackStreamOpResponse(const RR_INTRUSIVE_PTR<MessageEntry>& response,
306 const RR_INTRUSIVE_PTR<MessageHeader>& header);
308 virtual
void async_write_some(
310 const boost::function<
void(const boost::system::error_code& error,
size_t bytes_transferred)>& handler) = 0;
312 virtual
void async_read_some(
314 const boost::function<
void(const boost::system::error_code& error,
size_t bytes_transferred)>& handler) = 0;
316 virtual
size_t available() = 0;
318 virtual
bool IsLargeTransferAuthorized();
321 virtual
bool GetDisableMessage4();
322 virtual
void SetDisableMessage4(
bool d);
324 virtual
bool GetDisableStringTable();
325 virtual
void SetDisableStringTable(
bool d);
327 RR_OVIRTUAL
bool CheckCapabilityActive(uint32_t cap) RR_OVERRIDE;