ndmspc v1.2.0-0.1.rc6
NDimensionalExecutor.h
1 #ifndef N_DIMENSIONAL_EXECUTOR_H
2 #define N_DIMENSIONAL_EXECUTOR_H
3 
4 #include <iomanip>
5 #include <sstream>
6 #include <set>
7 #include <vector>
8 #include <functional>
9 #include <cstddef>
10 #include <thread>
11 #include <mutex>
12 #include <condition_variable>
13 #include <atomic>
14 #include <queue>
15 #include <stdexcept>
16 #include <utility>
17 #include <exception>
18 #include <memory>
19 #include <string>
20 #include <Rtypes.h>
21 #include <THnSparse.h>
22 #include "NLogger.h"
23 #include "NThreadData.h"
24 #include "NTaskStateManager.h"
25 
26 namespace Ndmspc {
27 
30  size_t tasksAcked{0};
31  size_t tasksPending{0};
32  size_t tasksRunning{0};
33  size_t tasksDone{0};
34  size_t activeWorkers{0};
35 };
36 
43  public:
49  NDimensionalExecutor(const std::vector<int> & minBounds, const std::vector<int> & maxBounds);
50 
56  NDimensionalExecutor(THnSparse * hist, bool onlyfilled = false);
57 
59 
60  void SetBounds(const std::vector<int> & minBounds, const std::vector<int> & maxBounds);
61 
66  void Execute(const std::function<void(const std::vector<int> & coords)> & func);
67 
74  template <typename TObject>
75  void ExecuteParallel(const std::function<void(const std::vector<int> & coords, TObject & thread_object)> & func,
76  std::vector<TObject> & thread_objects);
77 
84  size_t ExecuteParallelProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount);
85  void StartProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount,
86  const std::string & tcpBindEndpoint = "", const std::string & jobDir = "",
87  const std::string & treeName = "ngnt", const std::string & macroList = "",
88  const std::string & tmpDir = "", const std::string & tmpResultsDir = "",
89  const std::string & macroParams = "");
90  size_t ExecuteCurrentBoundsProcessIpc(const std::string & definitionName = "",
91  const std::vector<Long64_t> * definitionIds = nullptr,
92  const std::function<void(const ExecutionProgress&)> & progressCallback = nullptr);
93  void FinishProcessIpc(bool abort = false);
94 
95  std::set<size_t> GetRegisteredWorkerIndices() const { return fRegisteredWorkerIndices; }
96 
101  size_t Dimensions() const { return fNumDimensions; }
102 
107  const std::vector<int> & GetMinBounds() const { return fMinBounds; }
108 
113  const std::vector<int> & GetMaxBounds() const { return fMaxBounds; }
114 
115  private:
116  size_t fNumDimensions;
117  std::vector<int> fMinBounds;
118  std::vector<int> fMaxBounds;
119  std::vector<int> fCurrentCoords;
120 
125  bool Increment();
126 
129  bool InitTcpWorker(const std::string & identity);
130 
134  bool HandleBootstrap(const std::string & identity);
135 
138  size_t HandleWorkerFailure(const std::string & failedIdentity,
139  const std::string & failureReason,
140  size_t & outstanding,
141  size_t & acked);
142 
143  struct IpcSession;
144  std::unique_ptr<IpcSession> fIpcSession;
145  std::set<size_t> fRegisteredWorkerIndices;
146 };
147 
148 
149 
150 } // namespace Ndmspc
151 
152 #endif
Executes a function over all points in an N-dimensional space, optionally in parallel.
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in identityToWorker / wor...
size_t HandleWorkerFailure(const std::string &failedIdentity, const std::string &failureReason, size_t &outstanding, size_t &acked)
Centralized worker failure handling: recovers tasks, removes the worker, and updates state. ...
std::set< size_t > fRegisteredWorkerIndices
Indices of workers that have completed registration (TCP mode).
const std::vector< int > & GetMaxBounds() const
Returns the maximum bounds for each dimension.
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a CONFI...
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
size_t Dimensions() const
Returns the number of dimensions.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
const std::vector< int > & GetMinBounds() const
Returns the minimum bounds for each dimension.
Execution progress metrics for IPC-based distributed processing.
size_t tasksRunning
Tasks currently assigned to workers.
size_t activeWorkers
Number of workers currently active.
size_t tasksAcked
Tasks completed and ACKed by workers.
size_t tasksDone
Tasks completed locally (state machine).
size_t tasksPending
Tasks waiting to be dispatched.