Robot Raconteur Core C++ Library
Loading...
Searching...
No Matches
ASIOStreamBaseTransport.h
Go to the documentation of this file.
1
23
25#include <boost/bind/placeholders.hpp>
26#include <list>
27#include <boost/bind/protect.hpp>
28#include <boost/shared_array.hpp>
29#include <boost/atomic.hpp>
31
32#ifndef ROBOTRACONTEUR_EMSCRIPTEN
33#include <boost/asio.hpp>
34#include <boost/asio/steady_timer.hpp>
35#endif
36
37#pragma once
38
39namespace RobotRaconteur
40{
41
42namespace detail
43{
44class StringTable;
45
46class ASIOStreamBaseTransport : public ITransportConnection, public RR_ENABLE_SHARED_FROM_THIS<ASIOStreamBaseTransport>
47{
48
49 protected:
50 struct message_queue_entry
51 {
52 RR_INTRUSIVE_PTR<Message> message;
53 boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> callback;
54 };
55
56 RR_BOOST_ASIO_IO_CONTEXT& _io_context;
57
58 boost::atomic<bool> connected;
59
60 boost::shared_array<uint8_t> sendbuf;
61 size_t sendbuf_len;
62
63 boost::mutex send_lock;
64 boost::mutex sync_send_lock;
65 bool sending;
66 bool send_pause_request;
67 bool send_paused;
68 boost::function<void(const boost::system::error_code&)> send_pause_request_handler;
69
70 std::list<message_queue_entry> send_queue;
71 size_t send_message_size;
72 boost::condition_variable send_event;
73
74 boost::atomic<boost::posix_time::ptime> tlastsend;
75 boost::atomic<boost::posix_time::ptime> tlastrecv;
76
77 boost::array<uint8_t, 8> streammagic;
78 uint32_t recv_message_size;
79 boost::shared_array<uint8_t> recvbuf;
80 size_t recvbuf_len;
81 size_t recvbuf_pos;
82 size_t recvbuf_end;
83 bool recv_pause_request;
84 bool recv_paused;
85 bool receiving;
86 boost::function<void(const boost::system::error_code&)> recv_pause_request_handler;
87
88 boost::mutex recv_lock;
89
90 RR_SHARED_PTR<boost::asio::deadline_timer> heartbeat_timer;
91
92 uint32_t ReceiveTimeout;
93 uint32_t HeartbeatPeriod;
94
95 bool SendHeartbeat;
96
97 bool CheckStreamCapability_closed;
98 bool CheckStreamCapability_waiting;
99 boost::mutex CheckStreamCapability_lock;
100 boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)> CheckStreamCapability_callback;
101 std::queue<
102 boost::tuple<std::string, boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)> > >
103 CheckStreamCapability_queue;
104
105 RR_SHARED_PTR<boost::asio::deadline_timer> CheckStreamCapability_timer;
106
107 bool streamop_closed;
108 bool streamop_waiting;
109 boost::mutex streamop_lock;
110 boost::function<void(const RR_SHARED_PTR<RRObject>&, const RR_SHARED_PTR<RobotRaconteurException>&)>
111 streamop_callback;
112 std::queue<boost::tuple<
113 std::string, const RR_SHARED_PTR<RRObject>&,
114 boost::function<void(const RR_SHARED_PTR<RRObject>&, const RR_SHARED_PTR<RobotRaconteurException>&)> > >
115 streamop_queue;
116
117 RR_SHARED_PTR<boost::asio::deadline_timer> streamop_timer;
118 NodeID RemoteNodeID;
119 boost::shared_mutex RemoteNodeID_lock;
120
121 NodeID target_nodeid;
122 std::string target_nodename;
123
124 int32_t max_message_size;
125
126 bool send_large_transfer_authorized;
127 bool recv_large_transfer_authorized;
128
129 boost::atomic<bool> send_version4;
130 boost::atomic<bool> use_string_table4;
131
132 RR_SHARED_PTR<detail::StringTable> string_table4;
133 boost::mutex string_table4_lock;
134 std::list<uint32_t> string_table_4_confirming;
135
136 RR_UNORDERED_MAP<uint32_t, boost::tuple<std::vector<uint32_t>, boost::posix_time::ptime> > string_table_4_requests;
137 uint32_t string_table_4_requestid;
138
139 std::list<RR_SHARED_PTR<boost::asio::deadline_timer> > string_table_4_timers;
140 bool string_table_4_pause_updates;
141
142 bool string_table_4_closed;
143
144 bool server;
145
146 bool disable_message4;
147 bool disable_string_table;
148 bool disable_async_io;
149
150 mutable_buffers active_recv_bufs;
151
152 RR_SHARED_PTR<AsyncMessageReader> async_reader;
153
154 size_t async_recv_size;
155 size_t async_recv_pos;
156 uint16_t async_recv_version;
157 size_t async_recv_continue_buf_count;
158
159 RR_SHARED_PTR<AsyncMessageWriter> async_writer;
160
161 uint16_t async_send_version;
162 const_buffers async_send_bufs;
163
164 uint32_t active_capabilities_message2_basic;
165 uint32_t active_capabilities_message4_basic;
166 uint32_t active_capabilities_message4_stringtable;
167
168 ASIOStreamBaseTransport(const RR_SHARED_PTR<RobotRaconteurNode>& node);
169
170 RR_WEAK_PTR<RobotRaconteurNode> node;
171
172 public:
173 RR_OVIRTUAL RR_SHARED_PTR<RobotRaconteurNode> GetNode() RR_OVERRIDE;
174
175 private:
176 ASIOStreamBaseTransport(const ASIOStreamBaseTransport& that);
177
178 protected:
179 class AsyncAttachStream_args : public RRObject
180 {
181 public:
182 RobotRaconteur::NodeID nodeid;
183 std::string nodename;
184
185 // NOLINTBEGIN(bugprone-throw-keyword-missing)
186 AsyncAttachStream_args(const RobotRaconteur::NodeID& nodeid_, boost::string_ref nodename_)
187 : nodeid(nodeid_), nodename(RR_MOVE(nodename_.to_string()))
188 {}
189 // NOLINTEND(bugprone-throw-keyword-missing)
190
191 RR_OVIRTUAL std::string RRType() RR_OVERRIDE
192 {
193 return "RobotRaconteur::ASIOStreamBaseTransport::AsyncAttachStream_args";
194 }
195 };
196
197 virtual void AsyncAttachStream(
198 bool server, const NodeID& target_nodeid, boost::string_ref target_nodename,
199 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
200
201 public:
202 RR_OVIRTUAL void AsyncSendMessage(
203 const RR_INTRUSIVE_PTR<Message>& m,
204 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& callback) RR_OVERRIDE;
205 RR_OVIRTUAL void SendMessage(const RR_INTRUSIVE_PTR<Message>& m) RR_OVERRIDE;
206
207 protected:
208 void SimpleAsyncSendMessage(const RR_INTRUSIVE_PTR<Message>& m,
209 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
210
211 void SimpleAsyncEndSendMessage(const RR_SHARED_PTR<RobotRaconteurException>& err);
212
213 virtual void AsyncAttachStream1(
214 const RR_SHARED_PTR<RRObject>& parameter, const RR_SHARED_PTR<RobotRaconteurException>& err,
215 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
216
217 virtual void BeginSendMessage(const RR_INTRUSIVE_PTR<Message>& m,
218 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
219 virtual void BeginSendMessage1(
220 const RR_INTRUSIVE_PTR<Message>& m,
221 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
222
223 virtual void EndSendMessage(size_t startpos, const boost::system::error_code& error, size_t bytes_transferred,
224 const RR_INTRUSIVE_PTR<Message>& m, size_t m_len,
225 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& callback,
226 const boost::shared_array<uint8_t>& buf);
227 virtual void EndSendMessage1();
228
229 virtual void EndSendMessage2(const boost::system::error_code& error, size_t bytes_transferred,
230 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
231
232 virtual void AsyncPauseSend(const boost::function<void(const boost::system::error_code&)>& handler);
233 virtual void AsyncResumeSend();
234
235 virtual void BeginReceiveMessage1();
236
237 virtual void EndReceiveMessage1(size_t startpos, const boost::system::error_code& error, size_t bytes_transferred);
238
239 virtual void EndReceiveMessage2(size_t startpos, const boost::system::error_code& error, size_t bytes_transferred,
240 size_t message_size, const boost::shared_array<uint8_t>& buf);
241
242 virtual void EndReceiveMessage3(const RR_INTRUSIVE_PTR<Message>& message);
243 virtual void EndReceiveMessage4();
244
245 virtual void EndReceiveMessage5(const boost::system::error_code& error, size_t bytes_transferred);
246
247 virtual void AsyncPauseReceive(const boost::function<void(const boost::system::error_code&)>& handler);
248 virtual void AsyncResumeReceive();
249
250 RR_OVIRTUAL void Close() RR_OVERRIDE;
251
252 virtual void heartbeat_timer_func(const boost::system::error_code& e);
253
254 boost::mutex heartbeat_timer_lock;
255
256 public:
257 virtual void MessageReceived(const RR_INTRUSIVE_PTR<Message>& m) = 0;
258
259 virtual bool IsConnected();
260
261 virtual uint32_t StreamCapabilities(boost::string_ref name);
262
263 virtual void AsyncCheckStreamCapability(
264 boost::string_ref name,
265 const boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
266
267 protected:
268 virtual void BeginCheckStreamCapability(
269 boost::string_ref name,
270 const boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>& callback);
271
272 void CheckStreamCapability_EndSendMessage(const RR_SHARED_PTR<RobotRaconteurException>& err);
273
274 static void CheckStreamCapability_timercallback(RR_WEAK_PTR<ASIOStreamBaseTransport> t,
275 const boost::system::error_code& e);
276
277 void CheckStreamCapability_MessageReceived(const RR_INTRUSIVE_PTR<Message>& m);
278
279 public:
280 void AsyncStreamOp(
281 boost::string_ref command, const RR_SHARED_PTR<RRObject>& args,
282 boost::function<void(const RR_SHARED_PTR<RRObject>&, const RR_SHARED_PTR<RobotRaconteurException>&)>&);
283
284 virtual void PeriodicCleanupTask();
285
286 RR_OVIRTUAL NodeID GetRemoteNodeID() RR_OVERRIDE;
287
288 protected:
289 virtual void BeginStreamOp(
290 boost::string_ref command, const RR_SHARED_PTR<RRObject>& args,
291 boost::function<void(const RR_SHARED_PTR<RRObject>&, const RR_SHARED_PTR<RobotRaconteurException>&)>&);
292
293 virtual RR_INTRUSIVE_PTR<MessageEntry> PackStreamOpRequest(boost::string_ref command,
294 const RR_SHARED_PTR<RRObject>& args);
295
296 virtual void StreamOp_EndSendMessage(const RR_SHARED_PTR<RobotRaconteurException>& err);
297
298 static void StreamOp_timercallback(RR_WEAK_PTR<ASIOStreamBaseTransport> t, const boost::system::error_code& e);
299
300 virtual void StreamOpMessageReceived(const RR_INTRUSIVE_PTR<Message>& m);
301
302 virtual RR_INTRUSIVE_PTR<MessageEntry> ProcessStreamOpRequest(const RR_INTRUSIVE_PTR<MessageEntry>& request,
303 const RR_INTRUSIVE_PTR<MessageHeader>& header);
304
305 virtual RR_SHARED_PTR<RRObject> UnpackStreamOpResponse(const RR_INTRUSIVE_PTR<MessageEntry>& response,
306 const RR_INTRUSIVE_PTR<MessageHeader>& header);
307
308 virtual void async_write_some(
309 const_buffers& b,
310 const boost::function<void(const boost::system::error_code& error, size_t bytes_transferred)>& handler) = 0;
311
312 virtual void async_read_some(
313 mutable_buffers& b,
314 const boost::function<void(const boost::system::error_code& error, size_t bytes_transferred)>& handler) = 0;
315
316 virtual size_t available() = 0;
317
318 virtual bool IsLargeTransferAuthorized();
319
320 public:
321 virtual bool GetDisableMessage4();
322 virtual void SetDisableMessage4(bool d);
323
324 virtual bool GetDisableStringTable();
325 virtual void SetDisableStringTable(bool d);
326
327 RR_OVIRTUAL bool CheckCapabilityActive(uint32_t cap) RR_OVERRIDE;
328};
329
330} // namespace detail
331} // namespace RobotRaconteur