ndmspc  v1.2.0-0.1.rc7
NDimensionalIpcRunner.cxx
#include <algorithm>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <cstring>
#include <deque>
#include <iomanip>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>
#include <zmq.h>
#include "NUtils.h"
#include <TSystem.h>
#include "NDimensionalIpcRunner.h"
#include "NGnThreadData.h"
#include "NThreadData.h"
14 #include "NGnThreadData.h"
15 #include "NThreadData.h"
16 
17 namespace Ndmspc {
18 
19 namespace {
// Raised by WorkerSigIntHandler when SIGINT (Ctrl+C) arrives and polled by TaskLoop.
// A volatile sig_atomic_t is safe to write from inside a signal handler.
volatile sig_atomic_t gWorkerInterrupted = 0;

// SIGINT handler installed by TaskLoop: async-signal-safe, it only raises the flag.
void WorkerSigIntHandler(int /*signum*/)
{
  gWorkerInterrupted = static_cast<sig_atomic_t>(1);
}
27 
// Joins task ids into a single comma-separated string ("a,b,c"); empty input gives "".
// Ids are copied verbatim, so an id containing ',' would not round-trip.
std::string SerializeTaskIds(const std::vector<std::string> & taskIds)
{
  std::string joined;
  bool first = true;
  for (const std::string & id : taskIds) {
    if (!first) {
      joined += ',';
    }
    joined += id;
    first = false;
  }
  return joined;
}
37 
// Parses a TASKB payload of the form "id:c1,c2,...;id:c1,...;..." into (taskId, coords) pairs.
// Empty ';'-separated entries and empty ','-separated coordinates are skipped.
// Returns false on a malformed entry (missing ':', empty id, or empty coordinate list) or
// when no task was found; `tasks` may then hold the entries parsed before the failure.
// A non-numeric coordinate makes std::stoi throw (std::invalid_argument/std::out_of_range).
bool ParseTaskBatchPayload(const std::string & payload, std::vector<std::pair<std::string, std::vector<int>>> & tasks)
{
  tasks.clear();

  std::string::size_type pos = 0;
  while (pos < payload.size()) {
    std::string::size_type next = payload.find(';', pos);
    if (next == std::string::npos) next = payload.size();
    const std::string entry = payload.substr(pos, next - pos);
    pos = next + 1;
    if (entry.empty()) continue;

    const std::string::size_type colon = entry.find(':');
    const bool malformed = (colon == std::string::npos) || (colon == 0) || (colon + 1 >= entry.size());
    if (malformed) {
      return false;
    }

    const std::string coordList = entry.substr(colon + 1);
    std::vector<int> coords;
    std::string::size_type cpos = 0;
    while (cpos < coordList.size()) {
      std::string::size_type cnext = coordList.find(',', cpos);
      if (cnext == std::string::npos) cnext = coordList.size();
      if (cnext > cpos) {
        coords.push_back(std::stoi(coordList.substr(cpos, cnext - cpos)));
      }
      cpos = cnext + 1;
    }
    tasks.emplace_back(entry.substr(0, colon), std::move(coords));
  }

  return !tasks.empty();
}
62 } // namespace
63 
64 bool NDimensionalIpcRunner::SendFrames(void * socket, const std::vector<std::string> & frames)
65 {
66  for (size_t i = 0; i < frames.size(); ++i) {
67  int flags = (i + 1 < frames.size()) ? ZMQ_SNDMORE : 0;
68  if (zmq_send(socket, frames[i].data(), frames[i].size(), flags) < 0) {
69  return false;
70  }
71  }
72  return true;
73 }
74 
75 bool NDimensionalIpcRunner::ReceiveFrames(void * socket, std::vector<std::string> & outFrames)
76 {
77  outFrames.clear();
78  while (true) {
79  zmq_msg_t msg;
80  zmq_msg_init(&msg);
81  int rc = zmq_msg_recv(&msg, socket, 0);
82  if (rc < 0) {
83  zmq_msg_close(&msg);
84  return false;
85  }
86  outFrames.emplace_back(static_cast<const char *>(zmq_msg_data(&msg)), static_cast<size_t>(rc));
87  int more = zmq_msg_more(&msg);
88  zmq_msg_close(&msg);
89  if (!more) break;
90  }
91  return true;
92 }
93 
94 std::string NDimensionalIpcRunner::BuildWorkerIdentity(size_t workerIndex)
95 {
96  std::ostringstream oss;
97  oss << "wk_" << std::setw(6) << std::setfill('0') << workerIndex;
98  return oss.str();
99 }
100 
101 std::string NDimensionalIpcRunner::SerializeCoords(const std::vector<int> & coords)
102 {
103  std::ostringstream oss;
104  for (size_t i = 0; i < coords.size(); ++i) {
105  if (i != 0) oss << ',';
106  oss << coords[i];
107  }
108  return oss.str();
109 }
110 
111 std::string NDimensionalIpcRunner::SerializeIds(const std::vector<Long64_t> & ids)
112 {
113  std::ostringstream oss;
114  for (size_t i = 0; i < ids.size(); ++i) {
115  if (i != 0) oss << ',';
116  oss << ids[i];
117  }
118  return oss.str();
119 }
120 
121 int NDimensionalIpcRunner::WorkerLoop(const std::string & endpoint, size_t workerIndex, NThreadData * worker)
122 {
123  std::ostringstream threadName;
124  threadName << "ipc_" << std::setw(6) << std::setfill('0') << workerIndex;
125  NLogger::SetThreadName(threadName.str());
126 
127  void * ctx = zmq_ctx_new();
128  if (!ctx) {
129  return 1;
130  }
131 
132  void * dealer = zmq_socket(ctx, ZMQ_DEALER);
133  if (!dealer) {
134  zmq_ctx_term(ctx);
135  return 1;
136  }
137 
138  const std::string identity = BuildWorkerIdentity(workerIndex);
139  if (zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size()) != 0) {
140  zmq_close(dealer);
141  zmq_ctx_term(ctx);
142  return 1;
143  }
144 
145  int timeoutMs = 1000;
146  zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
147 
148  if (zmq_connect(dealer, endpoint.c_str()) != 0) {
149  zmq_close(dealer);
150  zmq_ctx_term(ctx);
151  return 1;
152  }
153 
154  if (!SendFrames(dealer, {"READY"})) {
155  zmq_close(dealer);
156  zmq_ctx_term(ctx);
157  return 1;
158  }
159 
160  NLogPrint("Worker %zu: connected to %s, ready for tasks", workerIndex, endpoint.c_str());
161 
162  int rc = TaskLoop(dealer, workerIndex, worker);
163 
164  zmq_close(dealer);
165  zmq_ctx_term(ctx);
166  return rc;
167 }
168 
169 int NDimensionalIpcRunner::TaskLoop(void * dealer, size_t workerIndex, NThreadData * worker)
170 {
171  // Install signal handler for Ctrl+C
172  gWorkerInterrupted = 0;
173  struct sigaction sa;
174  struct sigaction oldSa;
175  memset(&sa, 0, sizeof(sa));
176  sa.sa_handler = WorkerSigIntHandler;
177  sigemptyset(&sa.sa_mask);
178  sa.sa_flags = 0;
179  sigaction(SIGINT, &sa, &oldSa);
180 
181  bool finishedOk = true;
182  bool aborted = false;
183  bool wasInterrupted = false;
184  size_t tasksProcessed = 0;
185  size_t lastReportedProgress = 0;
186  const bool showWorkerProgress = []() {
187  const char * env = gSystem->Getenv("NDMSPC_WORKER_PROGRESS");
188  if (!env || env[0] == '\0') return false;
189  std::string value(env);
190  std::transform(value.begin(), value.end(), value.begin(),
191  [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
192  return (value == "1" || value == "true" || value == "yes" || value == "on");
193  }();
194 
195  const size_t progressReportInterval = []() -> size_t {
196  const char * env = gSystem->Getenv("NDMSPC_WORKER_PROGRESS_INTERVAL");
197  if (!env || env[0] == '\0') return 50UL; // default: report every 50 tasks
198  try {
199  int val = std::stoi(env);
200  return (val > 0) ? static_cast<size_t>(val) : 50UL;
201  } catch (...) {
202  return 50UL;
203  }
204  }();
205 
206  // Non-blocking check: poll the socket and consume a STOP frame if present.
207  // Used between sub-tasks in a batch to react to supervisor abort quickly.
208  auto checkAbort = [&]() -> bool {
209  // Check for Ctrl+C first
210  if (gWorkerInterrupted) {
211  if (!aborted) {
212  // Send shutdown notification on first detection
213  SendFrames(dealer, {"SHUTDOWN", "interrupted", std::to_string(tasksProcessed)});
214  }
215  aborted = true;
216  return true;
217  }
218  zmq_pollitem_t item = {dealer, 0, ZMQ_POLLIN, 0};
219  if (zmq_poll(&item, 1, 0) <= 0) return false;
220  std::vector<std::string> peek;
221  if (!ReceiveFrames(dealer, peek)) return false;
222  if (!peek.empty() && peek[0] == "STOP") {
223  aborted = (peek.size() >= 2 && peek[1] == "abort");
224  if (aborted) NLogPrint("Worker %zu: received abort from supervisor, stopping ...", workerIndex);
225  return true;
226  }
227  return false; // unexpected frame — ignore, will be handled in main loop
228  };
229 
230  while (true) {
231  // Check for Ctrl+C
232  if (gWorkerInterrupted) {
233  if (showWorkerProgress && tasksProcessed > 0) { NLogPrint(""); }
234  NLogPrint("Worker %zu: interrupted by user (Ctrl+C), exiting ...", workerIndex);
235  NLogPrint("Worker %zu: interrupted by user (Ctrl+C) after processing %zu tasks, shutting down...", workerIndex, tasksProcessed);
236  // Notify supervisor before exiting
237  SendFrames(dealer, {"SHUTDOWN", "interrupted", std::to_string(tasksProcessed)});
238  aborted = true;
239  wasInterrupted = true;
240  finishedOk = false;
241  break;
242  }
243 
244  std::vector<std::string> frames;
245  if (!ReceiveFrames(dealer, frames)) {
246  if (errno == EAGAIN || errno == EWOULDBLOCK) {
247  // Check for interruption on timeout
248  if (gWorkerInterrupted) {
249  if (showWorkerProgress && tasksProcessed > 0) { NLogPrint(""); }
250  NLogPrint("Worker %zu: interrupted by user (Ctrl+C), exiting ...", workerIndex);
251  NLogPrint("Worker %zu: interrupted by user (Ctrl+C) after processing %zu tasks, shutting down...", workerIndex, tasksProcessed);
252  // Notify supervisor before exiting
253  SendFrames(dealer, {"SHUTDOWN", "interrupted", std::to_string(tasksProcessed)});
254  aborted = true;
255  wasInterrupted = true;
256  finishedOk = false;
257  break;
258  }
259  continue;
260  }
261  finishedOk = false;
262  break;
263  }
264  if (frames.empty()) continue;
265 
266  const std::string & cmd = frames[0];
267  if (cmd == "STOP") {
268  if (showWorkerProgress && tasksProcessed > 0) { NLogPrint(""); }
269  aborted = (frames.size() >= 2 && frames[1] == "abort");
270  if (aborted) {
271  NLogPrint("Worker %zu: received abort from supervisor, stopping ...", workerIndex);
272  } else {
273  // NLogPrint("Worker %zu: received STOP, processed %zu tasks total", workerIndex, tasksProcessed);
274  }
275  break;
276  }
277  if (cmd == "SETDEF") {
278  if (frames.size() < 2) {
279  finishedOk = false;
280  break;
281  }
282  if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
283  gnWorker->SetCurrentDefinitionName(frames[1]);
284  continue;
285  }
286  finishedOk = false;
287  break;
288  }
289  if (cmd == "SETIDS") {
290  if (frames.size() < 2) {
291  finishedOk = false;
292  break;
293  }
294  if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
295  gnWorker->SyncCurrentDefinitionIds(ParseIds(frames[1]));
296  continue;
297  }
298  finishedOk = false;
299  break;
300  }
301  if (cmd != "TASK" && cmd != "TASKB") {
302  finishedOk = false;
303  break;
304  }
305 
306  std::string errTaskId;
307  try {
308  if (cmd == "TASK") {
309  if (frames.size() < 3) {
310  finishedOk = false;
311  break;
312  }
313 
314  const std::string taskId = frames[1];
315  errTaskId = taskId;
316  std::vector<int> coords = ParseCoords(frames[2]);
317  ++tasksProcessed;
318  if (showWorkerProgress) {
319  NLogPrint("Worker %zu: processing tasks [done: %zu]", workerIndex, tasksProcessed);
320  }
321  // Report progress at configured interval for visibility
322  // Always show first task to confirm worker is actively processing
323  if (tasksProcessed == 1 || tasksProcessed - lastReportedProgress >= progressReportInterval) {
324  // NLogPrint("Worker %zu: processed %zu tasks", workerIndex, tasksProcessed);
325  lastReportedProgress = tasksProcessed;
326  }
327  worker->Process(coords);
328  if (!SendFrames(dealer, {"ACK", taskId})) {
329  finishedOk = false;
330  break;
331  }
332  }
333  else {
334  if (frames.size() < 2) {
335  finishedOk = false;
336  break;
337  }
338 
339  std::vector<std::pair<std::string, std::vector<int>>> batchTasks;
340  if (!ParseTaskBatchPayload(frames[1], batchTasks)) {
341  finishedOk = false;
342  break;
343  }
344 
345  std::vector<std::string> ackedTaskIds;
346  ackedTaskIds.reserve(batchTasks.size());
347  tasksProcessed += batchTasks.size();
348  if (showWorkerProgress) {
349  NLogPrint("Worker %zu: processing tasks [done: %zu]", workerIndex, tasksProcessed);
350  }
351  // Report progress at configured interval for visibility
352  // Always show first task to confirm worker is actively processing
353  if (tasksProcessed == 1 || tasksProcessed - lastReportedProgress >= progressReportInterval) {
354  // NLogPrint("Worker %zu: processed %zu tasks", workerIndex, tasksProcessed);
355  lastReportedProgress = tasksProcessed;
356  }
357  for (const auto & task : batchTasks) {
358  if (checkAbort()) { finishedOk = false; break; }
359  errTaskId = task.first;
360  worker->Process(task.second);
361  ackedTaskIds.push_back(task.first);
362  }
363  if (aborted) break;
364 
365  if (!SendFrames(dealer, {"ACKB", SerializeTaskIds(ackedTaskIds)})) {
366  finishedOk = false;
367  break;
368  }
369  }
370  }
371  catch (const std::exception & ex) {
372  NLogPrint("Worker %zu: ERROR processing task %s: %s", workerIndex, errTaskId.c_str(), ex.what());
373  SendFrames(dealer, {"ERR", errTaskId.empty() ? "0" : errTaskId, ex.what()});
374  finishedOk = false;
375  break;
376  }
377  catch (...) {
378  NLogPrint("Worker %zu: ERROR processing task %s: unknown exception", workerIndex, errTaskId.c_str());
379  SendFrames(dealer, {"ERR", errTaskId.empty() ? "0" : errTaskId, "unknown worker exception"});
380  finishedOk = false;
381  break;
382  }
383  }
384 
385  if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
386  // Capture local tmp file path before closing (storage object path is cleared after Close).
387  std::string localTmpFile;
388  if (gnWorker->GetHnSparseBase() && gnWorker->GetHnSparseBase()->GetStorageTree()) {
389  localTmpFile = gnWorker->GetHnSparseBase()->GetStorageTree()->GetFileName();
390  }
391 
392  if (aborted) {
393  // Supervisor aborted — skip end function and copy, but close the file handle
394  // so we can delete it cleanly.
395  NLogPrint("Worker %zu: aborting, skipping post-processing.", workerIndex);
396  if (gnWorker->GetHnSparseBase()) {
397  gnWorker->GetHnSparseBase()->Close(false); // false = don't save
398  }
399  } else {
400  NLogDebug("Worker %zu finished processing, executing end function and closing file if open ...", workerIndex);
401  gnWorker->ExecuteEndFunction();
402  if (gnWorker->GetHnSparseBase()) {
403  gnWorker->GetHnSparseBase()->Close(true);
404  }
405 
406  // TCP mode: copy local result file to shared results dir so supervisor can merge.
407  const std::string & resultsFilename = gnWorker->GetResultsFilename();
408  if (!resultsFilename.empty()) {
409  if (!localTmpFile.empty() && localTmpFile != resultsFilename) {
410  const std::string resultsDir = std::string(gSystem->GetDirName(resultsFilename.c_str()));
411  NUtils::CreateDirectory(resultsDir);
412  NLogPrint("Worker %zu copying '%s' -> '%s' ...", workerIndex, localTmpFile.c_str(), resultsFilename.c_str());
413  if (!NUtils::Cp(localTmpFile, resultsFilename, kFALSE)) {
414  NLogError("Worker %zu: failed to copy '%s' to '%s'", workerIndex, localTmpFile.c_str(),
415  resultsFilename.c_str());
416  }
417  }
418  }
419  }
420 
421  // Delete the local tmp file only when a distinct results file was set.
422  // An empty resultsFilename means localTmpFile IS the results file (same path),
423  // so it must be kept for the supervisor to merge.
424  const std::string & resultsFilenameForDelete = gnWorker->GetResultsFilename();
425  const bool hasDistinctResultsFile = !resultsFilenameForDelete.empty() && resultsFilenameForDelete != localTmpFile;
426  if (!localTmpFile.empty() && hasDistinctResultsFile) {
427  NLogPrint("Worker %zu: removing local tmp file '%s'", workerIndex, localTmpFile.c_str());
428  gSystem->Unlink(localTmpFile.c_str());
429  }
430  }
431 
432  if (!aborted) {
433  // Signal master that this worker has finished writing its file.
434  // For TCP mode, master waits for this DONE before starting to merge.
435  // For IPC (fork) mode, master uses WaitForChildProcesses instead; the DONE
436  // message stays unread in the ZMQ buffer which is harmless.
437  SendFrames(dealer, {"DONE"});
438  NLogPrint("Worker %zu: completed successfully, processed %zu tasks total", workerIndex, tasksProcessed);
439  } else if (wasInterrupted) {
440  // Interrupted by Ctrl+C - already printed message above, don't print duplicate
441  } else if (!finishedOk) {
442  NLogPrint("Worker %zu: exited with error after processing %zu tasks", workerIndex, tasksProcessed);
443  }
444 
445  // Drop any unsent/undelivered messages immediately so zmq_close/zmq_ctx_term
446  // do not hang at shutdown.
447  int linger = 0;
448  zmq_setsockopt(dealer, ZMQ_LINGER, &linger, sizeof(linger));
449 
450  // Restore original signal handler
451  sigaction(SIGINT, &oldSa, nullptr);
452 
453  return finishedOk ? 0 : 1;
454 }
455 
456 bool NDimensionalIpcRunner::WaitForChildProcesses(const std::vector<pid_t> & pids, int timeoutMs)
457 {
458  bool allExitedCleanly = true;
459  const bool useTimeout = (timeoutMs >= 0);
460 
461  for (pid_t pid : pids) {
462  if (pid <= 0) continue;
463 
464  int status = 0;
465  pid_t rc = -1;
466  if (!useTimeout) {
467  rc = waitpid(pid, &status, 0);
468  }
469  else {
470  const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeoutMs);
471  while (std::chrono::steady_clock::now() < deadline) {
472  rc = waitpid(pid, &status, WNOHANG);
473  if (rc == pid || rc < 0) {
474  break;
475  }
476  std::this_thread::sleep_for(std::chrono::milliseconds(10));
477  }
478  if (rc == 0) {
479  allExitedCleanly = false;
480  continue;
481  }
482  }
483 
484  if (rc < 0) {
485  allExitedCleanly = false;
486  continue;
487  }
488 
489  if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
490  allExitedCleanly = false;
491  }
492  }
493  return allExitedCleanly;
494 }
495 
496 void NDimensionalIpcRunner::CleanupChildProcesses(const std::vector<pid_t> & pids)
497 {
498  for (pid_t pid : pids) {
499  if (pid <= 0) continue;
500  int status = 0;
501  pid_t rc = waitpid(pid, &status, WNOHANG);
502  if (rc == pid) {
503  continue;
504  }
505  if (rc < 0) {
506  continue;
507  }
508 
509  kill(pid, SIGTERM);
510  rc = waitpid(pid, &status, WNOHANG);
511  if (rc == pid || rc < 0) {
512  continue;
513  }
514 
515  if (!WaitForChildProcesses({pid}, 1500)) {
516  kill(pid, SIGKILL);
517 
518  // Never block indefinitely here: if a child is stuck in uninterruptible I/O
519  // state, waitpid(..., 0) can hang forever and stall process shutdown.
520  const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(1500);
521  while (std::chrono::steady_clock::now() < deadline) {
522  rc = waitpid(pid, &status, WNOHANG);
523  if (rc == pid || rc < 0) break;
524  std::this_thread::sleep_for(std::chrono::milliseconds(10));
525  }
526  if (rc == 0) {
527  NLogWarning("NDimensionalIpcRunner::CleanupChildProcesses: child pid=%d did not exit after SIGKILL; continuing shutdown", pid);
528  }
529  }
530  }
531 }
532 
533 std::vector<int> NDimensionalIpcRunner::ParseCoords(const std::string & coordsStr)
534 {
535  std::vector<int> coords;
536  std::stringstream ss(coordsStr);
537  std::string token;
538  while (std::getline(ss, token, ',')) {
539  if (token.empty()) continue;
540  coords.push_back(std::stoi(token));
541  }
542  return coords;
543 }
544 
545 std::vector<Long64_t> NDimensionalIpcRunner::ParseIds(const std::string & idsStr)
546 {
547  std::vector<Long64_t> ids;
548  std::stringstream ss(idsStr);
549  std::string token;
550  while (std::getline(ss, token, ',')) {
551  if (token.empty()) continue;
552  ids.push_back(static_cast<Long64_t>(std::stoll(token)));
553  }
554  return ids;
555 }
556 
557 } // namespace Ndmspc
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Definition: NLogger.cxx:132
static bool CreateDirectory(const std::string &path)
Definition: NUtils.cxx:688
static int Cp(std::string source, std::string destination, Bool_t progressbar=kTRUE)
Copy a file from source to destination.
Definition: NUtils.cxx:155