ndmspc  v1.2.0-0.1.rc6
NDimensionalExecutor.cxx
1 #include <string>
2 #include <algorithm>
3 #include <cctype>
4 #include <chrono>
5 #include <cerrno>
6 #include <csignal>
7 #include <cstdio>
8 #include <queue>
9 #include <set>
10 #include <sstream>
11 #include <unordered_map>
12 #include <sys/wait.h>
13 #include <unistd.h>
14 #include <zmq.h>
15 #include <THnSparse.h>
16 #include <TAxis.h>
17 #include <TROOT.h>
18 #include <TSystem.h>
19 #include "NDimensionalExecutor.h"
20 #include "NDimensionalIpcRunner.h"
21 #include "NGnThreadData.h"
22 #include "NUtils.h"
23 
24 namespace Ndmspc {
25 
namespace {
/// Capacity of the PID table reachable from the SIGINT handler.
/// Children beyond this count are not tracked and therefore not killed on interrupt.
constexpr size_t kIpcMaxTrackedChildren = 1024;

/// Set to 1 by IpcSigIntHandler when SIGINT is delivered; cleared on install and on restore.
volatile sig_atomic_t gIpcSigIntRequested = 0;
/// Number of valid leading entries in gIpcChildPids (0 while the table is being rewritten).
volatile sig_atomic_t gIpcChildCount = 0;
/// PIDs of the worker processes to kill on SIGINT. Volatile so that the stores filling the
/// table cannot be reordered by the compiler past the volatile store that publishes the count.
volatile pid_t gIpcChildPids[kIpcMaxTrackedChildren] = {0};

/// SIGINT handler: records the interrupt request and SIGKILLs every tracked child.
/// Only async-signal-safe operations are used (kill() plus volatile loads/stores).
void IpcSigIntHandler(int)
{
  // kill() may overwrite errno, and the interrupted code inspects errno right after its
  // receive call returns, so the value is saved here and put back before returning.
  const int savedErrno = errno;

  gIpcSigIntRequested      = 1;
  const sig_atomic_t count = gIpcChildCount;
  for (sig_atomic_t i = 0; i < count; ++i) {
    const pid_t pid = gIpcChildPids[i];
    if (pid > 0) {
      // User interrupt should not leave orphan IPC workers.
      kill(pid, SIGKILL);
    }
  }

  errno = savedErrno;
}

/// Publishes the child PID table and installs IpcSigIntHandler for SIGINT.
/// @param childPids    PIDs to kill on SIGINT; only the first kIpcMaxTrackedChildren are tracked.
/// @param oldAction    Receives the previously installed SIGINT action.
/// @param hasOldAction Set to true when the handler was installed (oldAction is valid), false otherwise.
void InstallIpcSigIntHandler(const std::vector<pid_t> & childPids, struct sigaction & oldAction, bool & hasOldAction)
{
  gIpcSigIntRequested = 0;
  // Hide the table from the handler while it is rewritten; the count is re-published last.
  gIpcChildCount = 0;

  const size_t count = std::min(kIpcMaxTrackedChildren, childPids.size());
  for (size_t i = 0; i < count; ++i) {
    gIpcChildPids[i] = childPids[i];
  }
  for (size_t i = count; i < kIpcMaxTrackedChildren; ++i) {
    gIpcChildPids[i] = 0;
  }
  gIpcChildCount = static_cast<sig_atomic_t>(count);

  struct sigaction sa{}; // value-initialised: all fields zero
  sa.sa_handler = IpcSigIntHandler;
  sigemptyset(&sa.sa_mask);
  sa.sa_flags = 0;

  // Always write the flag so a failed install can never be mistaken for a successful one.
  hasOldAction = (sigaction(SIGINT, &sa, &oldAction) == 0);
}

/// Re-installs the SIGINT action saved by InstallIpcSigIntHandler (if any) and clears the handler state.
/// @param oldAction    Action previously returned through InstallIpcSigIntHandler.
/// @param hasOldAction Whether oldAction is valid and must be restored.
void RestoreIpcSigIntHandler(const struct sigaction & oldAction, bool hasOldAction)
{
  if (hasOldAction) {
    sigaction(SIGINT, &oldAction, nullptr);
  }
  gIpcChildCount      = 0;
  gIpcSigIntRequested = 0;
}
} // namespace
77 
// Supervisor-side state of one worker session (ZeroMQ endpoint, worker registry, task bookkeeping).
// ZeroMQ context created by StartProcessIpc via zmq_ctx_new().
79  void * ctx{nullptr};
// ZMQ_ROUTER socket bound to `endpoint`; all worker traffic goes through it.
80  void * router{nullptr};
// True when the session was started with a TCP bind endpoint, false for forked ipc:// workers.
81  bool isTcp{false};
// Filesystem path of the ipc:// socket (empty in TCP mode); unlinked on teardown.
82  std::string endpointPath;
// Endpoint string passed to zmq_bind (either the TCP endpoint or "ipc://" + endpointPath).
83  std::string endpoint;
// PIDs of forked workers (fork mode only; -1 for slots not forked yet).
84  std::vector<pid_t> childPids;
// Worker identity -> worker index.
85  std::unordered_map<std::string, size_t> identityToWorker;
86  std::vector<std::string> workerIdentityVec; // ordered list for round-robin
87  // TCP late-joiner support: stored so new workers can be initialised mid-run
88  std::string jobDir;
89  std::string treeName;
// Non-owning pointer to the caller's worker-object vector (set in TCP mode only).
90  std::vector<NThreadData *> * workerObjects{nullptr};
// Upper limit on TCP worker indices / bootstrap slots.
91  size_t maxWorkers{0};
// Definition most recently passed to ExecuteCurrentBoundsProcessIpc (name, ids, and whether ids were given).
92  std::string currentDefName;
93  std::vector<Long64_t> currentDefIds;
94  bool hasCurrentDefIds{false};
// SIGINT action that was active before the session installed its own handler.
95  struct sigaction oldSigIntAction{};
96  bool hasOldSigIntAction{false};
97  // Bootstrap configuration sent to workers on first contact
98  std::string macroList; // comma-separated macro paths to load on worker
99  std::string macroParams; // parameter list forwarded to TMacro::Exec on worker
100  std::string tmpDir; // supervisor's NDMSPC_TMP_DIR (fallback for workers)
101  std::string tmpResultsDir; // supervisor's NDMSPC_TMP_RESULTS_DIR
102  size_t bootstrapNextIdx{0}; // auto-assigned index counter for BOOTSTRAP
103  std::unordered_map<std::string, size_t> bootstrapAssignments; // BOOTSTRAP identity -> assigned slot
104  std::vector<std::string> pendingReadyIdentities; // READY messages consumed while waiting for ACK
105  // Task state management: unified handling of pending, running, and done tasks
106  NTaskStateManager taskStateManager;
107  std::unordered_map<std::string, std::set<size_t>> workerTaskHistory; // tasks assigned to each worker in current definition
108  std::set<std::string> earlyDoneWorkers; // workers that sent DONE before FinishProcessIpc started waiting
109  // TCP worker activity tracking for failure detection
110  std::unordered_map<std::string, std::chrono::steady_clock::time_point> workerLastActivity; // identity -> last ACK time
111  std::set<std::string> failedTcpWorkers; // identities of TCP workers that have failed
112 };
113 
114 // --- Private Increment Logic ---
116 {
117  for (int i = fNumDimensions - 1; i >= 0; --i) {
118  fCurrentCoords[i]++;
119  if (fCurrentCoords[i] <= fMaxBounds[i]) {
120  return true;
121  }
122  fCurrentCoords[i] = fMinBounds[i];
123  }
124  return false;
125 }
126 
127 NDimensionalExecutor::NDimensionalExecutor(const std::vector<int> & minBounds, const std::vector<int> & maxBounds)
128  : fMinBounds(minBounds), fMaxBounds(maxBounds)
129 {
133 
134  if (fMinBounds.size() != fMaxBounds.size()) {
135  throw std::invalid_argument("Min and max bounds vectors must have the same size.");
136  }
137  if (fMinBounds.empty()) {
138  throw std::invalid_argument("Bounds vectors cannot be empty.");
139  }
140 
141  fNumDimensions = fMinBounds.size();
142 
143  for (size_t i = 0; i < fNumDimensions; ++i) {
144  if (fMinBounds[i] > fMaxBounds[i]) {
145  throw std::invalid_argument("Min bound (" + std::to_string(fMinBounds[i]) +
146  ") cannot be greater than max bound (" + std::to_string(fMaxBounds[i]) +
147  ") for dimension " + std::to_string(i));
148  }
149  }
150 
153 }
154 
155 void NDimensionalExecutor::SetBounds(const std::vector<int> & minBounds, const std::vector<int> & maxBounds)
156 {
157  fMinBounds = minBounds;
158  fMaxBounds = maxBounds;
159 
160  if (fMinBounds.size() != fMaxBounds.size()) {
161  throw std::invalid_argument("Min and max bounds vectors must have the same size.");
162  }
163  if (fMinBounds.empty()) {
164  throw std::invalid_argument("Bounds vectors cannot be empty.");
165  }
166 
167  fNumDimensions = fMinBounds.size();
168  for (size_t i = 0; i < fNumDimensions; ++i) {
169  if (fMinBounds[i] > fMaxBounds[i]) {
170  throw std::invalid_argument("Min bound (" + std::to_string(fMinBounds[i]) +
171  ") cannot be greater than max bound (" + std::to_string(fMaxBounds[i]) +
172  ") for dimension " + std::to_string(i));
173  }
174  }
175 
178 }
179 
180 NDimensionalExecutor::NDimensionalExecutor(THnSparse * hist, bool onlyfilled)
181 {
185  if (hist == nullptr) {
186  throw std::invalid_argument("THnSparse pointer cannot be null.");
187  }
188 
189  if (onlyfilled) {
190  // Check if the histogram is filled
191  if (hist->GetNbins() <= 0) {
192  throw std::invalid_argument("THnSparse histogram is empty.");
193  }
194 
195  fMinBounds.push_back(0);
196  fMaxBounds.push_back(hist->GetNbins());
197  }
198  else {
199  // loop over all dimensions
200  for (int i = 0; i < hist->GetNdimensions(); ++i) {
201  fMinBounds.push_back(0);
202  fMaxBounds.push_back(hist->GetAxis(i)->GetNbins());
203  }
204  }
205  fNumDimensions = fMinBounds.size();
208 }
209 
// Destructor defaulted here, in the translation unit where the IpcSession members are defined.
// NOTE(review): presumably required because fIpcSession is a std::unique_ptr to a type that is
// incomplete in the header — confirm against NDimensionalExecutor.h.
210 NDimensionalExecutor::~NDimensionalExecutor() = default;
211 
212 void NDimensionalExecutor::Execute(const std::function<void(const std::vector<int> & coords)> & func)
213 {
217 
218  if (fNumDimensions == 0) {
219  return;
220  }
221  fCurrentCoords = fMinBounds; // Reset state
222  do {
223  func(fCurrentCoords);
224  } while (Increment());
225 }
226 
227 // --- Template Implementation for ExecuteParallel ---
234 template <typename TObject>
236  const std::function<void(const std::vector<int> & coords, TObject & thread_object)> & func,
237  std::vector<TObject> & thread_objects)
238 {
239  if (fNumDimensions == 0) {
240  return;
241  }
242  size_t threads_to_use = thread_objects.size();
243  if (threads_to_use == 0) {
244  throw std::invalid_argument("Thread objects vector cannot be empty.");
245  }
246 
247  std::vector<std::thread> workers;
248  std::queue<std::function<void(TObject &)>> tasks;
249  std::mutex queue_mutex;
250  std::condition_variable condition_producer;
251  std::condition_variable condition_consumer;
252  std::atomic<size_t> active_tasks = 0;
253  std::atomic<bool> stop_pool = false;
254  // Optional: Store first exception encountered in workers
255  std::exception_ptr first_exception = nullptr;
256  std::mutex exception_mutex;
257 
258  // Worker thread logic: fetch and execute tasks, handle exceptions, signal completion.
259  auto worker_logic = [&](TObject & my_object) {
260  NThreadData * md = (NThreadData *)&my_object;
261 
262  std::ostringstream oss;
263  oss << "wk_" << std::setw(6) << std::setfill('0') << md->GetAssignedIndex();
264 
265  NLogger::SetThreadName(oss.str());
266  while (true) {
267  std::function<void(TObject &)> task_payload;
268  bool task_acquired = false; // Track if we actually got a task this iteration
269 
270  try {
271  { // Lock scope for queue access
272  std::unique_lock<std::mutex> lock(queue_mutex);
273  condition_producer.wait(lock, [&] { return stop_pool || !tasks.empty(); });
274 
275  // Check stop condition *after* waking up
276  if (stop_pool && tasks.empty()) {
277  break; // Exit the while loop normally
278  }
279  // If stopping but tasks remain, continue processing them
280 
281  // Only proceed if not stopping or if tasks are still present
282  if (!tasks.empty()) {
283  task_payload = std::move(tasks.front());
284  tasks.pop();
285  task_acquired = true; // We got a task
286  }
287  else {
288  // Spurious wakeup or stop_pool=true with empty queue
289  continue; // Go back to wait
290  }
291  } // Mutex unlocked
292 
293  // Execute the task if we acquired one
294  if (task_acquired) {
295  task_payload(my_object); // Execute task with assigned object
296  }
297  }
298  catch (...) {
299  // --- Exception Handling ---
300  { // Lock to safely store the first exception
301  std::lock_guard<std::mutex> lock(exception_mutex);
302  if (!first_exception) {
303  first_exception = std::current_exception(); // Store it
304  }
305  }
306  // Signal pool to stop immediately on any error
307  {
308  std::unique_lock<std::mutex> lock(queue_mutex);
309  stop_pool = true;
310  }
311  condition_producer.notify_all(); // Wake all threads to check stop flag
312 
313  // *** Crucial Fix: Decrement active_tasks even on exception ***
314  // Check if we actually acquired a task before decrementing
315  if (task_acquired) {
316  if (--active_tasks == 0 && stop_pool) {
317  // Also notify consumer here in case this was the last task
318  condition_consumer.notify_one();
319  }
320  }
321  // Decide whether to exit the worker or try processing remaining tasks
322  // For simplicity, let's exit the worker on error.
323  return; // Exit worker thread immediately on error
324  }
325 
326  // --- Normal Task Completion ---
327  // Decrement active task count *after* successful execution
328  // Check if we actually acquired and processed a task
329  if (task_acquired) {
330  if (--active_tasks == 0 && stop_pool) {
331  condition_consumer.notify_one();
332  }
333  }
334  } // End of while loop
335  }; // End of worker_logic lambda
336 
337  // --- Start Worker Threads ---
338  workers.reserve(threads_to_use);
339  for (size_t i = 0; i < threads_to_use; ++i) {
340  workers.emplace_back(worker_logic, std::ref(thread_objects[i]));
341  }
342 
343  // --- Main Thread: Iterate and Enqueue Tasks ---
344  try {
346  do {
347  // Check if pool was stopped prematurely (e.g., by an exception in a worker)
348  // Lock needed to safely check stop_pool
349  {
350  std::unique_lock<std::mutex> lock(queue_mutex);
351  if (stop_pool) break;
352  }
353 
354  std::vector<int> coords_copy = fCurrentCoords;
355  {
356  std::unique_lock<std::mutex> lock(queue_mutex);
357  // Double check stop_pool after acquiring lock
358  if (stop_pool) break;
359 
360  active_tasks++;
361  tasks.emplace([func, coords_copy](TObject & obj) { func(coords_copy, obj); });
362  }
363  condition_producer.notify_one();
364  } while (Increment());
365  }
366  catch (...) {
367  // Exception during iteration/enqueueing
368  {
369  std::unique_lock<std::mutex> lock(queue_mutex);
370  stop_pool = true; // Signal workers to stop
371  if (!first_exception) { // Store exception if none from workers yet
372  first_exception = std::current_exception();
373  }
374  }
375  condition_producer.notify_all();
376  // Proceed to join threads
377  }
378 
379  // --- Signal Workers to Stop (if not already stopped by error) ---
380  {
381  std::unique_lock<std::mutex> lock(queue_mutex);
382  stop_pool = true;
383  }
384  condition_producer.notify_all();
385 
386  // --- Wait for Tasks to Complete ---
387  {
388  std::unique_lock<std::mutex> lock(queue_mutex);
389  condition_consumer.wait(lock, [&] { return stop_pool && active_tasks == 0; });
390  }
391 
392  // --- Join Worker Threads ---
393  for (std::thread & worker : workers) {
394  if (worker.joinable()) {
395  worker.join();
396  }
397  }
398 
399  // --- Check for and rethrow exception from workers ---
400  if (first_exception) {
401  std::rethrow_exception(first_exception);
402  }
403 }
404 
405 size_t NDimensionalExecutor::ExecuteParallelProcessIpc(std::vector<NThreadData *> & workerObjects,
406  size_t processCount)
407 {
408  StartProcessIpc(workerObjects, processCount);
409  try {
410  size_t acked = ExecuteCurrentBoundsProcessIpc();
411  FinishProcessIpc();
412  return acked;
413  }
414  catch (...) {
415  FinishProcessIpc();
416  throw;
417  }
418 }
419 
420 bool NDimensionalExecutor::InitTcpWorker(const std::string & identity)
421 {
422  // Extract worker index from identity string (e.g. "wk_001")
423  // Derive the prefix by stripping trailing digits from a sample identity
424  const std::string sample = NDimensionalIpcRunner::BuildWorkerIdentity(0);
425  size_t numLen = 0;
426  while (numLen < sample.size() && std::isdigit((unsigned char)sample[sample.size() - 1 - numLen]))
427  ++numLen;
428  const size_t prefixLen = sample.size() - numLen; // e.g. strlen("wk_") == 3
429  if (identity.size() <= prefixLen) return false;
430  size_t workerIdx = 0;
431  try {
432  workerIdx = std::stoul(identity.substr(prefixLen));
433  }
434  catch (...) {
435  NLogWarning("NDimensionalExecutor::InitTcpWorker: cannot parse index from identity '%s'", identity.c_str());
436  return false;
437  }
438  if (workerIdx >= fIpcSession->maxWorkers) {
439  NLogWarning("NDimensionalExecutor::InitTcpWorker: worker index %zu >= maxWorkers %zu, ignoring",
440  workerIdx, fIpcSession->maxWorkers);
441  return false;
442  }
443  if (fIpcSession->identityToWorker.count(identity)) {
444  NLogWarning("NDimensionalExecutor::InitTcpWorker: worker '%s' already registered, ignoring duplicate READY",
445  identity.c_str());
446  return false;
447  }
448 
449  const std::string sessionId = std::to_string(getpid());
450  if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router,
451  {identity, "INIT", std::to_string(workerIdx), sessionId,
452  fIpcSession->jobDir, fIpcSession->treeName,
453  fIpcSession->tmpDir, fIpcSession->tmpResultsDir})) {
454  NLogError("NDimensionalExecutor::InitTcpWorker: failed to send INIT to '%s'", identity.c_str());
455  return false;
456  }
457 
458  int initTimeoutSec = 30;
459  if (const char * env = gSystem->Getenv("NDMSPC_WORKER_TIMEOUT")) {
460  try {
461  initTimeoutSec = std::max(1, std::stoi(env));
462  }
463  catch (...) {
464  NLogWarning("NDimensionalExecutor::InitTcpWorker: Invalid NDMSPC_WORKER_TIMEOUT='%s', using default=%d", env,
465  initTimeoutSec);
466  }
467  }
468  const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(initTimeoutSec);
469  bool acked = false;
470  while (!acked) {
471  std::vector<std::string> ackFrames;
472  if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, ackFrames)) {
473  if (errno == EAGAIN || errno == EWOULDBLOCK) {
474  if (std::chrono::steady_clock::now() > initDeadline) break;
475  continue;
476  }
477  break;
478  }
479  if (ackFrames.size() >= 2 && ackFrames[1] == "BOOTSTRAP") {
480  HandleBootstrap(ackFrames[0]);
481  continue;
482  }
483  if (ackFrames.size() >= 2 && ackFrames[1] == "READY" && ackFrames[0] != identity) {
484  if (!fIpcSession->identityToWorker.count(ackFrames[0]) &&
485  std::find(fIpcSession->pendingReadyIdentities.begin(), fIpcSession->pendingReadyIdentities.end(),
486  ackFrames[0]) == fIpcSession->pendingReadyIdentities.end()) {
487  fIpcSession->pendingReadyIdentities.push_back(ackFrames[0]);
488  }
489  continue;
490  }
491  if (ackFrames.size() >= 2 && ackFrames[0] == identity && ackFrames[1] == "ACK") {
492  acked = true;
493  }
494  }
495  if (!acked) {
496  NLogError("NDimensionalExecutor::InitTcpWorker: worker '%s' did not ACK INIT", identity.c_str());
497  return false;
498  }
499 
500  // Check for duplicate identity (defensive - shouldn't happen with proper cleanup)
501  if (fIpcSession->identityToWorker.count(identity)) {
502  NLogWarning("NDimensionalExecutor::InitTcpWorker: worker '%s' already registered, replacing", identity.c_str());
503  // Remove from vector if present
504  auto it = std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), identity);
505  if (it != fIpcSession->workerIdentityVec.end()) {
506  fIpcSession->workerIdentityVec.erase(it);
507  }
508  }
509 
510  fIpcSession->identityToWorker[identity] = workerIdx;
511  fIpcSession->workerIdentityVec.push_back(identity);
512  // Initialize activity tracking for TCP worker
513  fIpcSession->workerLastActivity[identity] = std::chrono::steady_clock::now();
514  NLogInfo("NDimensionalExecutor::InitTcpWorker: worker '%s' (idx=%zu) joined [total: %zu]",
515  identity.c_str(), workerIdx, fIpcSession->workerIdentityVec.size());
516  return true;
517 }
518 
519 bool NDimensionalExecutor::HandleBootstrap(const std::string & identity)
520 {
521  if (!fIpcSession || !fIpcSession->isTcp) return false;
522 
523  // Repeat BOOTSTRAP from the same identity should be idempotent.
524  auto existing = fIpcSession->bootstrapAssignments.find(identity);
525  if (existing != fIpcSession->bootstrapAssignments.end()) {
526  const size_t assignedIdx = existing->second;
527  return NDimensionalIpcRunner::SendFrames(fIpcSession->router,
528  {identity, "CONFIG", std::to_string(assignedIdx),
529  fIpcSession->macroList, fIpcSession->tmpDir,
530  fIpcSession->tmpResultsDir,
531  fIpcSession->macroParams});
532  }
533 
534  if (fIpcSession->bootstrapNextIdx >= fIpcSession->maxWorkers) {
535  NLogWarning("NDimensionalExecutor::HandleBootstrap: rejecting worker '%s' (capacity reached: %zu)",
536  identity.c_str(), fIpcSession->maxWorkers);
537  return NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "REJECT", "capacity"});
538  }
539 
540  const size_t assignedIdx = fIpcSession->bootstrapNextIdx++;
541 
542  fIpcSession->bootstrapAssignments[identity] = assignedIdx;
543 
544  NLogDebug("NDimensionalExecutor::HandleBootstrap: assigning index %zu to worker '%s'", assignedIdx,
545  identity.c_str());
546  return NDimensionalIpcRunner::SendFrames(fIpcSession->router,
547  {identity, "CONFIG", std::to_string(assignedIdx),
548  fIpcSession->macroList, fIpcSession->tmpDir,
549  fIpcSession->tmpResultsDir,
550  fIpcSession->macroParams});
551 }
552 
553 void NDimensionalExecutor::StartProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount,
554  const std::string & tcpBindEndpoint, const std::string & jobDir,
555  const std::string & treeName, const std::string & macroList,
556  const std::string & tmpDir, const std::string & tmpResultsDir,
557  const std::string & macroParams)
558 {
559  if (workerObjects.empty()) {
560  throw std::invalid_argument("Worker objects vector cannot be empty.");
561  }
562  if (fIpcSession) {
563  throw std::runtime_error("IPC session is already active.");
564  }
565 
566  const size_t processesToUse = std::max<size_t>(1, std::min(processCount, workerObjects.size()));
567  NLogInfo("NDimensionalExecutor::StartProcessIpc: requested=%zu, workerObjects=%zu, spawning=%zu", processCount,
568  workerObjects.size(), processesToUse);
569  const auto nowNs = std::chrono::duration_cast<std::chrono::nanoseconds>(
570  std::chrono::high_resolution_clock::now().time_since_epoch())
571  .count();
572  fIpcSession = std::make_unique<IpcSession>();
573 
574  const bool isTcp = !tcpBindEndpoint.empty();
575  fIpcSession->isTcp = isTcp;
576 
577  if (isTcp) {
578  fIpcSession->endpoint = tcpBindEndpoint;
579  fIpcSession->endpointPath.clear();
580  } else {
581  fIpcSession->endpointPath = "/tmp/ndmspc_ipc_" + std::to_string(getpid()) + "_" + std::to_string(nowNs) + ".sock";
582  fIpcSession->endpoint = "ipc://" + fIpcSession->endpointPath;
583  ::unlink(fIpcSession->endpointPath.c_str());
584  }
585 
586  fIpcSession->ctx = zmq_ctx_new();
587  if (!fIpcSession->ctx) {
588  fIpcSession.reset();
589  throw std::runtime_error("Failed to create ZeroMQ context.");
590  }
591 
592  fIpcSession->router = zmq_socket(fIpcSession->ctx, ZMQ_ROUTER);
593  if (!fIpcSession->router) {
594  zmq_ctx_term(fIpcSession->ctx);
595  fIpcSession.reset();
596  throw std::runtime_error("Failed to create ZeroMQ ROUTER socket.");
597  }
598 
599  int timeoutMs = 1000;
600  zmq_setsockopt(fIpcSession->router, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
601 
602  if (zmq_bind(fIpcSession->router, fIpcSession->endpoint.c_str()) != 0) {
603  const std::string err = zmq_strerror(zmq_errno());
604  zmq_close(fIpcSession->router);
605  zmq_ctx_term(fIpcSession->ctx);
606  if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
607  fIpcSession.reset();
608  throw std::runtime_error("Failed to bind endpoint '" + fIpcSession->endpoint + "': " + err);
609  }
610 
611  fIpcSession->identityToWorker.clear();
612  fIpcSession->identityToWorker.reserve(processesToUse);
613  fIpcSession->workerIdentityVec.clear();
614  fIpcSession->pendingReadyIdentities.clear();
615 
616  if (!isTcp) {
617  // IPC/fork mode: pre-seed the map as before so WorkerLoop identities match
618  for (size_t i = 0; i < processesToUse; ++i) {
619  fIpcSession->identityToWorker[NDimensionalIpcRunner::BuildWorkerIdentity(i)] = i;
620  }
621  } else {
622  // TCP mode: store context for late-joining workers
623  fIpcSession->jobDir = jobDir;
624  fIpcSession->treeName = treeName;
625  fIpcSession->workerObjects = &workerObjects;
626  fIpcSession->maxWorkers = processesToUse;
627  fIpcSession->macroList = macroList;
628  fIpcSession->tmpDir = tmpDir;
629  fIpcSession->tmpResultsDir = tmpResultsDir;
630  fIpcSession->macroParams = macroParams;
631  fIpcSession->bootstrapAssignments.clear();
632  }
633 
634  if (!isTcp) {
635  fIpcSession->childPids.assign(processesToUse, -1);
636  for (size_t i = 0; i < processesToUse; ++i) {
637  pid_t pid = fork();
638  if (pid < 0) {
639  NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
640  zmq_close(fIpcSession->router);
641  zmq_ctx_term(fIpcSession->ctx);
642  ::unlink(fIpcSession->endpointPath.c_str());
643  fIpcSession.reset();
644  throw std::runtime_error("Failed to fork worker process.");
645  }
646  if (pid == 0) {
647  zmq_close(fIpcSession->router);
648  zmq_ctx_term(fIpcSession->ctx);
649  const int rc = NDimensionalIpcRunner::WorkerLoop(fIpcSession->endpoint, i, workerObjects[i]);
650  _exit(rc == 0 ? 0 : 1);
651  }
652  fIpcSession->childPids[i] = pid;
653  }
654  }
655 
656  // --- Wait for initial workers ---
657  // IPC (fork): wait for all; TCP: wait until at least one connects or timeout.
658  int readyTimeoutSec = isTcp ? 300 : 30;
659  if (isTcp) {
660  if (const char * env = gSystem->Getenv("NDMSPC_WORKER_TIMEOUT")) {
661  try { readyTimeoutSec = std::stoi(env); } catch (...) {}
662  }
663  NLogInfo("NDimensionalExecutor::StartProcessIpc: waiting up to %d s for TCP workers (max %zu) ...",
664  readyTimeoutSec, processesToUse);
665  }
666  const auto readyDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(readyTimeoutSec);
667 
668  // IPC/fork mode needs the full fixed worker set before dispatch starts.
669  // TCP mode is dynamic: start as soon as the first worker is ready, and let
670  // additional workers join through the late-READY path during execution.
671  const size_t readyTarget = isTcp ? 1 : processesToUse;
672 
673  while (fIpcSession->workerIdentityVec.size() < readyTarget) {
674  if (isTcp && !fIpcSession->pendingReadyIdentities.empty()) {
675  const std::string identity = fIpcSession->pendingReadyIdentities.front();
676  fIpcSession->pendingReadyIdentities.erase(fIpcSession->pendingReadyIdentities.begin());
677  InitTcpWorker(identity);
678  continue;
679  }
680 
681  std::vector<std::string> frames;
682  if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
683  if (errno == EAGAIN || errno == EWOULDBLOCK) {
684  if (std::chrono::steady_clock::now() > readyDeadline) {
685  if (!isTcp) {
686  NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
687  zmq_close(fIpcSession->router);
688  zmq_ctx_term(fIpcSession->ctx);
689  ::unlink(fIpcSession->endpointPath.c_str());
690  fIpcSession.reset();
691  throw std::runtime_error("Timeout while waiting for IPC workers to become ready.");
692  }
693  zmq_close(fIpcSession->router);
694  zmq_ctx_term(fIpcSession->ctx);
695  fIpcSession.reset();
696  throw std::runtime_error("Timeout: no TCP workers connected.");
697  }
698  continue;
699  }
700  if (!isTcp) NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
701  zmq_close(fIpcSession->router);
702  zmq_ctx_term(fIpcSession->ctx);
703  if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
704  fIpcSession.reset();
705  throw std::runtime_error("Failed to receive READY message from worker.");
706  }
707  if (frames.size() < 2) continue;
708  const std::string & identity = frames[0];
709  const std::string & cmd = frames[1];
710  if (isTcp && cmd == "BOOTSTRAP") {
711  HandleBootstrap(identity);
712  continue;
713  }
714  if (cmd != "READY") continue;
715 
716  if (isTcp) {
717  InitTcpWorker(identity);
718  } else {
719  // IPC/fork: just register (already pre-seeded)
720  if (fIpcSession->identityToWorker.count(identity)) {
721  if (std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), identity)
722  == fIpcSession->workerIdentityVec.end()) {
723  fIpcSession->workerIdentityVec.push_back(identity);
724  NLogInfo("NDimensionalExecutor::StartProcessIpc: worker '%s' ready (%zu / %zu)", identity.c_str(),
725  fIpcSession->workerIdentityVec.size(), processesToUse);
726  }
727  }
728  }
729  }
730 
731  // In TCP mode the startup loop exits as soon as the first worker is ready.
732  // Any further READY messages that arrived (and were buffered in
733  // pendingReadyIdentities during InitTcpWorker's ACK wait) must be drained
734  // here so all already-connected workers are fully initialised before the
735  // dispatch loop starts — otherwise their tasks would never be scheduled.
736  if (isTcp) {
737  while (!fIpcSession->pendingReadyIdentities.empty()) {
738  const std::string id = fIpcSession->pendingReadyIdentities.front();
739  fIpcSession->pendingReadyIdentities.erase(fIpcSession->pendingReadyIdentities.begin());
740  InitTcpWorker(id);
741  }
742  }
743 
744  if (!isTcp) {
745  InstallIpcSigIntHandler(fIpcSession->childPids, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
746  } else {
747  // TCP mode: install handler with no child PIDs — just sets gIpcSigIntRequested
748  // so the dispatch loop can break cleanly and FinishProcessIpc sends STOP to workers.
749  InstallIpcSigIntHandler({}, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
750  }
751 }
752 
753 // Centralized worker failure cleanup: recovers tasks, removes worker from tracking, logs, updates progress.
754 // This eliminates duplication across TCP send failure, TCP timeout, and IPC crash handlers.
755 size_t NDimensionalExecutor::HandleWorkerFailure(const std::string & failedIdentity,
756  const std::string & failureReason,
757  size_t & outstanding,
758  size_t & acked)
759 {
760  if (!fIpcSession) {
761  return 0;
762  }
763 
764  size_t redistributedCount = 0;
765  size_t replayedDoneCount = 0;
766  size_t replayedLiveCount = 0;
767 
768  auto historyIt = fIpcSession->workerTaskHistory.find(failedIdentity);
769  if (historyIt != fIpcSession->workerTaskHistory.end()) {
770  for (const size_t taskId : historyIt->second) {
771  const bool wasDone = fIpcSession->taskStateManager.IsDone(taskId);
772  if (!fIpcSession->taskStateManager.RequeueTask(taskId)) {
773  continue;
774  }
775  ++redistributedCount;
776  if (wasDone) {
777  ++replayedDoneCount;
778  } else {
779  ++replayedLiveCount;
780  }
781  }
782  fIpcSession->workerTaskHistory.erase(historyIt);
783  } else {
784  const auto recovered = fIpcSession->taskStateManager.RecoverWorkerTasks(failedIdentity);
785  redistributedCount = recovered.size();
786  replayedLiveCount = redistributedCount;
787  }
788 
789  if (replayedLiveCount > 0) {
790  const size_t dec = std::min(outstanding, replayedLiveCount);
791  outstanding -= dec;
792  }
793  if (replayedDoneCount > 0) {
794  const size_t dec = std::min(acked, replayedDoneCount);
795  acked -= dec;
796  }
797 
798  // Remove worker from all tracking structures
799  auto identityIt = std::find(fIpcSession->workerIdentityVec.begin(),
800  fIpcSession->workerIdentityVec.end(),
801  failedIdentity);
802  if (identityIt != fIpcSession->workerIdentityVec.end()) {
803  fIpcSession->workerIdentityVec.erase(identityIt);
804  }
805  fIpcSession->identityToWorker.erase(failedIdentity);
806  fIpcSession->workerLastActivity.erase(failedIdentity);
807  fIpcSession->failedTcpWorkers.erase(failedIdentity);
808 
809  // Log worker removal
810  if (redistributedCount > 0) {
811  if (failureReason == "send_failure") {
812  NLogWarning("TCP worker '%s' removed due to send failure. Redistributing %zu task(s). Remaining workers: %zu",
813  failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
814  } else if (failureReason == "timeout") {
815  NLogWarning("TCP worker '%s' inactive/disconnected (%zu tasks pending). Redistributing to remaining %zu workers.",
816  failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
817  } else if (failureReason == "crash") {
818  NLogWarning("Worker process '%s' exited unexpectedly. Redistributing %zu task(s) to remaining %zu workers.",
819  failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
820  } else if (failureReason == "interrupted") {
821  NLogWarning("Worker '%s' interrupted. Replaying %zu task(s) on remaining %zu worker(s).",
822  failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
823  } else {
824  NLogWarning("Worker '%s' failed (%s). Redistributing %zu task(s). Remaining workers: %zu",
825  failedIdentity.c_str(), failureReason.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
826  }
827  }
828 
829  return redistributedCount;
830 }
831 
832 size_t NDimensionalExecutor::ExecuteCurrentBoundsProcessIpc(const std::string & definitionName,
833  const std::vector<Long64_t> * definitionIds,
834  const std::function<void(const ExecutionProgress&)> & progressCallback)
835 {
836  if (!fIpcSession) {
837  throw std::runtime_error("IPC session is not active.");
838  }
839  if (fNumDimensions == 0) {
840  return 0;
841  }
842 
843  fIpcSession->taskStateManager.Clear();
844  fIpcSession->workerTaskHistory.clear();
845 
846  // Save current definition so late-joining workers can catch up
847  fIpcSession->currentDefName = definitionName;
848  fIpcSession->currentDefIds = definitionIds ? *definitionIds : std::vector<Long64_t>{};
849  fIpcSession->hasCurrentDefIds = (definitionIds != nullptr);
850 
851  if (!definitionName.empty()) {
852  std::vector<std::string> failedWorkers;
853  for (const auto & identity : fIpcSession->workerIdentityVec) {
854  if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "SETDEF", definitionName})) {
855  if (fIpcSession->isTcp) {
856  NLogWarning("Failed to send SETDEF to TCP worker '%s', marking as failed", identity.c_str());
857  failedWorkers.push_back(identity);
858  continue; // Skip SETIDS for this worker and continue with others
859  } else {
860  throw std::runtime_error("Failed to send IPC SETDEF message to worker '" + identity + "'.");
861  }
862  }
863  if (definitionIds != nullptr) {
864  if (!NDimensionalIpcRunner::SendFrames(
865  fIpcSession->router, {identity, "SETIDS", NDimensionalIpcRunner::SerializeIds(*definitionIds)})) {
866  if (fIpcSession->isTcp) {
867  NLogWarning("Failed to send SETIDS to TCP worker '%s', marking as failed", identity.c_str());
868  failedWorkers.push_back(identity);
869  continue;
870  } else {
871  throw std::runtime_error("Failed to send IPC SETIDS message to worker '" + identity + "'.");
872  }
873  }
874  }
875  }
876 
877  // Mark failed TCP workers for removal
878  if (fIpcSession->isTcp && !failedWorkers.empty()) {
879  for (const auto & identity : failedWorkers) {
880  fIpcSession->failedTcpWorkers.insert(identity);
881  }
882  NLogWarning("Marked %zu TCP worker(s) as failed during SETDEF/SETIDS", failedWorkers.size());
883  }
884  }
885 
886  size_t ipcBatchSize = 1;
887  if (const char * envBatchSize = gSystem->Getenv("NDMSPC_IPC_BATCH_SIZE")) {
888  try {
889  ipcBatchSize = std::max<size_t>(1, static_cast<size_t>(std::stoll(envBatchSize)));
890  }
891  catch (...) {
892  NLogWarning("NGnTree::Process: Invalid NDMSPC_IPC_BATCH_SIZE='%s', using default=%zu", envBatchSize,
893  ipcBatchSize);
894  }
895  }
896 
897  size_t nextTaskId = 0;
898  size_t dispatchMessageId = 0;
899  size_t outstanding = 0;
900  size_t outstandingMessages = 0;
901  size_t acked = 0;
902  size_t nextSchedulerLogAck = 200;
903  std::string firstError;
904  // Late-joining TCP workers: identity → assigned worker index.
905  // INIT is sent immediately on READY but the ACK is handled asynchronously
906  // in the main receive loop so we never block on socket reads mid-dispatch.
907  std::unordered_map<std::string, size_t> pendingInitWorkers;
908 
909  int stallTimeoutSec = 120;
910  if (const char * envStallTimeout = gSystem->Getenv("NDMSPC_IPC_STALL_TIMEOUT")) {
911  try {
912  stallTimeoutSec = std::max(5, std::stoi(envStallTimeout));
913  }
914  catch (...) {
915  NLogWarning("NGnTree::Process: Invalid NDMSPC_IPC_STALL_TIMEOUT='%s', using default=%d", envStallTimeout,
916  stallTimeoutSec);
917  }
918  }
919  auto lastProgress = std::chrono::steady_clock::now();
920 
922  bool hasMore = true;
923 
924  // maxInFlightMessages is recalculated dynamically as workers join
925  size_t totalTasks = 1;
926  for (size_t i = 0; i < fNumDimensions; ++i) {
927  totalTasks *= static_cast<size_t>(fMaxBounds[i] - fMinBounds[i] + 1);
928  }
929  auto isUserInterrupted = []() {
930  if (gIpcSigIntRequested != 0) return true;
931  return (gROOT && gROOT->IsInterrupted());
932  };
933  // Helper: send SETDEF/SETIDS catchup to a newly-joined worker
934  auto sendCatchup = [&](const std::string & identity) {
935  if (!fIpcSession->currentDefName.empty()) {
936  NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "SETDEF", fIpcSession->currentDefName});
937  if (fIpcSession->hasCurrentDefIds) {
938  NDimensionalIpcRunner::SendFrames(
939  fIpcSession->router, {identity, "SETIDS", NDimensionalIpcRunner::SerializeIds(fIpcSession->currentDefIds)});
940  }
941  }
942  };
943 
944  while ((hasMore || outstanding > 0 || fIpcSession->taskStateManager.HasPending()) && firstError.empty()) {
945  // STATE MACHINE FOR TASK DISTRIBUTION:
946  // pending → dispatch → running → ACK → done
947  //
948  // Key counters:
949  // - hasMore: More tasks available from coordinate iterator?
950  // - outstanding: Number of tasks sent but not yet ACKed
951  // - pending: Tasks queued, not yet assigned to workers
952  // - running: Tasks assigned to workers, awaiting ACK
953  // - done: Tasks fully processed and ACKed
954  // - acked: Cumulative count of tasks ACKed (used for progress callback)
955  //
956  // SAFE EXIT CONDITIONS:
957  // 1. hasMore=false && outstanding=0 && pending=0 → All work complete (SUCCESS)
958  // 2. No workers remain && (outstanding>0 || pending>0) → FAIL (work lost)
959  // 3. User interrupt signal (Ctrl+C) → FAIL with cleanup
960  // 4. Stall timeout (no progress for N seconds) → FAIL (deadlock)
961  //
962  // WORKER FAILURE RECOVERY:
963  // On TCP timeout, IPC crash, or send failure:
964  // - Call RecoverWorkerTasks(worker) → Returns pending tasks, requeues running tasks
965  // - Tasks become pending again and are re-dispatched to remaining workers
966  // - No loss of work, but may increase total execution time
967  // - No retry loop to avoid infinite loops with persistent failures
968 
969  if (isUserInterrupted()) {
970  firstError = "Interrupted by user";
971  break;
972  }
973 
974  // Check if all workers have failed - exit early to avoid infinite loop
975  if (fIpcSession->workerIdentityVec.empty() && (outstanding > 0 || fIpcSession->taskStateManager.HasPending() || hasMore)) {
976  if (fIpcSession->isTcp) {
977  firstError = "No workers available. All TCP workers have disconnected/failed.";
978  } else {
979  firstError = "No workers available. All worker processes have exited/failed.";
980  }
981  break;
982  }
983 
984  // Allow multiple batches to be in flight for better parallelism
985  // Each worker can have up to this many batches pending
986  const size_t maxInFlightMessages = std::max<size_t>(4, fIpcSession->workerIdentityVec.size());
987 
988  while ((hasMore || fIpcSession->taskStateManager.HasPending()) && outstandingMessages < maxInFlightMessages && firstError.empty()) {
989  if (fIpcSession->workerIdentityVec.empty()) break; // no workers yet — wait
990 
991  // Skip workers that have been marked as failed
992  size_t workerSlot = dispatchMessageId % fIpcSession->workerIdentityVec.size();
993  std::string identity = fIpcSession->workerIdentityVec[workerSlot];
994 
995  // In TCP mode, skip failed workers and find next available worker
996  if (fIpcSession->isTcp) {
997  size_t attempts = 0;
998  while (fIpcSession->failedTcpWorkers.count(identity) && attempts < fIpcSession->workerIdentityVec.size()) {
999  ++dispatchMessageId;
1000  ++attempts;
1001  workerSlot = dispatchMessageId % fIpcSession->workerIdentityVec.size();
1002  identity = fIpcSession->workerIdentityVec[workerSlot];
1003  }
1004  // If all workers are failed, break to trigger cleanup
1005  if (attempts >= fIpcSession->workerIdentityVec.size()) {
1006  break;
1007  }
1008  }
1009 
1010  std::vector<std::pair<size_t, std::vector<int>>> batchTasks;
1011  const size_t nw = fIpcSession->workerIdentityVec.size();
1012  const size_t remainingTasks = (nextTaskId < totalTasks) ? (totalTasks - nextTaskId) : 0;
1013  const size_t adaptiveBatchSize = std::max<size_t>(
1014  1, std::min(ipcBatchSize, std::max<size_t>(1, (remainingTasks + nw - 1) / nw)));
1015  batchTasks.reserve(adaptiveBatchSize);
1016 
1017  // First, dispatch redistributed (pending) tasks from failed workers
1018  // These were added back to pending state by RecoverWorkerTasks or MarkFailed
1019  size_t reprocessedCount = 0;
1020  size_t redistPerBatch = 1;
1021  if (adaptiveBatchSize > 1 && fIpcSession->workerIdentityVec.size() > 0) {
1022  redistPerBatch = std::max<size_t>(1, adaptiveBatchSize / fIpcSession->workerIdentityVec.size());
1023  }
1024  size_t redistAdded = 0;
1025  while (fIpcSession->taskStateManager.HasPending() && outstanding < maxInFlightMessages * ipcBatchSize &&
1026  batchTasks.size() < adaptiveBatchSize && redistAdded < redistPerBatch) {
1027  size_t taskId = 0;
1028  std::vector<int> coords;
1029  if (!fIpcSession->taskStateManager.ClaimNextPendingForWorker(identity, taskId, coords)) {
1030  break;
1031  }
1032  batchTasks.emplace_back(taskId, coords);
1033  fIpcSession->workerTaskHistory[identity].insert(taskId);
1034  ++redistAdded;
1035  ++outstanding;
1036 
1037  if (isUserInterrupted()) {
1038  firstError = "Interrupted by user";
1039  break;
1040  }
1041  }
1042 
1043  if (reprocessedCount > 0) {
1044  NLogDebug("Redistributing %zu previously-completed tasks (acked counter decremented)", reprocessedCount);
1045  }
1046 
1047  // Then, dispatch new tasks if space available
1048  while (hasMore && outstanding < maxInFlightMessages * ipcBatchSize && batchTasks.size() < adaptiveBatchSize) {
1049  fIpcSession->taskStateManager.AddPending(nextTaskId, fCurrentCoords);
1050  size_t taskId = 0;
1051  std::vector<int> payload;
1052  if (!fIpcSession->taskStateManager.ClaimNextPendingForWorker(identity, taskId, payload)) {
1053  firstError = "Failed to claim pending task for worker dispatch.";
1054  break;
1055  }
1056  batchTasks.emplace_back(taskId, payload);
1057  fIpcSession->workerTaskHistory[identity].insert(taskId);
1058  ++nextTaskId;
1059  ++outstanding;
1060 
1061  if (!Increment()) {
1062  hasMore = false;
1063  }
1064 
1065  if (isUserInterrupted()) {
1066  firstError = "Interrupted by user";
1067  break;
1068  }
1069  }
1070 
1071  if (!firstError.empty()) {
1072  break;
1073  }
1074 
1075  if (batchTasks.empty()) {
1076  continue;
1077  }
1078 
1079  // Log assigned task coordinates to supervisor console for debugging
1080  for (const auto & task : batchTasks) {
1081  const std::string coordsStr = NDimensionalIpcRunner::SerializeCoords(task.second);
1082  NLogInfo("NDimensionalExecutor: Assign task %zu coords=%s -> worker '%s'", task.first, coordsStr.c_str(), identity.c_str());
1083  }
1084 
1085  // Initialize/update activity tracking for TCP workers
1086  if (fIpcSession->isTcp) {
1087  fIpcSession->workerLastActivity[identity] = std::chrono::steady_clock::now();
1088  }
1089 
1090  if (batchTasks.size() == 1) {
1091  const std::string taskId = std::to_string(batchTasks[0].first);
1092  const std::string coords = NDimensionalIpcRunner::SerializeCoords(batchTasks[0].second);
1093  if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "TASK", taskId, coords})) {
1094  if (fIpcSession->isTcp) {
1095  // TCP worker likely disconnected - mark for redistribution
1096  NLogWarning("Failed to send TASK to TCP worker '%s', marking as failed", identity.c_str());
1097 
1098  fIpcSession->failedTcpWorkers.insert(identity);
1099  // Put task back in redistribution queue (mark as failed to return to pending)
1100  for (const auto & task : batchTasks) {
1101  fIpcSession->taskStateManager.MarkFailed(task.first);
1102  }
1103  break; // Break inner loop to skip this worker and retry on next iteration
1104  } else {
1105  firstError = "Failed to send IPC TASK message to worker '" + identity + "'.";
1106  break;
1107  }
1108  }
1109  }
1110  else {
1111  std::ostringstream payload;
1112  for (size_t i = 0; i < batchTasks.size(); ++i) {
1113  if (i != 0) payload << ';';
1114  payload << batchTasks[i].first << ':' << NDimensionalIpcRunner::SerializeCoords(batchTasks[i].second);
1115  }
1116  if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "TASKB", payload.str()})) {
1117  if (fIpcSession->isTcp) {
1118  // TCP worker likely disconnected - mark for redistribution
1119  NLogWarning("Failed to send TASKB to TCP worker '%s', marking as failed", identity.c_str());
1120 
1121  fIpcSession->failedTcpWorkers.insert(identity);
1122  // Put tasks back in redistribution queue (mark as failed to return to pending)
1123  for (const auto & task : batchTasks) {
1124  fIpcSession->taskStateManager.MarkFailed(task.first);
1125  }
1126  break; // Break inner loop to skip this worker and retry on next iteration
1127  } else {
1128  firstError = "Failed to send IPC TASKB message to worker '" + identity + "'.";
1129  break;
1130  }
1131  }
1132  }
1133  ++dispatchMessageId;
1134  ++outstandingMessages;
1135  }
1136 
1137  // Clean up any TCP workers that failed during send attempts
1138  if (fIpcSession->isTcp && !fIpcSession->failedTcpWorkers.empty()) {
1139  std::vector<std::string> workersToRemove;
1140  for (const auto & identity : fIpcSession->workerIdentityVec) {
1141  if (fIpcSession->failedTcpWorkers.count(identity)) {
1142  workersToRemove.push_back(identity);
1143  }
1144  }
1145 
1146  for (const auto & failedIdentity : workersToRemove) {
1147  HandleWorkerFailure(failedIdentity, "send_failure", outstanding, acked);
1148 
1149  // Update progress bar to reflect worker count change
1150  if (progressCallback) {
1151  ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1152  fIpcSession->taskStateManager.RunningCount(),
1153  fIpcSession->taskStateManager.DoneCount(),
1154  fIpcSession->workerIdentityVec.size()};
1155  progressCallback(progress);
1156  }
1157  }
1158 
1159  // If no workers remain, fail
1160  if (fIpcSession->workerIdentityVec.empty()) {
1161  firstError = "No workers available. All TCP workers have disconnected/failed.";
1162 
1163  }
1164  }
1165 
1166  if (outstanding == 0 && fIpcSession->workerIdentityVec.empty()) continue;
1167  if (outstanding == 0 && !hasMore && !fIpcSession->taskStateManager.HasPending()) continue;
1168 
1169  std::vector<std::string> frames;
1170  if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
1171  if (errno == EINTR || isUserInterrupted()) {
1172  firstError = "Interrupted by user";
1173  break;
1174  }
1175  if (errno != EAGAIN && errno != EWOULDBLOCK) {
1176  // In TCP mode, receive errors can occur when workers disconnect
1177  // Let the activity timeout detection handle this gracefully
1178  if (!fIpcSession->isTcp) {
1179  firstError = "Failed to receive IPC ACK/ERR from worker.";
1180  break;
1181  }
1182  // TCP mode: treat as timeout and continue to worker failure detection
1183  }
1184 
1185  // Check for worker failures and redistribute their tasks
1186  // For TCP mode: check for inactive workers (no ACKs received)
1187  if (fIpcSession->isTcp && outstanding > 0) {
1188  const auto now = std::chrono::steady_clock::now();
1189  int tcpWorkerTimeoutSec = 30; // Default timeout for TCP worker inactivity
1190  if (const char * envTcpTimeout = gSystem->Getenv("NDMSPC_TCP_WORKER_TIMEOUT")) {
1191  try {
1192  tcpWorkerTimeoutSec = std::max(10, std::stoi(envTcpTimeout));
1193  } catch (...) {}
1194  }
1195 
1196  std::vector<std::string> inactiveWorkers;
1197  for (const auto & identity : fIpcSession->workerIdentityVec) {
1198  auto activityIt = fIpcSession->workerLastActivity.find(identity);
1199  if (activityIt != fIpcSession->workerLastActivity.end()) {
1200  auto inactiveSecs = std::chrono::duration_cast<std::chrono::seconds>(now - activityIt->second).count();
1201  if (inactiveSecs >= tcpWorkerTimeoutSec) {
1202  // Check if this worker has pending tasks - if so, it should have responded by now
1203  bool hasPendingTasks = !fIpcSession->taskStateManager.GetWorkerTasks(identity).empty();
1204 
1205  // Mark as inactive if:
1206  // 1. It has pending tasks (should have responded by now), OR
1207  // 2. It's been idle for more than 2x the timeout (likely disconnected)
1208  if (hasPendingTasks || inactiveSecs >= tcpWorkerTimeoutSec * 2) {
1209  inactiveWorkers.push_back(identity);
1210  }
1211  }
1212  }
1213  }
1214 
1215  // Redistribute tasks from inactive TCP workers
1216  for (const auto & failedIdentity : inactiveWorkers) {
1217  if (fIpcSession->failedTcpWorkers.count(failedIdentity)) continue; // already handled
1218  fIpcSession->failedTcpWorkers.insert(failedIdentity);
1219 
1220  HandleWorkerFailure(failedIdentity, "timeout", outstanding, acked);
1221 
1222  // Reset progress timer since we're actively handling worker failure
1223  lastProgress = std::chrono::steady_clock::now();
1224 
1225  // Update progress bar to reflect worker count change
1226  if (progressCallback) {
1227  ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1228  fIpcSession->taskStateManager.RunningCount(),
1229  fIpcSession->taskStateManager.DoneCount(),
1230  fIpcSession->workerIdentityVec.size()};
1231  progressCallback(progress);
1232  }
1233 
1234  // If no workers remain, fail
1235  if (fIpcSession->workerIdentityVec.empty()) {
1236  firstError = "All TCP workers have disconnected/failed. No workers available to continue processing.";
1237 
1238  break;
1239  }
1240  }
1241  }
1242 
1243  // For IPC mode: check forked child processes
1244  for (size_t i = 0; i < fIpcSession->childPids.size(); ++i) {
1245  int status = 0;
1246  pid_t rc = waitpid(fIpcSession->childPids[i], &status, WNOHANG);
1247  if (rc == fIpcSession->childPids[i]) {
1248  // Worker process exited - redistribute its pending tasks
1249  std::string failedIdentity = NDimensionalIpcRunner::BuildWorkerIdentity(i);
1250 
1251  HandleWorkerFailure(failedIdentity, "crash", outstanding, acked);
1252 
1253  // Reset progress timer since we're actively redistributing
1254  lastProgress = std::chrono::steady_clock::now();
1255 
1256  // Update progress bar to reflect worker count change
1257  if (progressCallback) {
1258  ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1259  fIpcSession->taskStateManager.RunningCount(),
1260  fIpcSession->taskStateManager.DoneCount(),
1261  fIpcSession->workerIdentityVec.size()};
1262  progressCallback(progress);
1263  }
1264 
1265  // If no workers remain, fail
1266  if (fIpcSession->workerIdentityVec.empty()) {
1267  firstError = "No workers available. All worker processes have exited/failed.";
1268 
1269  break;
1270  }
1271  }
1272  }
1273  if (firstError.empty() && outstanding > 0 && stallTimeoutSec > 0) {
1274  const auto now = std::chrono::steady_clock::now();
1275  const auto stallSecs = std::chrono::duration_cast<std::chrono::seconds>(now - lastProgress).count();
1276  if (stallSecs >= stallTimeoutSec) {
1277  const size_t activeWorkers = fIpcSession->workerIdentityVec.size();
1278  if (activeWorkers == 0) {
1279  firstError = "No workers available. All workers have disconnected/failed with " +
1280  std::to_string(outstanding) + " pending tasks remaining.";
1281  } else {
1282  firstError = "No IPC/TCP ACK progress for " + std::to_string(stallSecs) + "s with " +
1283  std::to_string(outstanding) + " pending tasks (active workers: " +
1284  std::to_string(activeWorkers) + ").";
1285  }
1286  }
1287  }
1288  continue;
1289  }
1290 
1291  if (frames.size() < 2) continue;
1292 
1293  // Handle late-joining TCP worker: send INIT immediately but do NOT block
1294  // waiting for the ACK — the ACK is handled below as a 2-frame message so
1295  // task ACKs from active workers are never dropped.
1296  if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] == "READY") {
1297  const std::string & lateId = frames[0];
1298  if (!fIpcSession->identityToWorker.count(lateId) && !pendingInitWorkers.count(lateId)) {
1299  const std::string prefix = "wk_";
1300  const size_t prefixLen = prefix.size();
1301  if (lateId.size() > prefixLen && lateId.substr(0, prefixLen) == prefix) {
1302  size_t workerIdx = std::numeric_limits<size_t>::max();
1303  try { workerIdx = std::stoul(lateId.substr(prefixLen)); } catch (...) {}
1304  if (workerIdx < fIpcSession->maxWorkers) {
1305  const std::string sessionId = std::to_string(getpid());
1306  if (NDimensionalIpcRunner::SendFrames(fIpcSession->router,
1307  {lateId, "INIT", std::to_string(workerIdx), sessionId,
1308  fIpcSession->jobDir, fIpcSession->treeName,
1309  fIpcSession->tmpDir, fIpcSession->tmpResultsDir})) {
1310  pendingInitWorkers[lateId] = workerIdx;
1311  NLogDebug("NDimensionalExecutor: late worker '%s' sent INIT, awaiting ACK", lateId.c_str());
1312  }
1313  }
1314  }
1315  }
1316  lastProgress = std::chrono::steady_clock::now();
1317  continue;
1318  }
1319 
1320  // Handle INIT ACK from a late-joining worker (2-frame: identity + "ACK").
1321  if (fIpcSession->isTcp && frames.size() == 2 && frames[1] == "ACK") {
1322  auto pit = pendingInitWorkers.find(frames[0]);
1323  if (pit != pendingInitWorkers.end()) {
1324  const std::string & id = pit->first;
1325  const size_t idx = pit->second;
1326 
1327  // Check for duplicate identity (defensive - shouldn't happen with proper cleanup)
1328  if (fIpcSession->identityToWorker.count(id)) {
1329  NLogWarning("NDimensionalExecutor: late worker '%s' already registered, replacing", id.c_str());
1330  // Remove from vector if present
1331  auto it = std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), id);
1332  if (it != fIpcSession->workerIdentityVec.end()) {
1333  fIpcSession->workerIdentityVec.erase(it);
1334  }
1335  }
1336 
1337  fIpcSession->identityToWorker[id] = idx;
1338  fIpcSession->workerIdentityVec.push_back(id);
1339  // Initialize activity tracking for this new TCP worker
1340  fIpcSession->workerLastActivity[id] = std::chrono::steady_clock::now();
1341  NLogInfo("NDimensionalExecutor: late worker '%s' (idx=%zu) joined [total: %zu]",
1342  id.c_str(), idx, fIpcSession->workerIdentityVec.size());
1343  // Log in-flight task distribution across all workers so the startup imbalance is visible
1344  for (const auto & wid : fIpcSession->workerIdentityVec) {
1345  const size_t inFlight = fIpcSession->taskStateManager.GetWorkerTasks(wid).size();
1346  NLogInfo("NDimensionalExecutor: in-flight distribution: worker '%s' has %zu pending task(s)", wid.c_str(), inFlight);
1347  }
1348  sendCatchup(id);
1349  pendingInitWorkers.erase(pit);
1350  lastProgress = std::chrono::steady_clock::now();
1351  continue;
1352  }
1353  // Not a pending-init ACK — fall through to the malformed-message guard.
1354  }
1355 
1356  // Handle bootstrapping worker requesting config
1357  if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] == "BOOTSTRAP") {
1358  HandleBootstrap(frames[0]);
1359  lastProgress = std::chrono::steady_clock::now();
1360  continue;
1361  }
1362 
1363  // Handle worker shutdown notification (e.g., Ctrl+C)
1364  if (frames.size() >= 2 && frames[1] == "SHUTDOWN") {
1365  const std::string & workerIdentity = frames[0];
1366  const std::string reason = (frames.size() >= 3) ? frames[2] : "unknown";
1367  const std::string tasksCompleted = (frames.size() >= 4) ? frames[3] : "?";
1368 
1369  NLogWarning("Worker '%s' reported shutdown: %s (completed %s tasks)", workerIdentity.c_str(), reason.c_str(), tasksCompleted.c_str());
1370 
1371  if (fIpcSession->isTcp) {
1372  fIpcSession->failedTcpWorkers.insert(workerIdentity);
1373  }
1374 
1375  HandleWorkerFailure(workerIdentity, "interrupted", outstanding, acked);
1376 
1377  if (progressCallback) {
1378  ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1379  fIpcSession->taskStateManager.RunningCount(),
1380  fIpcSession->taskStateManager.DoneCount(),
1381  fIpcSession->workerIdentityVec.size()};
1382  progressCallback(progress);
1383  }
1384 
1385  // If no workers remain, fail
1386  if (fIpcSession->workerIdentityVec.empty()) {
1387  if (fIpcSession->isTcp) {
1388  firstError = "No workers available. All TCP workers have shut down.";
1389  } else {
1390  firstError = "No workers available. All worker processes have shut down.";
1391  }
1392 
1393  break;
1394  }
1395 
1396  lastProgress = std::chrono::steady_clock::now();
1397  continue;
1398  }
1399 
1400  if (frames.size() == 2 && frames[1] == "DONE") {
1401  const std::string & workerIdentity = frames[0];
1402  if (fIpcSession->identityToWorker.count(workerIdentity)) {
1403  fIpcSession->earlyDoneWorkers.insert(workerIdentity);
1404  NLogDebug("NDimensionalExecutor::IPC: Worker '%s' sent DONE before FinishProcessIpc; deferring",
1405  workerIdentity.c_str());
1406  } else {
1407  NLogWarning("NDimensionalExecutor::IPC: ignoring DONE from unknown worker '%s'", workerIdentity.c_str());
1408  }
1409  lastProgress = std::chrono::steady_clock::now();
1410  continue;
1411  }
1412 
1413  if (frames.size() < 3) {
1414  // A 2-frame ACK that isn't a pending-init reply is unexpected.
1415  if (frames.size() == 2 && frames[1] == "ACK")
1416  NLogWarning("NDimensionalExecutor: unexpected 2-frame ACK from '%s', ignoring", frames[0].c_str());
1417  else
1418  firstError = "Malformed IPC message received from worker.";
1419  continue;
1420  }
1421 
1422  const std::string & cmd = frames[1];
1423  if (cmd == "ACK") {
1424  const std::string & workerIdentity = frames[0];
1425  size_t taskId = 0;
1426  try {
1427  taskId = static_cast<size_t>(std::stoull(frames[2]));
1428  }
1429  catch (...) {
1430  firstError = "Malformed IPC task id received from worker.";
1431  break;
1432  }
1433 
1434  // Mark task as done using TaskStateManager
1435  if (!fIpcSession->taskStateManager.MarkDone(taskId)) {
1436  firstError = "Received ACK for unknown or already-done task " + std::to_string(taskId) + ".";
1437  break;
1438  }
1439 
1440  // Update TCP worker activity tracking
1441  if (fIpcSession->isTcp) {
1442  fIpcSession->workerLastActivity[workerIdentity] = std::chrono::steady_clock::now();
1443  }
1444 
1445  if (outstanding == 0) {
1446  firstError = "IPC outstanding counter underflow while processing ACK.";
1447  break;
1448  }
1449  if (outstandingMessages == 0) {
1450  firstError = "IPC message outstanding counter underflow while processing ACK.";
1451  break;
1452  }
1453  --outstanding;
1454  --outstandingMessages;
1455  ++acked;
1456  lastProgress = std::chrono::steady_clock::now();
1457  const size_t activeWorkersNow = fIpcSession->workerIdentityVec.size();
1458  if (progressCallback) {
1459  ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1460  fIpcSession->taskStateManager.RunningCount(),
1461  fIpcSession->taskStateManager.DoneCount(),
1462  activeWorkersNow};
1463  progressCallback(progress);
1464  }
1465  if (acked >= nextSchedulerLogAck) {
1466  NLogDebug("NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pending=%zu running=%zu done=%zu",
1467  acked, totalTasks, activeWorkersNow, outstandingMessages,
1468  fIpcSession->taskStateManager.PendingCount(),
1469  fIpcSession->taskStateManager.RunningCount(),
1470  fIpcSession->taskStateManager.DoneCount());
1471  nextSchedulerLogAck += 200;
1472  }
1473  continue;
1474  }
1475 
1476  if (cmd == "ACKB") {
1477  const std::string & workerIdentity = frames[0];
1478  if (frames.size() < 3 || frames[2].empty()) {
1479  firstError = "Malformed IPC ACKB payload received from worker.";
1480  break;
1481  }
1482 
1483  if (outstandingMessages == 0) {
1484  firstError = "IPC message outstanding counter underflow while processing ACKB.";
1485  break;
1486  }
1487  --outstandingMessages;
1488 
1489  std::stringstream ackStream(frames[2]);
1490  std::string ackToken;
1491  while (std::getline(ackStream, ackToken, ',')) {
1492  if (ackToken.empty()) continue;
1493  size_t ackTaskId = 0;
1494  try {
1495  ackTaskId = static_cast<size_t>(std::stoull(ackToken));
1496  }
1497  catch (...) {
1498  firstError = "Malformed IPC ACKB task id received from worker.";
1499  break;
1500  }
1501 
1502  // Mark task as done using TaskStateManager
1503  if (!fIpcSession->taskStateManager.MarkDone(ackTaskId)) {
1504  firstError = "Received ACKB for unknown or already-done task " + std::to_string(ackTaskId) + ".";
1505  break;
1506  }
1507 
1508  // Update TCP worker activity tracking
1509  if (fIpcSession->isTcp) {
1510  fIpcSession->workerLastActivity[workerIdentity] = std::chrono::steady_clock::now();
1511  }
1512 
1513  if (outstanding == 0) {
1514  firstError = "IPC outstanding counter underflow while processing ACKB.";
1515  break;
1516  }
1517  --outstanding;
1518  ++acked;
1519  lastProgress = std::chrono::steady_clock::now();
1520  const size_t activeWorkersNow = fIpcSession->workerIdentityVec.size();
1521  if (progressCallback) {
1522  ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1523  fIpcSession->taskStateManager.RunningCount(),
1524  fIpcSession->taskStateManager.DoneCount(),
1525  activeWorkersNow};
1526  progressCallback(progress);
1527  }
1528  if (acked >= nextSchedulerLogAck) {
1529  NLogDebug(
1530  "NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pending=%zu running=%zu done=%zu",
1531  acked, totalTasks, activeWorkersNow, outstandingMessages,
1532  fIpcSession->taskStateManager.PendingCount(),
1533  fIpcSession->taskStateManager.RunningCount(),
1534  fIpcSession->taskStateManager.DoneCount());
1535  nextSchedulerLogAck += 200;
1536  }
1537  }
1538 
1539  if (!firstError.empty()) {
1540  break;
1541  }
1542  continue;
1543  }
1544 
1545  if (cmd == "ERR") {
1546  size_t taskId = 0;
1547  try {
1548  taskId = static_cast<size_t>(std::stoull(frames[2]));
1549  }
1550  catch (...) {
1551  firstError = "Malformed IPC task id received from worker.";
1552  break;
1553  }
1554  std::string errMsg = (frames.size() >= 4) ? frames[3] : "worker error";
1555  firstError = "Worker reported error for task " + std::to_string(taskId) + ": " + errMsg;
1556  break;
1557  }
1558 
1559  firstError = "Unknown IPC command from worker: " + cmd;
1560  break;
1561  }
1562 
1563  if (!firstError.empty()) {
1564  throw std::runtime_error(firstError);
1565  }
1566 
1567  // All tasks should be either done or somehow lost - check state
1568  const size_t pendingCount = fIpcSession->taskStateManager.PendingCount();
1569  const size_t runningCount = fIpcSession->taskStateManager.RunningCount();
1570  if (pendingCount > 0 || runningCount > 0) {
1571  throw std::runtime_error("IPC execution finished with " + std::to_string(pendingCount) + " pending and " +
1572  std::to_string(runningCount) + " running tasks still unacknowledged.");
1573  }
1574 
1575  return acked;
1576 }
1577 
1578 void NDimensionalExecutor::FinishProcessIpc(bool abort)
1579 {
1580  if (!fIpcSession) {
1581  return;
1582  }
1583 
1584  const std::string stopReason = abort ? "abort" : "ok";
1585  for (const auto & it : fIpcSession->identityToWorker) {
1586  NDimensionalIpcRunner::SendFrames(fIpcSession->router, {it.first, "STOP", stopReason});
1587  }
1588 
1589  if (!fIpcSession->isTcp) {
1590  const bool exitedCleanly = NDimensionalIpcRunner::WaitForChildProcesses(fIpcSession->childPids, 1500);
1591  if (!exitedCleanly) {
1592  NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
1593  }
1594  } else if (!abort) {
1595  // TCP mode normal finish: wait for DONE from all workers before merging
1596  const size_t nWorkers = fIpcSession->identityToWorker.size();
1597  std::set<std::string> doneWorkers;
1598  for (const auto & workerIdentity : fIpcSession->earlyDoneWorkers) {
1599  if (fIpcSession->identityToWorker.count(workerIdentity)) {
1600  doneWorkers.insert(workerIdentity);
1601  }
1602  }
1603  const auto doneDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(60);
1604  while (doneWorkers.size() < nWorkers) {
1605  const auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(doneDeadline - std::chrono::steady_clock::now()).count();
1606  if (remaining <= 0) {
1607  NLogWarning("NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1608  doneWorkers.size(), nWorkers);
1609  break;
1610  }
1611  zmq_pollitem_t item = {fIpcSession->router, 0, ZMQ_POLLIN, 0};
1612  const int rc = zmq_poll(&item, 1, static_cast<long>(remaining));
1613  if (rc <= 0) {
1614  NLogWarning("NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1615  doneWorkers.size(), nWorkers);
1616  break;
1617  }
1618  std::vector<std::string> frames;
1619  if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames) || frames.size() < 2) continue;
1620  if (frames[1] == "DONE") {
1621  const std::string & workerIdentity = frames[0];
1622  doneWorkers.insert(workerIdentity);
1623 
1624  NLogDebug("NDimensionalExecutor::FinishProcessIpc: Worker '%s' sent DONE (%zu/%zu)", workerIdentity.c_str(),
1625  doneWorkers.size(), nWorkers);
1626  } else if (frames[1] == "READY") {
1627  // Worker arrived after all tasks were dispatched — send STOP immediately.
1628  NLogInfo("NDimensionalExecutor::FinishProcessIpc: Late worker '%s' arrived, sending STOP", frames[0].c_str());
1629  NDimensionalIpcRunner::SendFrames(fIpcSession->router, {frames[0], "STOP", "ok"});
1630  } else if (frames[1] == "BOOTSTRAP") {
1631  // Worker is still bootstrapping — reply with CONFIG and a STOP will follow via READY.
1632  HandleBootstrap(frames[0]);
1633  }
1634  }
1635  }
1636  // Bound socket close time to avoid hangs at process end when peers disappear
1637  // or when STOP/DONE frames are still queued.
1638  if (fIpcSession->router) {
1639  int lingerMs = 0;
1640  if (fIpcSession->isTcp && abort) {
1641  // In TCP abort, allow a short grace period to flush STOP frames.
1642  lingerMs = 2000;
1643  }
1644  zmq_setsockopt(fIpcSession->router, ZMQ_LINGER, &lingerMs, sizeof(lingerMs));
1645  }
1646 
1647  if (fIpcSession->router) {
1648  zmq_close(fIpcSession->router);
1649  }
1650  if (fIpcSession->ctx) {
1651  zmq_ctx_term(fIpcSession->ctx);
1652  }
1653  if (!fIpcSession->isTcp && !fIpcSession->endpointPath.empty()) {
1654  ::unlink(fIpcSession->endpointPath.c_str());
1655  }
1656  // Capture registered worker indices before releasing the session.
1657  fRegisteredWorkerIndices.clear();
1658  if (fIpcSession) {
1659  for (const auto & kv : fIpcSession->identityToWorker) {
1660  fRegisteredWorkerIndices.insert(kv.second);
1661  }
1662  }
1663  RestoreIpcSigIntHandler(fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
1664  fIpcSession.reset();
1665 }
1666 
// Explicit instantiation of the ExecuteParallel member template for NGnThreadData thread objects.
1667 template void NDimensionalExecutor::ExecuteParallel<NGnThreadData>(
1668  const std::function<void(const std::vector<int> & coords, NGnThreadData & thread_object)> & func,
1669  std::vector<NGnThreadData> & thread_objects);
1670 
1671 } // namespace Ndmspc
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in identityToWorker / wor...
size_t HandleWorkerFailure(const std::string &failedIdentity, const std::string &failureReason, size_t &outstanding, size_t &acked)
Centralized worker failure handling: recovers tasks, removes worker, updates state....
std::set< size_t > fRegisteredWorkerIndices
Worker indices that completed registration (TCP mode)
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a CONFIG message.
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Definition: NLogger.cxx:132
Manages task lifecycle: pending → running → done/failed.
Thread-local data object for NDMSPC processing.
Definition: NThreadData.h:21
size_t GetAssignedIndex() const
Get the assigned index for the thread.
Definition: NThreadData.h:96
Execution progress metrics for IPC-based distributed processing.