1 #ifndef N_DIMENSIONAL_EXECUTOR_H
2 #define N_DIMENSIONAL_EXECUTOR_H
11 #include <condition_variable>
17 #include <THnSparse.h>
19 #include "NThreadData.h"
/// Executes a function over coordinates of the N-dimensional space.
/// @param func Callback that receives the coordinate vector of one point
///             (`coords`, one int per dimension) by const reference.
/// NOTE(review): the definition is not visible here; traversal order and whether
/// bounds are inclusive should be confirmed against the implementation.
48 void Execute(
const std::function<
void(
const std::vector<int> & coords)> & func);
/// Parallel variant of the coordinate traversal that hands every callback a
/// per-thread object.
/// @tparam TObject Type of the per-thread object passed to the callback.
/// @param func Callback receiving the coordinate vector of one point and a
///             mutable reference to the object owned by the executing thread.
/// @param thread_objects One object per worker thread; the definition uses
///             `thread_objects.size()` as the number of threads and throws
///             std::invalid_argument when the vector is empty.
56 template <
typename TObject>
57 void ExecuteParallel(
const std::function<
void(
const std::vector<int> & coords, TObject & thread_object)> & func,
58 std::vector<TObject> & thread_objects);
// Definition of ExecuteParallel: runs `func` on worker threads, one worker per
// element of `thread_objects`, feeding them closures through a mutex-protected queue.
// Throws std::invalid_argument if `thread_objects` is empty, and rethrows the first
// exception captured during the run once the worker loop at the end has been processed.
// NOTE(review): several lines of this function are not visible in this listing
// (braces, try/catch, queue pop, join); comments below describe only what is shown.
98 template <
typename TObject>
100 const std::function<
void(
const std::vector<int> & coords, TObject & thread_object)> & func,
101 std::vector<TObject> & thread_objects)
// The number of worker threads equals the number of caller-supplied thread objects.
106 size_t threads_to_use = thread_objects.size();
107 if (threads_to_use == 0) {
108 throw std::invalid_argument(
"Thread objects vector cannot be empty.");
// Shared pool state. `tasks` is accessed under `queue_mutex` in the visible code.
111 std::vector<std::thread> workers;
112 std::queue<std::function<void(TObject &)>> tasks;
113 std::mutex queue_mutex;
// condition_producer: workers wait on it for new tasks or a stop request.
114 std::condition_variable condition_producer;
// condition_consumer: the calling thread waits on it for the pool to drain.
115 std::condition_variable condition_consumer;
// Counter decremented by workers when they finish a task.
// NOTE(review): the increment site is not visible here — confirm it pairs with both decrements.
116 std::atomic<size_t> active_tasks = 0;
// Stop flag read by workers and by the producer loop; where it is set is not visible here.
117 std::atomic<bool> stop_pool =
false;
// Holds the first exception captured during the run; rethrown at the end of the function.
119 std::exception_ptr first_exception =
nullptr;
120 std::mutex exception_mutex;
// Body executed by each worker thread; `my_object` is that worker's element of thread_objects.
123 auto worker_logic = [&](TObject & my_object) {
// Builds the label "wk_" followed by the assigned index, zero-padded to width 3.
// NOTE(review): `md` is declared on a line not shown; presumably the label is used as the thread name — confirm.
126 std::ostringstream oss;
127 oss <<
"wk_" << std::setw(3) << std::setfill(
'0') << md->
GetAssignedIndex();
131 std::function<void(TObject &)> task_payload;
132 bool task_acquired =
false;
// Block until either a stop is requested or there is at least one queued task.
136 std::unique_lock<std::mutex> lock(queue_mutex);
137 condition_producer.wait(lock, [&] {
return stop_pool || !tasks.empty(); });
// Stop requested and nothing left to run (the body of this branch is not visible; presumably the worker exits).
140 if (stop_pool && tasks.empty()) {
// Take ownership of the front task by moving it out of the queue.
146 if (!tasks.empty()) {
147 task_payload = std::move(tasks.front());
149 task_acquired =
true;
// Run the task against this worker's own object.
159 task_payload(my_object);
// Failure path: keep only the first exception, guarded by exception_mutex.
165 std::lock_guard<std::mutex> lock(exception_mutex);
166 if (!first_exception) {
167 first_exception = std::current_exception();
// Then, under queue_mutex, wake every thread waiting on condition_producer.
172 std::unique_lock<std::mutex> lock(queue_mutex);
175 condition_producer.notify_all();
// Task accounting: when the last active task finishes after a stop request, wake the waiting caller.
180 if (--active_tasks == 0 && stop_pool) {
182 condition_consumer.notify_one();
// Second decrement site with the same wake-up rule; the branch structure separating the two is not visible here.
194 if (--active_tasks == 0 && stop_pool) {
195 condition_consumer.notify_one();
// Start one worker per thread object, passing each object by reference.
202 workers.reserve(threads_to_use);
203 for (
size_t i = 0; i < threads_to_use; ++i) {
204 workers.emplace_back(worker_logic, std::ref(thread_objects[i]));
// Producer side: stop producing as soon as a stop has been requested.
214 std::unique_lock<std::mutex> lock(queue_mutex);
215 if (stop_pool)
break;
220 std::unique_lock<std::mutex> lock(queue_mutex);
222 if (stop_pool)
break;
// The queued closure captures `func` and `coords_copy` by value, so each task owns its own copies.
225 tasks.emplace([func, coords_copy](TObject & obj) { func(coords_copy, obj); });
// Wake one waiting worker for the task just queued.
227 condition_producer.notify_one();
// Failure on the producer side: record the exception if none was stored yet and wake all workers.
// NOTE(review): here first_exception is checked under queue_mutex, while the worker path above
// uses exception_mutex — confirm against the hidden lines that both sites are consistently guarded.
233 std::unique_lock<std::mutex> lock(queue_mutex);
235 if (!first_exception) {
236 first_exception = std::current_exception();
239 condition_producer.notify_all();
// End of production: wake all workers (presumably after setting stop_pool on a line not shown).
245 std::unique_lock<std::mutex> lock(queue_mutex);
248 condition_producer.notify_all();
// Wait until a stop has been requested and no task is still active.
252 std::unique_lock<std::mutex> lock(queue_mutex);
253 condition_consumer.wait(lock, [&] {
return stop_pool && active_tasks == 0; });
// Visit every worker thread that is still joinable (the join call itself is not visible here).
257 for (std::thread & worker : workers) {
258 if (worker.joinable()) {
// Propagate the first captured exception, if any, to the caller.
264 if (first_exception) {
265 std::rethrow_exception(first_exception);
Executes a function over all points in an N-dimensional space, optionally in parallel.
std::vector< int > fMinBounds
Minimum bounds for each dimension.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
const std::vector< int > & GetMaxBounds() const
Returns the maximum bounds for each dimension.
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
size_t Dimensions() const
Returns the number of dimensions.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
const std::vector< int > & GetMinBounds() const
Returns the minimum bounds for each dimension.
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Thread-local data object for NDMSPC processing.
size_t GetAssignedIndex() const
Get the assigned index for the thread.