ndmspc  v1.2.0-0.1.rc5
ndmspc-worker.cxx
1 #include <CLI11.hpp>
2 #include <csignal>
3 #include <chrono>
4 #include <sstream>
5 #include <sys/wait.h>
6 #include <string>
7 #include <thread>
8 #include <vector>
9 #include <unistd.h>
10 #include "TROOT.h"
11 #include "TApplication.h"
12 #include "TSystem.h"
13 #include "NLogger.h"
14 #include "NUtils.h"
15 #include "ndmspc.h"
16 #include "NDimensionalIpcRunner.h"
17 #include <zmq.h>
18 
19 static std::string app_description()
20 {
21  size_t size = 64;
22  auto buf = std::make_unique<char[]>(size);
23  size = std::snprintf(buf.get(), size, "%s v%s-%s (worker)", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
24  return std::string(buf.get(), size);
25 }
26 
27 static std::string app_version()
28 {
29  size_t size = 128;
30  auto buf = std::make_unique<char[]>(size);
31  size = std::snprintf(buf.get(), size, "%s v%s-%s", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
32  return std::string(buf.get(), size);
33 }
34 
namespace {

/// Termination flag: written by handle_signal(), polled by wait_for_workers().
/// `volatile sig_atomic_t` is the type that may safely be written from a
/// signal handler.
volatile sig_atomic_t gTerminateRequested = 0;

/// Signal handler: only records that termination was requested.
/// The signal number itself is not stored.
void handle_signal(int)
{
  gTerminateRequested = 1;
}

/// Fork and exec one worker process.
///
/// The child is started as
/// `workerBin --endpoint <endpoint> [--mode <mode>] [--macro <macroList>]
///  [--macro-params <macroParams>] [--verbose]`; optional arguments are only
/// passed when the corresponding string is non-empty / the flag is set.
///
/// @param workerBin   Executable to run (resolved through PATH by execvp).
/// @param endpoint    Value passed after `--endpoint`.
/// @param mode        Value passed after `--mode`; skipped when empty.
/// @param macroList   Value passed after `--macro`; skipped when empty.
/// @param macroParams Value passed after `--macro-params`; skipped when empty.
/// @param verbose     When true, `--verbose` is appended.
/// @return PID of the child on success, -1 if fork() failed. A failing exec
///         is not reported here: the child exits with status 127 instead.
pid_t spawn_worker_process(const std::string & workerBin, const std::string & endpoint, const std::string & mode,
                           const std::string & macroList, const std::string & macroParams, bool verbose)
{
  // Assemble the argument vector BEFORE forking: in a multi-threaded parent
  // the child may only call async-signal-safe functions until exec, and heap
  // allocation (vector/string growth) is not one of them.
  std::vector<std::string> args;
  args.reserve(9);
  args.emplace_back(workerBin);
  args.emplace_back("--endpoint");
  args.emplace_back(endpoint);
  if (!mode.empty()) {
    args.emplace_back("--mode");
    args.emplace_back(mode);
  }
  if (!macroList.empty()) {
    args.emplace_back("--macro");
    args.emplace_back(macroList);
  }
  if (!macroParams.empty()) {
    args.emplace_back("--macro-params");
    args.emplace_back(macroParams);
  }
  if (verbose) args.emplace_back("--verbose");

  // execvp wants a NULL-terminated array of mutable char pointers.
  std::vector<char *> cargs;
  cargs.reserve(args.size() + 1);
  for (auto & arg : args) {
    cargs.push_back(&arg[0]);
  }
  cargs.push_back(nullptr);

  const pid_t pid = fork();
  if (pid < 0) return -1;

  if (pid == 0) {
    // Child: replace the process image; 127 is the conventional
    // "command could not be executed" status if exec returns.
    execvp(workerBin.c_str(), cargs.data());
    _exit(127);
  }

  return pid;
}

/// Send `signalToSend` to every worker whose PID is still positive.
/// Entries <= 0 (already reaped or never started) are skipped.
void terminate_workers(const std::vector<pid_t> & workerPids, int signalToSend)
{
  for (pid_t pid : workerPids) {
    if (pid > 0) kill(pid, signalToSend);
  }
}

/// Reap all workers, forwarding a termination request if one arrives.
///
/// Polls the children with non-blocking waitpid() and sleeps 50 ms whenever a
/// full pass reaped nothing. Each reaped entry in `workerPids` is overwritten
/// with -1. When gTerminateRequested becomes set, SIGTERM is forwarded once to
/// the workers that are still running.
///
/// @param workerPids PIDs to wait for; entries <= 0 are ignored.
/// @return 128 + SIGTERM if termination was requested; otherwise 0 when every
///         worker exited with status 0, and 1 if any worker exited non-zero,
///         was killed by a signal, or could not be waited for.
int wait_for_workers(std::vector<pid_t> & workerPids)
{
  // Count only live entries: non-positive PIDs are skipped by the reap loop,
  // so counting them would keep the outer loop spinning forever.
  size_t aliveCount = 0;
  for (pid_t pid : workerPids) {
    if (pid > 0) ++aliveCount;
  }

  int  exitCode             = 0;
  bool terminationForwarded = false;

  while (aliveCount > 0) {
    // Forward the request a single time; re-sending SIGTERM on every poll
    // iteration would keep interrupting workers that are shutting down.
    if (gTerminateRequested && !terminationForwarded) {
      terminate_workers(workerPids, SIGTERM);
      terminationForwarded = true;
    }

    bool progressed = false;
    for (pid_t & pid : workerPids) {
      if (pid <= 0) continue;

      int         status = 0;
      const pid_t rc     = waitpid(pid, &status, WNOHANG);
      if (rc == 0) continue; // still running

      // rc < 0 (waitpid failed), a signal death, or a non-zero exit status
      // all count as failure of the whole group.
      const bool cleanExit = rc > 0 && WIFEXITED(status) && WEXITSTATUS(status) == 0;
      if (!cleanExit) exitCode = 1;

      pid = -1;
      --aliveCount;
      progressed = true;
    }

    if (!progressed) {
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
  }

  if (gTerminateRequested) {
    return 128 + SIGTERM;
  }
  return exitCode;
}

} // namespace
136 
137 int main(int argc, char ** argv)
138 {
139  signal(SIGTERM, handle_signal);
140  signal(SIGINT, handle_signal);
141 
142  TApplication rootApp("ndmspc-worker", 0, nullptr);
143 
144  CLI::App app{app_description()};
145  app.set_version_flag("--version", app_version(), "Print version information and exit");
146 
147  std::string endpoint;
148  size_t workerIndex = std::numeric_limits<size_t>::max(); // sentinel: not set
149  std::string macroList;
150  std::string macroParams;
151  std::string mode; // ipc | process | tcp | thread (optional override)
152  std::string workerBin;
153  size_t spawnWorkers = 0;
154  bool verbose = false;
155 
156  app.add_option("-e,--endpoint", endpoint, "Master endpoint (e.g. tcp://host:5555)")->required();
157  app.add_option("-i,--index", workerIndex, "Worker index (default: auto-assigned by supervisor)");
158  app.add_option("-m,--macro", macroList, "Comma-separated list of macro file(s) or URLs to execute");
159  app.add_option("--macro-params", macroParams,
160  "Parameter list forwarded to TMacro::Exec(params), e.g. '42,\"sample\"'");
161  app.add_option("--mode", mode,
162  "Execution mode override: ipc/process (local), tcp (remote), thread")
163  ->check(CLI::IsMember({"ipc", "process", "tcp", "thread"}));
164  app.add_option("--spawn-workers", spawnWorkers,
165  "Spawn N local ndmspc-worker processes (spawner mode)");
166  app.add_option("--worker-bin", workerBin,
167  "Worker executable for spawner mode (default: current executable)");
168  app.add_flag("-v,--verbose", verbose, "Enable verbose logging");
169 
170  CLI11_PARSE(app, argc, argv);
171 
172  // Default to file-only logging unless explicitly configured by environment.
173  if (!gSystem->Getenv("NDMSPC_LOG_CONSOLE")) {
174  gSystem->Setenv("NDMSPC_LOG_CONSOLE", "0");
175  if (!verbose) {
177  }
178  }
179 
180  if (verbose) {
182  }
183 
184  if (mode == "process") mode = "ipc";
185  if (!mode.empty() && !gSystem->Getenv("NDMSPC_EXECUTION_MODE")) {
186  gSystem->Setenv("NDMSPC_EXECUTION_MODE", mode.c_str());
187  }
188  if (!macroParams.empty() && !gSystem->Getenv("NDMSPC_MACRO_PARAMS")) {
189  gSystem->Setenv("NDMSPC_MACRO_PARAMS", macroParams.c_str());
190  }
191 
192  if (spawnWorkers > 0) {
193  if (workerIndex != std::numeric_limits<size_t>::max()) {
194  NLogError("ndmspc-worker: --index cannot be combined with --spawn-workers");
195  return 1;
196  }
197  if (workerBin.empty()) {
198  workerBin = (argc > 0 && argv[0] && argv[0][0] != '\0') ? argv[0] : "ndmspc-worker";
199  }
200 
201  NLogInfo("ndmspc-worker: spawning %zu worker(s) using '%s' -> %s", spawnWorkers, workerBin.c_str(),
202  endpoint.c_str());
203  std::vector<pid_t> workerPids;
204  workerPids.reserve(spawnWorkers);
205  for (size_t i = 0; i < spawnWorkers; ++i) {
206  const pid_t pid = spawn_worker_process(workerBin, endpoint, mode, macroList, macroParams, verbose);
207  if (pid < 0) {
208  NLogError("ndmspc-worker: failed to spawn worker %zu", i);
209  terminate_workers(workerPids, SIGTERM);
210  return 1;
211  }
212  workerPids.push_back(pid);
213  }
214  return wait_for_workers(workerPids);
215  }
216 
217  // If macro or index not provided, bootstrap from supervisor
218  const bool needsBootstrap = macroList.empty() || workerIndex == std::numeric_limits<size_t>::max();
219  if (needsBootstrap) {
220  NLogPrint("ndmspc-worker: bootstrapping config from supervisor at %s ...", endpoint.c_str());
221  void * ctx = zmq_ctx_new();
222  void * dealer = zmq_socket(ctx, ZMQ_DEALER);
223  // Use a unique bootstrap identity so supervisor can route the CONFIG reply
224  std::ostringstream bootIdBuilder;
225  const char * host = gSystem->HostName();
226  bootIdBuilder << "boot_" << (host ? host : "unknown") << '_' << gSystem->GetPid() << '_'
227  << std::chrono::high_resolution_clock::now().time_since_epoch().count();
228  const std::string bootId = bootIdBuilder.str();
229  zmq_setsockopt(dealer, ZMQ_IDENTITY, bootId.data(), bootId.size());
230  int timeoutMs = 1000;
231  zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
232  zmq_connect(dealer, endpoint.c_str());
233  Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {"BOOTSTRAP"});
234 
235  const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(300);
236  bool configOk = false;
237  bool rejectedBySupervisor = false;
238  while (!configOk) {
239  std::vector<std::string> frames;
240  if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
241  if (errno == EAGAIN || errno == EWOULDBLOCK) {
242  if (std::chrono::steady_clock::now() > deadline) break;
243  continue;
244  }
245  break;
246  }
247  // CONFIG frames: "CONFIG", workerIdx, macroList, NDMSPC_TMP_DIR,
248  // NDMSPC_TMP_RESULTS_DIR, [optional] NDMSPC_MACRO_PARAMS
249  if (frames.size() >= 5 && frames[0] == "CONFIG") {
250  if (workerIndex == std::numeric_limits<size_t>::max())
251  workerIndex = static_cast<size_t>(std::stoul(frames[1]));
252  if (macroList.empty())
253  macroList = frames[2];
254  if (!frames[3].empty())
255  gSystem->Setenv("NDMSPC_TMP_DIR", frames[3].c_str());
256  if (!frames[4].empty())
257  gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", frames[4].c_str());
258  if (frames.size() >= 6 && !frames[5].empty() && !gSystem->Getenv("NDMSPC_MACRO_PARAMS"))
259  gSystem->Setenv("NDMSPC_MACRO_PARAMS", frames[5].c_str());
260  configOk = true;
261  } else if (frames.size() >= 1 && frames[0] == "REJECT") {
262  const std::string reason = (frames.size() >= 2) ? frames[1] : "unspecified";
263  NLogWarning("ndmspc-worker: bootstrap rejected by supervisor (%s), exiting", reason.c_str());
264  rejectedBySupervisor = true;
265  break;
266  }
267  }
268  zmq_close(dealer);
269  zmq_ctx_term(ctx);
270 
271  if (rejectedBySupervisor) {
272  return 0;
273  }
274 
275  if (!configOk) {
276  NLogError("ndmspc-worker: failed to receive CONFIG from supervisor at %s, exiting", endpoint.c_str());
277  return 1;
278  }
279  }
280 
281  // Fallback: if NDMSPC_TMP_RESULTS_DIR is unset or empty, use NDMSPC_TMP_DIR
282  if (!gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR")[0] == '\0') {
283  const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
284  if (tmpDirEnv && tmpDirEnv[0] != '\0')
285  gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
286  }
287 
288  if (macroList.empty()) {
289  NLogError("ndmspc-worker: no macro specified (use --macro or let supervisor send via bootstrap)");
290  return 1;
291  }
292  if (workerIndex == std::numeric_limits<size_t>::max()) workerIndex = 0;
293 
294  // Set env vars so NGnTree::Process enters worker mode
295  gSystem->Setenv("NDMSPC_WORKER_ENDPOINT", endpoint.c_str());
296  gSystem->Setenv("NDMSPC_WORKER_INDEX", std::to_string(workerIndex).c_str());
297 
298  NLogPrint("ndmspc-worker: starting — index=%zu endpoint=%s", workerIndex, endpoint.c_str());
299  NLogPrint(" macro = %s", macroList.c_str());
300  NLogPrint(" NDMSPC_TMP_DIR = %s", gSystem->Getenv("NDMSPC_TMP_DIR") ? gSystem->Getenv("NDMSPC_TMP_DIR") : "(not set)");
301  NLogPrint(" NDMSPC_TMP_RESULTS_DIR = %s", gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") ? gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") : "(not set)");
302  NLogPrint(" NDMSPC_EXECUTION_MODE = %s", gSystem->Getenv("NDMSPC_EXECUTION_MODE") ? gSystem->Getenv("NDMSPC_EXECUTION_MODE") : "(not set)");
303  NLogPrint(" NDMSPC_MACRO_PARAMS = %s", gSystem->Getenv("NDMSPC_MACRO_PARAMS") ? gSystem->Getenv("NDMSPC_MACRO_PARAMS") : "(not set)");
304 
305  const std::string effectiveMacroParams = gSystem->Getenv("NDMSPC_MACRO_PARAMS") ? gSystem->Getenv("NDMSPC_MACRO_PARAMS") : "";
306  std::vector<std::string> macros = Ndmspc::NUtils::Tokenize(macroList, ',');
307  for (const auto & macro : macros) {
308  if (effectiveMacroParams.empty()) {
309  NLogInfo("ndmspc-worker: executing macro '%s'", macro.c_str());
310  } else {
311  NLogInfo("ndmspc-worker: executing macro '%s' with params '%s'", macro.c_str(), effectiveMacroParams.c_str());
312  }
313  TMacro * m = Ndmspc::NUtils::OpenMacro(macro);
314  if (!m) {
315  NLogError("ndmspc-worker: failed to open macro '%s', exiting", macro.c_str());
316  return 1;
317  }
318  m->Exec(effectiveMacroParams.empty() ? nullptr : effectiveMacroParams.c_str());
319  delete m;
320  }
321 
322  NLogInfo("ndmspc-worker: done");
323  return 0;
324 }
static void SetConsoleOutput(bool enable)
Enables or disables logging output to the console.
Definition: NLogger.h:515
static TMacro * OpenMacro(std::string filename)
Open a macro file.
Definition: NUtils.cxx:781
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.
Definition: NUtils.cxx:1077