ndmspc  v1.2.0-0.1.rc7
ndmspc-run.cxx
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstdio>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <CLI11.hpp>

#include "TROOT.h"
#include "TApplication.h"
#include "TSystem.h"

#include "NLogger.h"
#include "NUtils.h"
#include "ndmspc.h"
16 
namespace {

/// Fork a child process and exec the worker executable in it.
///
/// The child runs `<workerBin> --endpoint <endpoint> [--verbose]`, with `workerBin`
/// resolved through PATH (execvp semantics). If the exec fails, the child exits with
/// status 127.
///
/// @param workerBin  worker executable name or path
/// @param endpoint   value passed to the worker's `--endpoint` option
/// @param verbose    when true, `--verbose` is appended to the worker's arguments
/// @return the child's pid in the parent, or -1 if fork() failed
pid_t spawn_worker_process(const std::string & workerBin, const std::string & endpoint, bool verbose)
{
  // Build argv before forking so the child only has to call exec/_exit. This keeps
  // the post-fork path async-signal-safe in case the parent is multi-threaded.
  // When not verbose, the 4th slot is the argv terminator.
  const char * const args[] = {workerBin.c_str(), "--endpoint", endpoint.c_str(),
                               verbose ? "--verbose" : nullptr, nullptr};

  const pid_t pid = fork();
  if (pid < 0) return -1;

  if (pid == 0) {
    // execvp's argv parameter is not const-qualified for historical reasons; it does
    // not modify the strings.
    execvp(args[0], const_cast<char * const *>(args));
    _exit(127); // only reached when exec failed
  }

  return pid;
}

/// Terminate and reap all spawned worker processes, then clear the list.
///
/// Every positive pid is sent SIGTERM. The function then polls each worker until it
/// exits or until a single 2-second deadline (shared by all workers) has passed. A
/// worker still running after the deadline is sent SIGKILL and reaped with a blocking
/// wait. Non-positive entries are skipped, which also guards against kill(-1)/kill(0).
///
/// @param workerPids  pids returned by spawn_worker_process(); cleared on return
void stop_worker_processes(std::vector<pid_t> & workerPids)
{
  if (workerPids.empty()) return;

  for (const pid_t pid : workerPids) {
    if (pid > 0) kill(pid, SIGTERM);
  }

  const auto waitDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(2);
  for (const pid_t pid : workerPids) {
    if (pid <= 0) continue;

    int status = 0;
    for (;;) {
      const pid_t rc = waitpid(pid, &status, WNOHANG);
      if (rc > 0) break; // worker exited and has been reaped
      if (rc < 0) {
        // A signal interrupted the wait: retry instead of abandoning the worker
        // (which would also skip the SIGKILL escalation below).
        if (errno == EINTR) continue;
        break; // e.g. ECHILD: nothing left to reap for this pid
      }

      // rc == 0: the worker is still running.
      if (std::chrono::steady_clock::now() >= waitDeadline) {
        kill(pid, SIGKILL);
        // Blocking reap, retried if interrupted by a signal.
        while (waitpid(pid, &status, 0) < 0 && errno == EINTR) {
        }
        break;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(25));
    }
  }

  workerPids.clear();
}

} // namespace
63 
64 static std::string app_description()
65 {
66  size_t size = 128;
67  auto buf = std::make_unique<char[]>(size);
68  size = std::snprintf(buf.get(), size, "%s v%s-%s (run)", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
69  return std::string(buf.get(), size);
70 }
71 
72 static std::string app_version()
73 {
74  size_t size = 128;
75  auto buf = std::make_unique<char[]>(size);
76  size = std::snprintf(buf.get(), size, "%s v%s-%s", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
77  return std::string(buf.get(), size);
78 }
79 
80 int main(int argc, char ** argv)
81 {
82  TApplication rootApp("ndmspc-run", 0, nullptr);
83  gROOT->SetBatch(kTRUE);
84 
85  CLI::App app{app_description()};
86  app.set_version_flag("--version", app_version(), "Print version information and exit");
87 
88  std::string macroList;
89  std::string macroParams;
90  std::string mode; // ipc | tcp | thread (maps to NDMSPC_EXECUTION_MODE)
91  size_t nProcesses = 0; // 0 = not set
92  std::string tcpPort;
93  std::string workerBin;
94  std::string workerEndpoint;
95  std::string tmpDir;
96  std::string tmpResultsDir;
97  size_t spawnWorkers = 0;
98  bool verbose = false;
99 
100  app.add_option("macro", macroList,
101  "Comma-separated list of macro file(s) or URLs to execute")
102  ->required();
103  app.add_option("--macro-params", macroParams,
104  "Parameter list forwarded to TMacro::Exec(params), e.g. '42,\"sample\"'");
105  app.add_option("--mode", mode,
106  "Execution mode: ipc/process (forked local processes), tcp (remote workers), thread")
107  ->check(CLI::IsMember({"ipc", "process", "tcp", "thread"}));
108  app.add_option("-n,--processes", nProcesses,
109  "Number of worker processes (NDMSPC_MAX_PROCESSES)");
110  app.add_option("--tcp-port", tcpPort,
111  "TCP port to bind for remote workers (NDMSPC_TCP_PORT, default: 5555)");
112  app.add_option("--spawn-workers", spawnWorkers,
113  "In TCP mode, spawn N local ndmspc-worker processes");
114  app.add_option("--worker-bin", workerBin,
115  "Worker executable to spawn (default: ndmspc-worker)");
116  app.add_option("--worker-endpoint", workerEndpoint,
117  "Worker endpoint for spawned workers (default: tcp://localhost:<tcp-port>)");
118  app.add_option("--tmp-dir", tmpDir,
119  "Local scratch directory for temporary files (NDMSPC_TMP_DIR)");
120  app.add_option("--results-dir", tmpResultsDir,
121  "Shared results directory where workers deposit output (NDMSPC_TMP_RESULTS_DIR)");
122  app.add_flag("-v,--verbose", verbose, "Enable verbose logging");
123 
124  CLI11_PARSE(app, argc, argv);
125 
126  // Default to file-only logging unless explicitly configured by environment.
127  if (!gSystem->Getenv("NDMSPC_LOG_CONSOLE")) {
128  gSystem->Setenv("NDMSPC_LOG_CONSOLE", "0");
129  if (!verbose) {
131  }
132  }
133 
134  if (verbose) {
136  }
137 
138  // Set env vars from CLI args (only if not already set in the environment)
139  auto setenvIfEmpty = [](const char * name, const std::string & value) {
140  if (!value.empty() && !gSystem->Getenv(name))
141  gSystem->Setenv(name, value.c_str());
142  };
143 
144  if (mode == "process") mode = "ipc";
145  if (!mode.empty() && !gSystem->Getenv("NDMSPC_EXECUTION_MODE"))
146  gSystem->Setenv("NDMSPC_EXECUTION_MODE", mode.c_str());
147 
148  // When auto-spawning TCP workers, default NDMSPC_MAX_PROCESSES to that count
149  // if the caller didn't set --processes / NDMSPC_MAX_PROCESSES explicitly.
150  if (spawnWorkers > 0 && nProcesses == 0 && !gSystem->Getenv("NDMSPC_MAX_PROCESSES")) {
151  nProcesses = spawnWorkers;
152  }
153 
154  if (nProcesses > 0 && !gSystem->Getenv("NDMSPC_MAX_PROCESSES"))
155  gSystem->Setenv("NDMSPC_MAX_PROCESSES", std::to_string(nProcesses).c_str());
156  setenvIfEmpty("NDMSPC_MACRO_PARAMS", macroParams);
157  setenvIfEmpty("NDMSPC_TCP_PORT", tcpPort);
158  setenvIfEmpty("NDMSPC_TMP_DIR", tmpDir);
159  setenvIfEmpty("NDMSPC_TMP_RESULTS_DIR", tmpResultsDir);
160 
161  const std::string effectiveMacroParams = gSystem->Getenv("NDMSPC_MACRO_PARAMS") ? gSystem->Getenv("NDMSPC_MACRO_PARAMS") : "";
162 
163  if (workerBin.empty()) workerBin = "ndmspc-worker";
164 
165  std::vector<pid_t> spawnedWorkers;
166  auto cleanupSpawnedWorkers = [&]() { stop_worker_processes(spawnedWorkers); };
167 
168  if (spawnWorkers > 0) {
169  const std::string effectiveMode = gSystem->Getenv("NDMSPC_EXECUTION_MODE") ? gSystem->Getenv("NDMSPC_EXECUTION_MODE") : "";
170  if (effectiveMode != "tcp") {
171  NLogError("ndmspc-run: --spawn-workers is supported only with --mode tcp (effective mode: '%s')",
172  effectiveMode.c_str());
173  return 1;
174  }
175 
176  if (workerEndpoint.empty()) {
177  const char * effectivePort = gSystem->Getenv("NDMSPC_TCP_PORT");
178  workerEndpoint = std::string("tcp://localhost:") + (effectivePort ? effectivePort : "5555");
179  }
180 
181  NLogInfo("ndmspc-run: spawning %zu worker(s) using '%s' at %s", spawnWorkers, workerBin.c_str(),
182  workerEndpoint.c_str());
183  spawnedWorkers.reserve(spawnWorkers);
184  for (size_t i = 0; i < spawnWorkers; ++i) {
185  const pid_t pid = spawn_worker_process(workerBin, workerEndpoint, verbose);
186  if (pid < 0) {
187  NLogError("ndmspc-run: failed to spawn worker %zu", i);
188  cleanupSpawnedWorkers();
189  return 1;
190  }
191  spawnedWorkers.push_back(pid);
192  }
193  }
194 
195  // Expose the macro list so NGnTree::Process can forward it to TCP workers
196  // automatically without requiring an explicit tree.SetWorkerMacro() call.
197  gSystem->Setenv("NDMSPC_MACRO", macroList.c_str());
198 
199  std::vector<std::string> macros = Ndmspc::NUtils::Tokenize(macroList, ',');
200  for (const auto & macro : macros) {
201  if (effectiveMacroParams.empty()) {
202  NLogInfo("ndmspc-run: executing macro '%s'", macro.c_str());
203  } else {
204  NLogInfo("ndmspc-run: executing macro '%s' with params '%s'", macro.c_str(), effectiveMacroParams.c_str());
205  }
206  TMacro * m = Ndmspc::NUtils::OpenMacro(macro);
207  if (!m) {
208  NLogError("ndmspc-run: failed to open macro '%s', exiting", macro.c_str());
209  cleanupSpawnedWorkers();
210  return 1;
211  }
212  m->Exec(effectiveMacroParams.empty() ? nullptr : effectiveMacroParams.c_str());
213  delete m;
214  }
215 
216  cleanupSpawnedWorkers();
217  NLogInfo("ndmspc-run: done");
218 
219  // // Workaround: ROOT teardown can intermittently hang at process exit in IPC
220  // // batch mode after all useful work has already completed.
221  // const std::string effectiveMode = gSystem->Getenv("NDMSPC_EXECUTION_MODE") ? gSystem->Getenv("NDMSPC_EXECUTION_MODE") : "";
222  // if (effectiveMode == "ipc") {
223  // std::fflush(nullptr);
224  // _exit(0);
225  // }
226 
227  return 0;
228 }
static void SetConsoleOutput(bool enable)
Enables or disables logging output to the console.
Definition: NLogger.h:515
static TMacro * OpenMacro(std::string filename)
Open a macro file.
Definition: NUtils.cxx:781
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.
Definition: NUtils.cxx:1077