Robot Raconteur Core C++ Library
Loading...
Searching...
No Matches
AsyncUtils.h
Go to the documentation of this file.
1
23
24#include <boost/bind/placeholders.hpp>
25#include <boost/scope_exit.hpp>
26#include <boost/system/error_code.hpp>
27#include <boost/bind/bind.hpp>
28
34
35#pragma once
36
37namespace RobotRaconteur
38{
39class ROBOTRACONTEUR_CORE_API RobotRaconteurNode;
40
41namespace detail
42{
43template <typename T>
44class sync_async_handler : private boost::noncopyable
45{
46 public:
47 RR_SHARED_PTR<AutoResetEvent> ev;
48 RR_SHARED_PTR<RobotRaconteurException> err;
49 RR_SHARED_PTR<T> data;
50 boost::mutex data_lock;
51
52 sync_async_handler() { ev = RR_MAKE_SHARED<AutoResetEvent>(); }
53
54 sync_async_handler(const RR_SHARED_PTR<RobotRaconteurException>& err)
55 {
56 ev = RR_MAKE_SHARED<AutoResetEvent>();
57 this->err = err;
58 }
59
60 void operator()() { ev->Set(); }
61
62 void operator()(const RR_SHARED_PTR<RobotRaconteurException>& err)
63 {
64 boost::mutex::scoped_lock lock(data_lock);
65 this->err = err;
66 ev->Set();
67 }
68
69 void operator()(const RR_SHARED_PTR<T>& data, const RR_SHARED_PTR<RobotRaconteurException>& err)
70 {
71 boost::mutex::scoped_lock lock(data_lock);
72 this->err = err;
73 this->data = data;
74 ev->Set();
75 }
76
77 RR_SHARED_PTR<T> end()
78 {
79 ev->WaitOne();
80
81 boost::mutex::scoped_lock lock(data_lock);
82 if (err)
83 {
84 RobotRaconteurExceptionUtil::DownCastAndThrowException(err);
85 }
86
87 if (!data)
88 throw InternalErrorException("Internal async error");
89
90 return data;
91 }
92
93 void end_void()
94 {
95 ev->WaitOne();
96
97 boost::mutex::scoped_lock lock(data_lock);
98 if (err)
99 {
100 RobotRaconteurExceptionUtil::DownCastAndThrowException(err);
101 }
102 }
103
104 bool try_end(RR_SHARED_PTR<T>& res, RR_SHARED_PTR<RobotRaconteurException>& err_out)
105 {
106 ev->WaitOne();
107
108 boost::mutex::scoped_lock lock(data_lock);
109 if (err)
110 {
111 err_out = err;
112 return false;
113 }
114
115 if (!data)
116 {
117 err_out = RR_MAKE_SHARED<InternalErrorException>("Internal async error");
118 return false;
119 }
120
121 res = data;
122 return true;
123 }
124
125 bool try_end_void(RR_SHARED_PTR<RobotRaconteurException>& err_out)
126 {
127 ev->WaitOne();
128
129 boost::mutex::scoped_lock lock(data_lock);
130 if (err)
131 {
132 err_out = err;
133 return false;
134 }
135
136 return true;
137 }
138};
139
140RR_SHARED_PTR<Timer> async_timeout_wrapper_CreateTimer(const RR_SHARED_PTR<RobotRaconteurNode>& node,
141 const boost::posix_time::time_duration& period,
142 RR_MOVE_ARG(boost::function<void(const TimerEvent&)>) handler,
143 bool oneshot);
144
145template <typename T>
146void async_timeout_wrapper_closer(const RR_SHARED_PTR<T>& d)
147{
148 try
149 {
150 d->Close();
151 }
152 catch (std::exception&)
153 {}
154}
155
156template <typename T, typename T2>
157void async_timeout_wrapper_closer(const RR_SHARED_PTR<T>& d)
158{
159 try
160 {
161 RR_SHARED_PTR<T2> t2 = RR_DYNAMIC_POINTER_CAST<T2>(d);
162 if (!t2)
163 return;
164 t2->Close();
165 }
166 catch (std::exception&)
167 {}
168}
169
170template <typename T>
171class async_timeout_wrapper : public RR_ENABLE_SHARED_FROM_THIS<async_timeout_wrapper<T> >, private boost::noncopyable
172{
173 private:
174 boost::function<void(const RR_SHARED_PTR<T>&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler_;
175 RR_SHARED_PTR<Timer> timeout_timer_;
176 boost::mutex handled_lock;
177 bool handled;
178 RR_SHARED_PTR<RobotRaconteurException> timeout_exception_;
179 boost::function<void(const RR_SHARED_PTR<T>&)> deleter_;
180 RR_WEAK_PTR<RobotRaconteurNode> node;
181
182 public:
183 async_timeout_wrapper(
184 const RR_SHARED_PTR<RobotRaconteurNode>& node,
185 boost::function<void(const RR_SHARED_PTR<T>&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
186 RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<T>&)>) deleter = 0)
187 : handler_(handler), handled(false), node(node)
188 {
189 deleter_.swap(deleter);
190 }
191
192 void start_timer(int32_t timeout, const RR_SHARED_PTR<RobotRaconteurException>& timeout_exception =
193 RR_MAKE_SHARED<ConnectionException>("Timeout during operation"));
194
195 void operator()(const RR_SHARED_PTR<T>& data, const RR_SHARED_PTR<RobotRaconteurException>& err)
196 {
197 {
198 boost::mutex::scoped_lock lock(handled_lock);
199 if (handled)
200 {
201 if (data && deleter_)
202 deleter_(data);
203 return;
204 }
205 handled = true;
206
207 if (timeout_timer_)
208 timeout_timer_->TryStop();
209
210 timeout_timer_.reset();
211 }
212
213 handler_(data, err);
214 }
215
216 void handle_error(const RR_SHARED_PTR<RobotRaconteurException>& err)
217 {
218 {
219 boost::mutex::scoped_lock lock(handled_lock);
220 if (handled)
221 return;
222 handled = true;
223
224 {
225
226 if (timeout_timer_)
227 timeout_timer_->TryStop();
228
229 timeout_timer_.reset();
230 }
231 }
232
233 handler_(RR_SHARED_PTR<T>(), err);
234 }
235
236 void handle_error(const boost::system::error_code& err)
237 {
238 if (err.value() == boost::system::errc::timed_out)
239 handle_error(timeout_exception_);
240 handle_error(RR_MAKE_SHARED<ConnectionException>(err.message()));
241 }
242
243 private:
244 void timeout_handler(const TimerEvent& /*e*/)
245 {
246 {
247 boost::mutex::scoped_lock lock(handled_lock);
248 if (handled)
249 return;
250 handled = true;
251
252 timeout_timer_.reset();
253 }
254
255 handler_(RR_SHARED_PTR<T>(), timeout_exception_);
256 }
257};
258
259template <typename T>
260void async_timeout_wrapper<T>::start_timer(int32_t timeout,
261 const RR_SHARED_PTR<RobotRaconteurException>& timeout_exception)
262{
263 RR_SHARED_PTR<RobotRaconteurNode> n = node.lock();
264 if (!n)
265 throw InvalidOperationException("Node has been released");
266
267 boost::mutex::scoped_lock lock(handled_lock);
268
269 if (handled)
270 return;
271
272 if (timeout != RR_TIMEOUT_INFINITE)
273 {
274 timeout_exception_ = RR_MOVE(timeout_exception);
275 timeout_timer_ =
276 async_timeout_wrapper_CreateTimer(n, boost::posix_time::milliseconds(timeout),
277 boost::bind(&async_timeout_wrapper<T>::timeout_handler,
278 this->shared_from_this(), RR_BOOST_PLACEHOLDERS(_1)),
279 true);
280 timeout_timer_->Start();
281 }
282}
283
284class ROBOTRACONTEUR_CORE_API async_signal_semaphore : private boost::noncopyable
285{
286 protected:
287 boost::mutex this_lock;
288 boost::condition_variable next_wait;
289 boost::initialized<bool> running;
290 boost::initialized<bool> next;
291 boost::initialized<uint64_t> next_id;
292
293 public:
294 template <typename F>
295 bool try_fire_next(BOOST_ASIO_MOVE_ARG(F) h)
296 {
297 boost::mutex::scoped_lock lock(this_lock);
298
299 if (running || next)
300 {
301 uint64_t my_id = ++next_id.data();
302
303 if (next.data())
304 {
305 next_wait.notify_all();
306 }
307 else
308 {
309 next.data() = true;
310 }
311
312 while (running)
313 {
314 next_wait.wait(lock);
315 if (my_id != next_id)
316 return false;
317 }
318
319 next.data() = false;
320 }
321
322 running.data() = true;
323
324 BOOST_SCOPE_EXIT_TPL(this_)
325 {
326 boost::mutex::scoped_lock lock2(this_->this_lock);
327 this_->running.data() = false;
328 this_->next_wait.notify_all();
329 }
330 BOOST_SCOPE_EXIT_END;
331
332 lock.unlock();
333
334 h();
335
336 return true;
337 }
338};
339
340class ROBOTRACONTEUR_CORE_API async_signal_pool_semaphore
341 : public RR_ENABLE_SHARED_FROM_THIS<async_signal_pool_semaphore>,
342 private boost::noncopyable
343{
344 protected:
345 boost::mutex this_lock;
346 boost::initialized<bool> running;
347 boost::function<void()> next;
348 RR_WEAK_PTR<RobotRaconteurNode> node;
349
350 public:
351 async_signal_pool_semaphore(const RR_SHARED_PTR<RobotRaconteurNode>& node) : node(node) {}
352
353 template <typename F>
354 void try_fire_next(BOOST_ASIO_MOVE_ARG(F) h)
355 {
356 boost::mutex::scoped_lock lock(this_lock);
357
358 if (!running)
359 {
360 RR_SHARED_PTR<RobotRaconteurNode> node1 = this->node.lock();
361 if (!node1)
362 return;
363 do_post(node1, h);
364 }
365 else
366 {
367 next = h;
368 }
369 }
370
371 protected:
372 void do_fire_next(const boost::function<void()>& h)
373 {
374 try
375 {
376 h();
377 }
378 catch (std::exception& exp)
379 {
380 handle_exception(&exp);
381 }
382
383 BOOST_SCOPE_EXIT(this_)
384 {
385 boost::mutex::scoped_lock lock2(this_->this_lock);
386
387 boost::function<void()> h2;
388 h2.swap(this_->next);
389 this_->next.clear();
390 this_->running.data() = false;
391 if (!h2)
392 return;
393 RR_SHARED_PTR<RobotRaconteurNode> node = this_->node.lock();
394 if (!node)
395 return;
396 try
397 {
398 this_->do_post(node,
399 boost::bind(&async_signal_pool_semaphore::do_fire_next, this_->shared_from_this(), h2));
400 }
401 catch (std::exception&)
402 {}
403 this_->running.data() = true;
404 }
405 BOOST_SCOPE_EXIT_END;
406 }
407
408 void do_post(const RR_SHARED_PTR<RobotRaconteurNode>& node1, RR_MOVE_ARG(boost::function<void()>) h);
409
410 void handle_exception(std::exception* exp);
411};
412
413ROBOTRACONTEUR_CORE_API void InvokeHandler_HandleException(RR_WEAK_PTR<RobotRaconteurNode> node, std::exception& exp);
414
415ROBOTRACONTEUR_CORE_API void InvokeHandler_DoPost(RR_WEAK_PTR<RobotRaconteurNode> node,
416 const boost::function<void()>& h, bool shutdown_op = false,
417 bool throw_on_released = true);
418
419ROBOTRACONTEUR_CORE_API void InvokeHandler(RR_WEAK_PTR<RobotRaconteurNode> node,
420 const boost::function<void()>& handler);
421
422template <typename T>
423void InvokeHandler(RR_WEAK_PTR<RobotRaconteurNode> node, const typename boost::function<void(const T&)>& handler,
424 const T& value)
425{
426 try
427 {
428 handler(value);
429 }
430 catch (std::exception& exp)
431 {
432 InvokeHandler_HandleException(RR_MOVE(node), exp);
433 }
434}
435
436ROBOTRACONTEUR_CORE_API void InvokeHandler(
437 RR_WEAK_PTR<RobotRaconteurNode> node,
438 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler);
439
440ROBOTRACONTEUR_CORE_API void InvokeHandlerWithException(
441 RR_WEAK_PTR<RobotRaconteurNode> node,
442 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
443 const RR_SHARED_PTR<RobotRaconteurException>& exp);
444
445ROBOTRACONTEUR_CORE_API void InvokeHandlerWithException(
446 RR_WEAK_PTR<RobotRaconteurNode> node,
447 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler, std::exception& exp,
448 MessageErrorType default_err = MessageErrorType_UnknownError);
449
450template <typename T>
451void InvokeHandler(
452 RR_WEAK_PTR<RobotRaconteurNode> node,
453 const typename boost::function<void(const T&, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
454 const T& value)
455{
456 try
457 {
458 handler(value, RR_SHARED_PTR<RobotRaconteurException>());
459 }
460 catch (std::exception& exp)
461 {
462 InvokeHandler_HandleException(RR_MOVE(node), exp);
463 }
464}
465
466template <typename T>
467void InvokeHandlerWithException(
468 RR_WEAK_PTR<RobotRaconteurNode> node,
469 const typename boost::function<void(T, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
470 const RR_SHARED_PTR<RobotRaconteurException>& exp)
471{
472 typename boost::initialized<typename boost::remove_reference<T>::type> default_value;
473 try
474 {
475 handler(default_value, exp);
476 }
477 catch (std::exception& exp)
478 {
479 InvokeHandler_HandleException(RR_MOVE(node), exp);
480 }
481}
482
483template <typename T>
484void InvokeHandlerWithException(
485 RR_WEAK_PTR<RobotRaconteurNode> node,
486 const typename boost::function<void(T, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
487 std::exception& exp, MessageErrorType default_err = MessageErrorType_UnknownError)
488{
489 typename boost::initialized<typename boost::remove_const<typename boost::remove_reference<T>::type>::type>
490 default_value;
491 try
492 {
493 RR_SHARED_PTR<RobotRaconteurException> err =
494 RobotRaconteurExceptionUtil::ExceptionToSharedPtr(exp, default_err);
495 handler(default_value, err);
496 }
497 catch (std::exception& exp)
498 {
499 InvokeHandler_HandleException(RR_MOVE(node), exp);
500 }
501}
502
503ROBOTRACONTEUR_CORE_API void PostHandler(RR_WEAK_PTR<RobotRaconteurNode> node, const boost::function<void()>& handler,
504 bool shutdown_op = false, bool throw_on_released = true);
505
506template <typename T>
507void PostHandler(RR_WEAK_PTR<RobotRaconteurNode> node, const typename boost::function<void(const T&)>& handler,
508 const T& value, bool shutdown_op = false, bool throw_on_released = true)
509{
510 boost::function<void()> h = boost::bind(handler, value);
511 InvokeHandler_DoPost(RR_MOVE(node), h, shutdown_op, throw_on_released);
512}
513
514ROBOTRACONTEUR_CORE_API void PostHandler(
515 RR_WEAK_PTR<RobotRaconteurNode> node,
516 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler, bool shutdown_op = false,
517 bool throw_on_released = true);
518
519ROBOTRACONTEUR_CORE_API void PostHandlerWithException(
520 RR_WEAK_PTR<RobotRaconteurNode> node,
521 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
522 const RR_SHARED_PTR<RobotRaconteurException>& exp, bool shutdown_op = false, bool throw_on_released = true);
523
524ROBOTRACONTEUR_CORE_API void PostHandlerWithException(
525 RR_WEAK_PTR<RobotRaconteurNode> node,
526 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler, std::exception& exp,
527 MessageErrorType default_err = MessageErrorType_UnknownError, bool shutdown_op = false,
528 bool throw_on_released = true);
529
530template <typename T>
531void PostHandler(RR_WEAK_PTR<RobotRaconteurNode> node,
532 const typename boost::function<void(const T&, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
533 const T& value, bool shutdown_op = false, bool throw_on_released = true)
534{
535 boost::function<void()> h = boost::bind(handler, value, RR_SHARED_PTR<RobotRaconteurException>());
536 InvokeHandler_DoPost(RR_MOVE(node), h, shutdown_op, throw_on_released);
537}
538
539template <typename T>
540void PostHandlerWithException(
541 RR_WEAK_PTR<RobotRaconteurNode> node,
542 const typename boost::function<void(T, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
543 const RR_SHARED_PTR<RobotRaconteurException>& exp, bool shutdown_op = false, bool throw_on_released = true)
544{
545 typename boost::initialized<typename boost::remove_reference<T>::type> default_value;
546 boost::function<void()> h = boost::bind(handler, default_value, exp);
547 InvokeHandler_DoPost(RR_MOVE(node), h, shutdown_op, throw_on_released);
548}
549
550template <typename T>
551void PostHandlerWithException(
552 RR_WEAK_PTR<RobotRaconteurNode> node,
553 const typename boost::function<void(T, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
554 std::exception& exp, MessageErrorType default_err = MessageErrorType_UnknownError, bool shutdown_op = false,
555 bool throw_on_released = true)
556{
557 typename boost::initialized<typename boost::remove_reference<T>::type> default_value;
558 RR_SHARED_PTR<RobotRaconteurException> err = RobotRaconteurExceptionUtil::ExceptionToSharedPtr(exp, default_err);
559 boost::function<void()> h = boost::bind(handler, default_value, err);
560 InvokeHandler_DoPost(RR_MOVE(node), h, shutdown_op, throw_on_released);
561}
562
563} // namespace detail
564} // namespace RobotRaconteur
#define RR_TIMEOUT_INFINITE
Disable timeout for asynchronous operations.
Definition RobotRaconteurConstants.h:566
The central node implementation.
Definition RobotRaconteurNode.h:132