Robot Raconteur Core C++ Library
Loading...
Searching...
No Matches
ThreadPool.h
Go to the documentation of this file.
1
23
24#include <boost/bind/placeholders.hpp>
26
27#ifndef ROBOTRACONTEUR_EMSCRIPTEN
28#include <boost/asio.hpp>
29#endif
30
31#pragma once
32
33namespace RobotRaconteur
34{
35class ROBOTRACONTEUR_CORE_API RobotRaconteurNode;
36
62class ROBOTRACONTEUR_CORE_API ThreadPool : public RR_ENABLE_SHARED_FROM_THIS<ThreadPool>, private boost::noncopyable
63{
64
65 protected:
66 std::vector<RR_SHARED_PTR<boost::thread> > threads;
67
68 boost::mutex queue_mutex;
69
70 RR_BOOST_ASIO_IO_CONTEXT _io_context;
71
72 size_t thread_count;
73
74 bool keepgoing;
75 boost::mutex keepgoing_lock;
76
77#if BOOST_ASIO_VERSION < 101200
78 RR_SHARED_PTR<RR_BOOST_ASIO_IO_CONTEXT::work> _work;
79#else
80 RR_SHARED_PTR<boost::asio::executor_work_guard<RR_BOOST_ASIO_IO_CONTEXT::executor_type> > _work;
81#endif
82
83 RR_WEAK_PTR<RobotRaconteurNode> node;
84
85 public:
97 ThreadPool(const RR_SHARED_PTR<RobotRaconteurNode>& node);
98 virtual ~ThreadPool();
99
100 RR_SHARED_PTR<RobotRaconteurNode> GetNode();
101
107 virtual size_t GetThreadPoolCount();
108
116 virtual void SetThreadPoolCount(size_t count);
117
123 virtual void Post(boost::function<void()> function);
124
132 virtual bool TryPost(boost::function<void()> function);
133
134 virtual void Shutdown();
135
144 virtual RR_BOOST_ASIO_IO_CONTEXT& get_io_context();
145
146 protected:
147 virtual void start_new_thread();
148
149 virtual void thread_function();
150};
151
160class ROBOTRACONTEUR_CORE_API ThreadPoolFactory : private boost::noncopyable
161{
162 public:
169 virtual RR_SHARED_PTR<ThreadPool> NewThreadPool(const RR_SHARED_PTR<RobotRaconteurNode>& node)
170 {
171 return RR_MAKE_SHARED<ThreadPool>(node);
172 }
173 virtual ~ThreadPoolFactory();
174};
175
189class ROBOTRACONTEUR_CORE_API IOContextThreadPool : public ThreadPool
190{
191
192 protected:
193 RR_BOOST_ASIO_IO_CONTEXT& _external_io_context;
194 bool _multithreaded;
195
196 public:
206 IOContextThreadPool(const RR_SHARED_PTR<RobotRaconteurNode>& node, RR_BOOST_ASIO_IO_CONTEXT& external_io_context,
207 bool multithreaded);
208 RR_OVIRTUAL ~IOContextThreadPool() RR_OVERRIDE;
209
215 RR_OVIRTUAL size_t GetThreadPoolCount() RR_OVERRIDE;
216
224 RR_OVIRTUAL void SetThreadPoolCount(size_t count) RR_OVERRIDE;
225
226 RR_OVIRTUAL void Post(boost::function<void()> function) RR_OVERRIDE;
227 RR_OVIRTUAL bool TryPost(boost::function<void()> function) RR_OVERRIDE;
228
229 RR_OVIRTUAL void Shutdown() RR_OVERRIDE;
230
231 RR_OVIRTUAL RR_BOOST_ASIO_IO_CONTEXT& get_io_context() RR_OVERRIDE;
232};
233
234namespace detail
235{
236ROBOTRACONTEUR_CORE_API bool ThreadPool_IsNodeMultithreaded(RR_WEAK_PTR<RobotRaconteurNode> node);
237}
238
239namespace detail
240{
241template <typename T>
242struct IOContextThreadPool_AsyncResultAdapter_traits
243{
244 typedef T result_type;
245};
246
247template <>
248struct IOContextThreadPool_AsyncResultAdapter_traits<void>
249{
250 typedef int32_t result_type;
251};
252
253template <typename T>
254struct IOContextThreadPool_AsyncResultAdapter_data
255{
256 typedef typename IOContextThreadPool_AsyncResultAdapter_traits<T>::result_type result_type;
257 boost::initialized<result_type> _result;
258 RR_SHARED_PTR<RobotRaconteurException> _exp;
259 boost::initialized<bool> _complete;
260};
261
262ROBOTRACONTEUR_CORE_API RR_SHARED_PTR<RobotRaconteurNode> IOContextThreadPool_RobotRaconteurNode_sp();
263
264ROBOTRACONTEUR_CORE_API void IOContextThreadPool_RobotRaconteurNode_DownCastAndThrowException(
265 const RR_SHARED_PTR<RobotRaconteurNode>& node, RobotRaconteurException& exp);
266} // namespace detail
267
287template <typename T>
289{
290 private:
291 RR_SHARED_PTR<RobotRaconteurNode> _node;
292 RR_BOOST_ASIO_IO_CONTEXT& _io_context;
293 RR_SHARED_PTR<detail::IOContextThreadPool_AsyncResultAdapter_data<T> > _data;
294
295 public:
296 typedef typename detail::IOContextThreadPool_AsyncResultAdapter_traits<T>::result_type result_type;
297
306 IOContextThreadPool_AsyncResultAdapter(RR_SHARED_PTR<RobotRaconteurNode>& node,
307 RR_BOOST_ASIO_IO_CONTEXT& io_context)
308 : _node(node), _io_context(io_context),
309 _data(RR_MAKE_SHARED<detail::IOContextThreadPool_AsyncResultAdapter_data<T> >())
310 {}
311
318 IOContextThreadPool_AsyncResultAdapter(RR_BOOST_ASIO_IO_CONTEXT& io_context)
319 : _node(detail::IOContextThreadPool_RobotRaconteurNode_sp()), _io_context(io_context),
320 _data(RR_MAKE_SHARED<detail::IOContextThreadPool_AsyncResultAdapter_data<T> >())
321 {}
322
323 void operator()(result_type res, const RR_SHARED_PTR<RobotRaconteurException>& exp)
324 {
325 _data->_complete.data() = true;
326 _data->_result.data() = res;
327 _data->_exp = exp;
328 }
329
330 void operator()(const RR_SHARED_PTR<RobotRaconteurException>& exp)
331 {
332 _data->_complete.data() = true;
333 _data->_exp = exp;
334 }
335
336 void operator()(result_type res)
337 {
338 _data->_complete.data() = true;
339 _data->_result.data() = res;
340 }
341
342 void operator()() { _data->_complete.data() = true; }
343
355 result_type GetResult()
356 {
357 while (!_data->_complete.data())
358 {
359 _io_context.run_one();
360 }
361
362 result_type res = result_type();
363 RR_SHARED_PTR<RobotRaconteurException> exp;
364 RR_SWAP(res, _data->_result.data());
365 exp = _data->_exp;
366 _data->_exp.reset();
367 if (exp)
368 {
369 RobotRaconteurException* exp1 = exp.get();
370 detail::IOContextThreadPool_RobotRaconteurNode_DownCastAndThrowException(_node, *exp1);
371 }
372 return res;
373 }
374
389 bool PollResult(result_type& ret, RR_SHARED_PTR<RobotRaconteurException>& exp)
390 {
391 if (!_data->_complete.data())
392 {
393 return false;
394 }
395
396 RR_SWAP(ret, _data->_result.data());
397 RR_SWAP(exp, _data->_exp);
398 return true;
399 }
400};
401
402#define ROBOTRACONTEUR_ASSERT_MULTITHREADED(node) \
403 BOOST_ASSERT_MSG(detail::ThreadPool_IsNodeMultithreaded(node), "multithreading required for requested operation")
404
405#ifndef ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES
407using ThreadPoolPtr = RR_SHARED_PTR<ThreadPool>;
409using ThreadPoolFactoryPtr = RR_SHARED_PTR<ThreadPoolFactory>;
411using IOContextThreadPoolPtr = RR_SHARED_PTR<IOContextThreadPool>;
412#endif
413} // namespace RobotRaconteur
boost::shared_ptr< ThreadPool > ThreadPoolPtr
Convenience alias for ThreadPool shared_ptr.
Definition ThreadPool.h:407
boost::shared_ptr< ThreadPoolFactory > ThreadPoolFactoryPtr
Convenience alias for ThreadPoolFactory shared_ptr.
Definition ThreadPool.h:409
boost::shared_ptr< IOContextThreadPool > IOContextThreadPoolPtr
Convenience alias for IOContextThreadPool shared_ptr.
Definition ThreadPool.h:411
bool PollResult(result_type &ret, boost::shared_ptr< RobotRaconteurException > &exp)
Polls for a result, nonblocking.
Definition ThreadPool.h:389
IOContextThreadPool_AsyncResultAdapter(boost::shared_ptr< RobotRaconteurNode > &node, boost::asio::io_context &io_context)
Construct an IOContextThreadPool_AsyncResultAdapter.
Definition ThreadPool.h:306
result_type GetResult()
Get the result of the asynchronous operation.
Definition ThreadPool.h:355
IOContextThreadPool_AsyncResultAdapter(boost::asio::io_context &io_context)
Construct an IOContextThreadPool_AsyncResultAdapter for the singleton RobotRaconteurNode.
Definition ThreadPool.h:318
RR_OVIRTUAL boost::asio::io_context & get_io_context() RR_OVERRIDE
Get the boost::asio::io_context object.
RR_OVIRTUAL void SetThreadPoolCount(size_t count) RR_OVERRIDE
Invalid for IOContextThreadPool, throws InvalidOperationException.
RR_OVIRTUAL bool TryPost(boost::function< void()> function) RR_OVERRIDE
Try posting a function to be executed by the thread pool in a worker thread and return immediately.
RR_OVIRTUAL size_t GetThreadPoolCount() RR_OVERRIDE
Returns 1 if single threaded, 2 if multithreaded.
IOContextThreadPool(const boost::shared_ptr< RobotRaconteurNode > &node, boost::asio::io_context &external_io_context, bool multithreaded)
Construct an IOContextThreadPool.
RR_OVIRTUAL void Post(boost::function< void()> function) RR_OVERRIDE
Post a function to be executed by the thread pool in a worker thread and return immediately.
Base class for Robot Raconteur exceptions.
Definition Error.h:53
The central node implementation.
Definition RobotRaconteurNode.h:132
ThreadPool factory for use with RobotRaconteurNode.
Definition ThreadPool.h:161
virtual boost::shared_ptr< ThreadPool > NewThreadPool(const boost::shared_ptr< RobotRaconteurNode > &node)
Construct and return a new threadpool.
Definition ThreadPool.h:169
virtual size_t GetThreadPoolCount()
Get the number of threads in the thread pool.
virtual boost::asio::io_context & get_io_context()
Get the boost::asio::io_context object.
virtual bool TryPost(boost::function< void()> function)
Try posting a function to be executed by the thread pool in a worker thread and return immediately.
ThreadPool(const boost::shared_ptr< RobotRaconteurNode > &node)
Construct a new ThreadPool.
virtual void SetThreadPoolCount(size_t count)
Set the desired number of threads in the thread pool.
virtual void Post(boost::function< void()> function)
Post a function to be executed by the thread pool in a worker thread and return immediately.