ndmspc  v1.2.0-0.1.rc7
NGnTree.cxx
1 #include <chrono>
2 #include <cstddef>
3 #include <ctime>
4 #include <numbers>
5 #include <set>
6 #include <string>
7 #include <vector>
8 #include "TAxis.h"
9 #include <TDirectory.h>
10 #include <TObject.h>
11 #include <TList.h>
12 #include <TROOT.h>
13 #include <THnSparse.h>
14 #include <TH1.h>
15 #include <TCanvas.h>
16 #include <TSystem.h>
17 #include <TMap.h>
18 #include <TObjString.h>
19 #include <TTree.h>
20 #include <TBufferJSON.h>
21 #include <sys/poll.h>
22 #include <zmq.h>
23 #include "NParameters.h"
24 #include "NStorageTree.h"
25 #include "NBinning.h"
26 #include "NBinningDef.h"
27 #include "NDimensionalExecutor.h"
28 #include "NDimensionalIpcRunner.h"
29 #include "NGnThreadData.h"
30 #include "NLogger.h"
31 #include "NTreeBranch.h"
32 #include "NUtils.h"
33 #include "NStorageTree.h"
34 #include "NGnNavigator.h"
35 #include "NGnTree.h"
36 
/// ROOT ClassImp macro for Ndmspc::NGnTree.
38 ClassImp(Ndmspc::NGnTree);
40 
41 namespace Ndmspc {
42 
43 std::string NGnTree::BuildObjectPath(const json & cfg, const json & objCfg, const NBinningPoint * point)
44 {
45  std::string objPath = "";
46  if (objCfg.contains("prefix") && objCfg["prefix"].is_string()) {
47  objPath = objCfg["prefix"].get<std::string>();
48  }
49 
50  std::string axisObjectDefaultFormat =
51  cfg["axisObjectDefaultFormat"].is_string() ? cfg["axisObjectDefaultFormat"].get<std::string>() : "%.2f_%.2f";
52  std::string axisDefaultSeparator =
53  cfg["axisDefaultSeparator"].is_string() ? cfg["axisDefaultSeparator"].get<std::string>() : "/";
54 
55  std::string lastSep;
56  for (auto & axisEntry : cfg["axes"]) {
57  std::string axisName;
58  std::string mode;
59  std::string format;
60 
61  if (axisEntry.is_string()) {
62  axisName = axisEntry.get<std::string>();
63  if (axisObjectDefaultFormat.empty()) {
64  mode = "bin";
65  }
66  else {
67  mode = "minmax";
68  format = axisObjectDefaultFormat;
69  }
70  }
71  else if (axisEntry.is_object()) {
72  if (axisEntry.contains("name") && axisEntry["name"].is_string()) {
73  axisName = axisEntry["name"].get<std::string>();
74  }
75  else {
76  continue;
77  }
78  if (axisEntry.contains("mode") && axisEntry["mode"].is_string()) {
79  mode = axisEntry["mode"].get<std::string>();
80  }
81  if (axisEntry.contains("format") && axisEntry["format"].is_string()) {
82  format = axisEntry["format"].get<std::string>();
83  }
84  }
85  else {
86  continue;
87  }
88 
89  if (mode.empty()) {
90  if (axisObjectDefaultFormat.empty())
91  mode = "bin";
92  else
93  mode = "minmax";
94  }
95  if (format.empty()) {
96  if (mode == "minmax")
97  format = axisObjectDefaultFormat.empty() ? "%.2f_%.2f" : axisObjectDefaultFormat;
98  else if (mode == "bin")
99  format = "%d";
100  else
101  format = "%.2f";
102  }
103 
104  if (mode == "minmax") {
105  double min = point->GetBinMin(axisName);
106  double max = point->GetBinMax(axisName);
107  objPath += TString::Format(format.c_str(), min, max).Data();
108  }
109  else if (mode == "min") {
110  double min = point->GetBinMin(axisName);
111  objPath += TString::Format(format.c_str(), min).Data();
112  }
113  else if (mode == "max") {
114  double max = point->GetBinMax(axisName);
115  objPath += TString::Format(format.c_str(), max).Data();
116  }
117  else if (mode == "center") {
118  double c = point->GetBinCenter(axisName);
119  objPath += TString::Format(format.c_str(), c).Data();
120  }
121  else if (mode == "label") {
122  std::string lbl = point->GetBinLabel(axisName);
123  objPath += lbl;
124  }
125  else if (mode == "bin") {
126  objPath += std::to_string(point->GetBin(axisName));
127  }
128  else {
129  objPath += std::to_string(point->GetBin(axisName));
130  }
131 
132  std::string sep = axisDefaultSeparator;
133  if (axisEntry.is_object() && axisEntry.contains("sufix") && axisEntry["sufix"].is_string()) {
134  sep = axisEntry["sufix"].get<std::string>();
135  }
136  objPath += sep;
137  lastSep = sep;
138  }
139 
140  if (!lastSep.empty() && objPath.size() >= lastSep.size()) {
141  objPath = objPath.substr(0, objPath.size() - lastSep.size());
142  }
143  if (objCfg.contains("sufix") && objCfg["sufix"].is_string()) {
144  objPath += objCfg["sufix"].get<std::string>();
145  }
146 
147  return objPath;
148 }
157 NGnTree::NGnTree() : TObject() {}
158 
/// Build an NGnTree from a list of axes, with its storage tree initialized for file
/// `filename` and tree name `treename`.
/// An empty axis list logs an error, marks the object as zombie and returns before the
/// binning, storage and navigator are set up.
/// NOTE(review): this listing omits source line 171. The visible lines call
/// fTreeStorage->InitTree() without any visible assignment of fTreeStorage, so the omitted
/// line presumably creates the storage tree. Confirm against the real source.
159 NGnTree::NGnTree(std::vector<TAxis *> axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
160 {
164  if (axes.empty()) {
165  NLogError("NGnTree::NGnTree: No axes provided, binning is nullptr.");
166  // fBinning = new NBinning();
167  MakeZombie();
168  return;
169  }
  // Binning is built from the given axes; storage and navigator follow.
170  fBinning = new NBinning(axes);
172  fTreeStorage->InitTree(filename, treename);
  // The navigator is linked back to this tree.
173  fNavigator = new NGnNavigator();
174  fNavigator->SetGnTree(this);
175 }
/// Build an NGnTree from a TObjArray of axes, with its storage tree initialized for file
/// `filename` and tree name `treename`.
/// A null array logs an error, marks the object as zombie and returns early.
/// NOTE(review): this listing omits source line 194. The visible lines call
/// fTreeStorage->InitTree() without any visible assignment of fTreeStorage. Confirm the
/// omitted line creates it.
176 NGnTree::NGnTree(TObjArray * axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
177 {
181
182  if (axes == nullptr) {
183  NLogError("NGnTree::NGnTree: Axes TObjArray is nullptr.");
184  MakeZombie();
185  return;
186  }
187
  // NOTE(review): `axes == nullptr && ...` can never be true here because the null case has
  // already returned above, so an empty TObjArray is NOT rejected. The intended condition is
  // presumably `axes->GetEntries() == 0`.
188  if (axes == nullptr && axes->GetEntries() == 0) {
189  NLogError("NGnTree::NGnTree: No axes provided, binning is nullptr.");
190  MakeZombie();
191  return;
192  }
193  fBinning = new NBinning(axes);
195  fTreeStorage->InitTree(filename, treename);
  // The navigator is linked back to this tree.
196  fNavigator = new NGnNavigator();
197  fNavigator->SetGnTree(this);
198 }
199 
/// Build a new NGnTree whose binning is a clone of the binning of `ngnt`, with its storage
/// tree initialized for file `filename` and tree name `treename`.
/// A null `ngnt`, or one without binning, logs an error, marks the object as zombie and
/// returns early.
/// NOTE(review): this listing omits source line 219. The visible lines call
/// fTreeStorage->InitTree() without any visible assignment of fTreeStorage. Confirm the
/// omitted line creates it.
200 NGnTree::NGnTree(NGnTree * ngnt, std::string filename, std::string treename) : TObject(), fInput(nullptr)
201 {
205  if (ngnt == nullptr) {
206  NLogError("NGnTree::NGnTree: NGnTree is nullptr.");
207  MakeZombie();
208  return;
209  }
210
211  if (ngnt->GetBinning() == nullptr) {
212  NLogError("NGnTree::NGnTree: Binning in NGnTree is nullptr.");
213  MakeZombie();
214  return;
215  }
216
217  // TODO: Import binning from user
  // Deep copy, so this tree does not share the source binning object.
218  fBinning = (NBinning *)ngnt->GetBinning()->Clone();
220  fTreeStorage->InitTree(filename, treename);
  // The navigator is linked back to this tree.
221  fNavigator = new NGnNavigator();
222  fNavigator->SetGnTree(this);
223 }
224 
226  : TObject(), fBinning(b), fTreeStorage(s), fInput(nullptr), fOwnsBinning(false), fOwnsTreeStorage(false)
227 {
  // Wrap an existing binning `b` and storage tree `s` without taking ownership of either
  // (fOwnsBinning / fOwnsTreeStorage start as false).
  // NOTE(review): the signature line (listing line 225) is missing from this listing.
  // A null binning logs an error and marks the object as zombie, but construction continues
  // (there is no return).
231  if (fBinning == nullptr) {
232  NLogError("NGnTree::NGnTree: Binning is nullptr.");
233  MakeZombie();
234  }
  // Without a caller-supplied storage tree, this object initializes its own tree "ngnt" and
  // owns it.
  // NOTE(review): listing line 236 is missing. As shown, fTreeStorage is still nullptr here
  // (it was initialized from s). Presumably the omitted line allocates it. Confirm.
235  if (s == nullptr) {
237  fTreeStorage->InitTree("", "ngnt");
238  fOwnsTreeStorage = true;
239  }
240
241  if (fTreeStorage == nullptr) {
242  NLogError("NGnTree::NGnTree: Storage tree is nullptr.");
243  MakeZombie();
244  }
245
246  // fBinning->Initialize();
247  //
248  // TODO: Check if this is needed
  // The navigator is created and linked back to this tree even when the object is a zombie.
251  fNavigator = new NGnNavigator();
252  fNavigator->SetGnTree(this);
253 }
254 NGnTree::NGnTree(THnSparse * hns, std::string parameterAxis, const std::string & outFileName, json cfg)
255  : TObject(), fInput(nullptr)
256 {
260 
261  std::map<std::string, std::vector<std::vector<int>>> b;
262  TObjArray * axes = new TObjArray();
263  int parameterAxisIdx = -1;
264  std::vector<std::string> labels;
265  for (int i = 0; i < hns->GetNdimensions(); i++) {
266  TAxis * axisIn = (TAxis *)hns->GetAxis(i);
267  TAxis * axis = (TAxis *)axisIn->Clone();
268 
269  // check if parameterAxis matches axis name
270  if (parameterAxis.compare(axis->GetName()) == 0) {
271  parameterAxisIdx = i;
272  TAxis * axis = hns->GetAxis(parameterAxisIdx);
273  for (int bin = 1; bin <= axis->GetNbins(); bin++) {
274  // NLogInfo("Axis bin %d label: %s", bin, axis->GetBinLabel(bin));
275  labels.push_back(axis->GetBinLabel(bin));
276  }
277  continue;
278  }
279 
280  // set label
281  if (axisIn->IsAlphanumeric()) {
282  NLogTrace("Setting axis '%s' labels from input THnSparse", axis->GetName());
283  for (int bin = 1; bin <= axisIn->GetNbins(); bin++) {
284  std::string label = axisIn->GetBinLabel(bin);
285  if (!labels.empty()) axis->SetBinLabel(bin, axisIn->GetBinLabel(bin));
286  }
287  }
288 
289  axes->Add(axis);
290  b[axis->GetName()] = {{1}};
291  }
292 
293  // json cfg;
294  cfg["_parameterAxis"] = parameterAxisIdx;
295  cfg["_labels"] = labels;
296 
297  // return nullptr;
298  NLogDebug("Importing THnSparse as NGnTree with parameter axis '%s' (index %d) ...", parameterAxis.c_str(),
299  parameterAxisIdx);
300  NGnTree * ngnt = new NGnTree(axes, outFileName);
301  NLogDebug("Created NGnTree for THnSparse import ...");
302  if (ngnt->IsZombie()) {
303  NLogError("NGnTree::Import: Failed to create NGnTree !!!");
304  MakeZombie();
305  return;
306  }
307  // ngnt->GetStorageTree()->GetTree()->GetUserInfo()->Add(hns->Clone());
308  // Get env variable for tmp dir $NDMSPC_TMP_DIR
309  const char * tmpDirStr = gSystem->Getenv("NDMSPC_TMP_DIR");
310 
311  std::string tmpDir;
312 
313  if (!tmpDirStr || tmpDir.empty()) {
314  tmpDir = "/tmp";
315  }
316  std::string tmpFilename = tmpDir + "/ngnt_imported_input" + std::to_string(gSystem->GetPid()) + ".root";
317  NGnTree * ngntIn = new NGnTree(axes, tmpFilename);
318  if (ngntIn->IsZombie()) {
319  NLogError("NGnTree::Import: Failed to create NGnTree for input !!!");
320  SafeDelete(ngnt);
321  MakeZombie();
322  return;
323  }
324  // ngntIn->GetStorageTree()->GetTree()->GetUserInfo()->Add(hns->Clone());
325  ngntIn->GetOutput("default")->Add(hns->Clone("test"));
326  ngntIn->Close(true);
327 
328  // return;
329  // delete ngntIn;
330 
331  ngnt->SetInput(NGnTree::Open(tmpFilename)); // Set input to self
332 
333  ngnt->GetInput()->Print();
334 
335  ngnt->GetBinning()->AddBinningDefinition("default", b);
336  ngnt->InitParameters(cfg["_labels"].get<std::vector<std::string>>());
337 
338  Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * /*output*/, TList * outputPoint,
339  int /*threadId*/) {
340  // NLogInfo("Thread ID: %d", threadId);
341  TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
342  // point->Print();
343  json cfg = point->GetCfg();
344 
345  NGnTree * ngntIn = point->GetInput();
346  if (!ngntIn) {
347  NLogError("NGnTree::Import: Input NGnTree is nullptr !!!");
348  return;
349  }
350  // ngntIn->Print();
351 
352  THnSparse * hns = (THnSparse *)ngntIn->GetOutput("default")->At(0);
353  if (hns == nullptr) {
354  NLogError("NGnTree::Import: THnSparse 'hns' not found in storage tree !!!");
355  return;
356  }
357 
358  int axisIdx = cfg["_parameterAxis"].get<int>();
359  std::vector<std::vector<int>> ranges;
360  // set ranges from point storage coords
361  int iAxis = 0;
362  for (int i = 0; i < hns->GetNdimensions(); i++) {
363  if (i == axisIdx) continue; // skip parameter axis
364  int coord = point->GetStorageCoords()[iAxis++];
365  ranges.push_back({i, coord, coord});
366  // NLogInfo("Setting axis %d range to [%d, %d]", i, coord, coord);
367  }
368 
369  NUtils::SetAxisRanges(hns, ranges);
370  TH1 * h = hns->Projection(axisIdx, "O");
371  if (!h) {
372  NLogError("NGnTree::Import: Projection of THnSparse failed !!!");
373  return;
374  }
375  if (h->GetEntries() > 0) {
376  NParameters * params = point->GetParameters();
377  if (params) {
378  for (int bin = 1; bin <= h->GetNbinsX(); bin++) {
379  params->SetParameter(bin, h->GetBinContent(bin), h->GetBinError(bin));
380  }
381  }
382  // outputPoint->Add(hParams);
383  outputPoint->Add(h);
384 
385  std::string filename = cfg["filename"].get<std::string>();
386  TFile * f = (TFile *)point->GetTempObject("file");
387  if (!f || filename.compare(f->GetName()) != 0) {
388  if (f) {
389  NLogDebug("NGnTree::Import: Closing previously opened file '%s' ...", f->GetName());
390  f->Close();
391  }
392  NLogDebug("NGnTree::Import: Opening file '%s' ...", filename.c_str());
393  f = TFile::Open(filename.c_str());
394  if (!f || f->IsZombie()) {
395  NLogError("NGnTree::Import: Cannot open file '%s' !!!", filename.c_str());
396  return;
397  }
398  point->SetTempObject("file", f);
399  }
400 
401  std::string axisObjectDefaultFormat =
402  cfg["axisObjectDefaultFormat"].is_string() ? cfg["axisObjectDefaultFormat"].get<std::string>() : "%.2f_%.2f";
403  std::string axisDefaultSeparator =
404  cfg["axisDefaultSeparator"].is_string() ? cfg["axisDefaultSeparator"].get<std::string>() : "/";
405  bool dryrun = false;
406  if (cfg.contains("dryrun") && cfg["dryrun"].is_boolean()) {
407  dryrun = cfg["dryrun"].get<bool>();
408  }
409 
410  if (dryrun) {
411  NLogInfo("NGnTree::Import (dryrun): '%s' ...", point->GetString().c_str());
412  }
413 
414  // loop over all object in cfg["objects"]
415  for (auto & [objName, objCfg] : cfg["objects"].items()) {
416  std::string objPath = NGnTree::BuildObjectPath(cfg, objCfg, point);
417 
418  if (dryrun) {
419  NLogInfo("NGnTree::Import (dryrun): would retrieve object '%s'", objPath.c_str());
420  continue;
421  }
422 
423  if (point->GetEntryNumber() == 0) {
424  NLogInfo("NGnTree::Import: Retrieving object '%s' from file '%s' ...", objPath.c_str(),
425  cfg["filename"].get<std::string>().c_str());
426  }
427  TObject * obj = f->Get(objPath.c_str());
428  if (!obj) {
429  if (point->GetEntryNumber() == 0) {
430  NLogWarning("NGnTree::Import: Cannot get object '%s' from file '%s' !!!", objPath.c_str(),
431  cfg["filename"].get<std::string>().c_str());
432  }
433  continue;
434  }
435 
436  if (obj->InheritsFrom(TCanvas::Class())) {
437  TCanvas * cObj = (TCanvas *)obj;
438  cObj->SetName(objName.c_str());
439  }
440  outputPoint->Add(obj->Clone(objName.c_str()));
441  }
442  // f->Close();
443  }
444  };
445 
446  // NUtils::SetAxisRanges(, std::vector<std::vector<int>> ranges)
447  ngnt->Process(processFunc, cfg);
448  ngnt->Close(true);
449  // Remove tmp file
450  gSystem->Exec(TString::Format("rm -f %s", tmpFilename.c_str()));
451 }
452 
454 {
  // Destructor body. NOTE(review): the signature line (listing line 453) is missing from this
  // listing; presumably `NGnTree::~NGnTree()`.
  // The binning and storage tree are deleted only when this object owns them. The navigator
  // and parameters are always deleted.
  // NOTE(review): fInput is not released here. Confirm whether NGnTree owns its input.
458
459  if (fOwnsBinning) {
460  SafeDelete(fBinning);
461  }
462  if (fOwnsTreeStorage) {
463  SafeDelete(fTreeStorage);
464  }
465  SafeDelete(fNavigator);
466  SafeDelete(fParameters);
467 }
468 void NGnTree::Print(Option_t * option) const
469 {
473 
474  TString opt = option;
475 
476  // Print list of axes
477  NLogInfo("NGnTree::Print: Printing NGnTree object [ALL] ...");
478  if (fBinning) {
479  fBinning->Print(option);
480  }
481  else {
482  NLogError("Binning is not initialized in NGnTree !!!");
483  }
484  if (fTreeStorage) {
485  fTreeStorage->Print(option);
486  }
487  else {
488  NLogError("Storage tree is not initialized in NGnTree !!!");
489  }
490 }
491 
492 void NGnTree::Draw(Option_t * /*option*/)
493 {
497 
498  NLogInfo("NGnTree::Draw: Drawing NGnTree object [not implemented yet]...");
499 }
500 
501 bool NGnTree::Process(NGnProcessFuncPtr func, const json & cfg, std::string binningName, NGnBeginFuncPtr beginFunc,
502  NGnEndFuncPtr endFunc)
503 {
507 
508  if (!fBinning) {
509  NLogError("Binning is not initialized in NGnTree !!!");
510  return false;
511  }
512 
513  NBinning * binningIn = (NBinning *)fBinning->Clone();
514 
515  std::vector<std::string> defNames = fBinning->GetDefinitionNames();
516  if (!binningName.empty()) {
517  // Check if binning definitions exist
518  if (std::find(defNames.begin(), defNames.end(), binningName) == defNames.end()) {
519  NLogError("Binning definition '%s' not found in NGnTree !!!", binningName.c_str());
520  return false;
521  }
522  defNames.clear();
523  defNames.push_back(binningName);
524  }
525 
526  fBinning->Reset();
527  fBinning->SetCfg(cfg); // Set configuration to binning point
528  bool rc = Process(func, defNames, cfg, binningIn, beginFunc, endFunc);
529  if (!rc) {
530  NLogError("NGnTree::Process: Processing failed !!!");
531  return false;
532  }
533  // bool rc = false;
534  return true;
535 }
536 
537 bool NGnTree::Process(NGnProcessFuncPtr func, const std::vector<std::string> & defNames, const json & cfg,
538  NBinning * binningIn, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc)
539 {
543 
544  NLogInfo("NGnTree::Process: Starting processing with %zu definitions ...", defNames.size());
545  bool batch = gROOT->IsBatch();
546  gROOT->SetBatch(kTRUE);
547  TH1::AddDirectory(kFALSE);
548 
549  std::string storagePostfix = fTreeStorage ? fTreeStorage->GetPostfix() : "";
550  if (storagePostfix.empty() && fTreeStorage) {
551  const std::string storageFileName = fTreeStorage->GetFileName();
552  if (!storageFileName.empty()) {
553  storagePostfix = gSystem->BaseName(storageFileName.c_str());
554  }
555  }
556  if (storagePostfix.empty()) {
557  storagePostfix = "ndmspc.root";
558  }
559 
560  // --- Worker mode: run as a remote TCP worker ---
561  if (const char * workerEndpoint = gSystem->Getenv("NDMSPC_WORKER_ENDPOINT")) {
562  size_t workerIndex = 0;
563  if (const char * envIdx = gSystem->Getenv("NDMSPC_WORKER_INDEX")) {
564  try { workerIndex = static_cast<size_t>(std::stoul(envIdx)); } catch (...) {}
565  }
566  NLogInfo("NGnTree::Process: Worker mode — connecting to %s as worker %zu", workerEndpoint, workerIndex);
567 
568  // const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
569  // jobDir and treeName will be received from master via INIT
570  // For now init NGnThreadData with a placeholder filename; Init() will be called with real paths
571  Ndmspc::NGnThreadData workerData;
572 
573  void * ctx = zmq_ctx_new();
574  void * dealer = zmq_socket(ctx, ZMQ_DEALER);
575  const std::string identity = Ndmspc::NDimensionalIpcRunner::BuildWorkerIdentity(workerIndex);
576  zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size());
577  int timeoutMs = 1000;
578  zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
579  zmq_connect(dealer, workerEndpoint);
580  Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {"READY"});
581 
582  // Wait for INIT
583  const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(30);
584  bool initOk = false;
585  while (!initOk) {
586  std::vector<std::string> frames;
587  if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
588  if (errno == EAGAIN || errno == EWOULDBLOCK) {
589  if (std::chrono::steady_clock::now() > initDeadline) break;
590  continue;
591  }
592  break;
593  }
594  // INIT frames: "INIT", workerIdx, sessionId, resultsDir, treeName[, tmpDir, tmpResultsDir]
595  if (frames.size() >= 1 && frames[0] == "STOP") {
596  NLogPrint("NGnTree::Process: Worker received STOP before INIT — session already finished, exiting.");
597  zmq_close(dealer);
598  zmq_ctx_term(ctx);
599  gROOT->SetBatch(batch);
600  return true;
601  }
602  if (frames.size() >= 5 && frames[0] == "INIT") {
603  workerIndex = static_cast<size_t>(std::stoul(frames[1]));
604  const std::string & sessionId = frames[2];
605  const std::string & initResultsDir = frames[3];
606  const std::string & initTreeName = frames[4];
607 
608  // Apply env vars sent by supervisor — these override the worker's inherited environment
609  if (frames.size() >= 7) {
610  if (!frames[5].empty())
611  gSystem->Setenv("NDMSPC_TMP_DIR", frames[5].c_str());
612  if (!frames[6].empty())
613  gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", frames[6].c_str());
614  }
615  // Fallback: if NDMSPC_TMP_RESULTS_DIR is still unset/empty, use NDMSPC_TMP_DIR
616  if (!gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR")[0] == '\0') {
617  const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
618  if (tmpDirEnv && tmpDirEnv[0] != '\0')
619  gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
620  }
621 
622  // Local work file — always on this machine's NDMSPC_TMP_DIR
623  const char * localTmpEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
624  const std::string localBase = localTmpEnv ? localTmpEnv : "/tmp";
625  const std::string localFile = localBase + "/.ndmspc/tmp/" + sessionId + "/" +
626  std::to_string(workerIndex) + "/" + storagePostfix;
627 
628  // Results file — on shared FS; supervisor reads from here to merge
629  const std::string resultsFile = initResultsDir + "/" + std::to_string(workerIndex) + "/" +
630  storagePostfix;
631 
632  bool rc = workerData.Init(workerIndex, func, beginFunc, endFunc, this, binningIn, fInput, localFile, initTreeName);
633  if (!rc) {
634  NLogError("NGnTree::Process: Worker failed to initialize NGnThreadData");
635  zmq_close(dealer);
636  zmq_ctx_term(ctx);
637  return false;
638  }
639  // If results dir differs from local dir, tell TaskLoop to copy after Close(true)
640  if (resultsFile != localFile) {
641  workerData.SetResultsFilename(resultsFile);
642  }
643  workerData.SetCfg(cfg);
644  Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {"ACK"});
645  initOk = true;
646  }
647  }
648  if (!initOk) {
649  NLogError("NGnTree::Process: Worker did not receive INIT from supervisor");
650  zmq_close(dealer);
651  zmq_ctx_term(ctx);
652  return false;
653  }
654 
655  Ndmspc::NDimensionalIpcRunner::TaskLoop(dealer, workerIndex, &workerData);
656  zmq_close(dealer);
657  zmq_ctx_term(ctx);
658  gROOT->SetBatch(batch);
659  return true;
660  }
661  // --- End worker mode ---
662 
664 
665  int nThreads = ROOT::GetThreadPoolSize(); // Get the number of threads to use
666  if (nThreads < 1) nThreads = 1;
667 
668  std::string executionMode = "thread";
669  const char * envMode = gSystem->Getenv("NDMSPC_EXECUTION_MODE");
670  const bool modeExplicit = (envMode && envMode[0] != '\0');
671  if (modeExplicit) {
672  executionMode = envMode;
673  }
674 
675  std::string normalizedMode = executionMode;
676  std::transform(normalizedMode.begin(), normalizedMode.end(), normalizedMode.begin(),
677  [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
678  if (normalizedMode == "process") normalizedMode = "ipc";
679 
680  bool useProcessIpc = (normalizedMode == "ipc" || normalizedMode == "tcp");
681  bool useTcp = (normalizedMode == "tcp");
682  size_t nProcesses = static_cast<size_t>(nThreads);
683  bool ndmspcNProcExplicit = false;
684 
685  if (const char * envNdmspcNProc = gSystem->Getenv("NDMSPC_MAX_PROCESSES")) {
686  ndmspcNProcExplicit = true;
687  try {
688  nProcesses = std::max<size_t>(1, static_cast<size_t>(std::stoll(envNdmspcNProc)));
689  }
690  catch (...) {
691  NLogWarning("NGnTree::Process: Invalid NDMSPC_MAX_PROCESSES='%s', using default=%zu", envNdmspcNProc,
692  nProcesses);
693  }
694  }
695  else if (const char * envNProc = gSystem->Getenv("ROOT_MAX_THREADS")) {
696  // Backward-compatible fallback when NDMSPC_MAX_PROCESSES is not set.
697  try {
698  nProcesses = std::max<size_t>(1, static_cast<size_t>(std::stoll(envNProc)));
699  }
700  catch (...) {
701  NLogWarning("NGnTree::Process: Invalid ROOT_MAX_THREADS='%s', using default=%zu", envNProc, nProcesses);
702  }
703  }
704 
705  // Keep explicit NDMSPC_EXECUTION_MODE settings authoritative.
706  // If mode is not explicitly set, default to local IPC for multi-process runs.
707  if (modeExplicit) {
708  if (normalizedMode == "thread") {
709  useProcessIpc = false;
710  useTcp = false;
711  }
712  else if (normalizedMode == "tcp") {
713  useProcessIpc = true;
714  useTcp = true;
715  }
716  else if (normalizedMode == "ipc") {
717  useProcessIpc = true;
718  useTcp = false;
719  }
720  else {
721  NLogWarning("NGnTree::Process: Unknown NDMSPC_EXECUTION_MODE='%s', falling back to auto mode selection.",
722  executionMode.c_str());
723  useProcessIpc = (nProcesses > 1);
724  useTcp = false;
725  }
726  }
727  else if (nProcesses > 1) {
728  useProcessIpc = true;
729  useTcp = false;
730  executionMode = "ipc";
731  normalizedMode = "ipc";
732  }
733 
734  if (ndmspcNProcExplicit && normalizedMode == "thread" && nProcesses > 1) {
735  NLogWarning("NGnTree::Process: NDMSPC_MAX_PROCESSES=%zu is set, but NDMSPC_EXECUTION_MODE=thread disables IPC.",
736  nProcesses);
737  }
738 
739  const size_t workerObjectCount = useProcessIpc ? std::max(static_cast<size_t>(nThreads), nProcesses)
740  : static_cast<size_t>(nThreads);
741  std::vector<Ndmspc::NGnThreadData> threadDataVector(workerObjectCount);
742 
743  NLogInfo("NGnTree::Process: executionMode='%s', useProcessIpc=%d, ROOT threads=%d, ipcProcesses=%zu, workerObjects=%zu",
744  executionMode.c_str(), useProcessIpc ? 1 : 0, nThreads, nProcesses, workerObjectCount);
745 
746  const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
747  std::string tmpDir;
748  if (tmpDirEnv && tmpDirEnv[0] != '\0') {
749  tmpDir = tmpDirEnv;
750  } else {
751  TString tmpDirPrefix = fTreeStorage->GetPrefix();
752  // Use storage prefix only if it is a local path (not a remote URL)
753  if (!(tmpDirPrefix.BeginsWith("root://") || tmpDirPrefix.BeginsWith("http://") ||
754  tmpDirPrefix.BeginsWith("https://"))) {
755  tmpDir = tmpDirPrefix.Data();
756  }
757  if (tmpDir.empty()) tmpDir = "/tmp";
758  }
759 
760  std::string jobDir = tmpDir + "/.ndmspc/tmp/" + std::to_string(gSystem->GetPid());
761 
762  // Results dir: when NDMSPC_TMP_RESULTS_DIR equals NDMSPC_TMP_DIR (or is unset),
763  // reuse jobDir so that localTmpFile == resultsFilename — no copy or delete needed.
764  const char * resultsDirEnv = gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR");
765  const bool sameDir = !resultsDirEnv || std::string(resultsDirEnv) == tmpDir;
766  std::string resultsDir = sameDir ? jobDir
767  : (std::string(resultsDirEnv) + "/" +
768  std::to_string(gSystem->GetPid()));
769 
770  std::string filePrefix = jobDir;
771  for (size_t i = 0; i < threadDataVector.size(); ++i) {
772  std::string filename = filePrefix + "/" + std::to_string(i) + "/" + storagePostfix;
773  bool rc = threadDataVector[i].Init(i, func, beginFunc, endFunc, this, binningIn, fInput, filename,
774  fTreeStorage->GetTree()->GetName());
775  if (!rc) {
776  NLogError("Failed to initialize thread data %zu, exiting ...", i);
777  return false;
778  }
779  threadDataVector[i].SetCfg(cfg); // Set configuration to binning point
780  if (useTcp) {
781  // Tell the merge step where workers will deposit their finished files.
782  // When resultsDir == jobDir (NDMSPC_TMP_RESULTS_DIR unset) the paths are
783  // identical so no copy or delete is needed — handled in TaskLoop.
784  std::string resultsFile = resultsDir + "/" + std::to_string(i) + "/" + storagePostfix;
785  threadDataVector[i].SetResultsFilename(resultsFile);
786  }
787  }
788  size_t processedEntries = 0;
789  size_t totalEntries = 0;
790  auto start_par = std::chrono::high_resolution_clock::now();
791  auto start_par_job = std::chrono::high_resolution_clock::now();
792  auto task = [&](const std::vector<int> & coords, Ndmspc::NGnThreadData & thread_obj) {
793  // NLogWarning("Processing coordinates %s in thread %zu", NUtils::GetCoordsString(coords).c_str(),
794  // thread_obj.GetAssignedIndex());
795  // thread_obj.Print();
796  thread_obj.Process(coords);
797  processedEntries++;
798  if (!NLogger::GetConsoleOutput()) {
799  size_t nRunning = (totalEntries - processedEntries >= threadDataVector.size()) ? threadDataVector.size()
800  : totalEntries - processedEntries;
801  NUtils::ProgressBar(processedEntries, totalEntries, start_par, TString::Format("R%4zu", nRunning).Data());
802  }
803  };
804 
805  size_t iDef = 0;
806  int sumIds = 0;
807 
808  std::vector<Ndmspc::NThreadData *> processWorkers;
809  std::unique_ptr<Ndmspc::NDimensionalExecutor> ipcExecutor;
810  if (useProcessIpc) {
811  processWorkers.reserve(threadDataVector.size());
812  for (size_t i = 0; i < threadDataVector.size(); ++i) {
813  processWorkers.push_back(&threadDataVector[i]);
814  }
815  ipcExecutor = std::make_unique<Ndmspc::NDimensionalExecutor>(std::vector<int>{0}, std::vector<int>{0});
816  if (useTcp) {
817  const char * tcpPort = gSystem->Getenv("NDMSPC_TCP_PORT");
818  std::string tcpEndpoint = std::string("tcp://0.0.0.0:") + (tcpPort ? tcpPort : "5555");
819  const char * resultsDirBase = gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR");
820  const char * macroParams = gSystem->Getenv("NDMSPC_MACRO_PARAMS");
821  // Auto-detect the macro to send to workers: explicit SetWorkerMacro() takes
822  // priority; otherwise fall back to NDMSPC_MACRO set by ndmspc-run.
823  std::string workerMacro = fWorkerMacroList;
824  if (workerMacro.empty()) {
825  if (const char * envMacro = gSystem->Getenv("NDMSPC_MACRO")) workerMacro = envMacro;
826  }
827  ipcExecutor->StartProcessIpc(processWorkers, nProcesses, tcpEndpoint, resultsDir,
828  fTreeStorage->GetTree()->GetName(), workerMacro, tmpDir,
829  resultsDirBase ? resultsDirBase : "",
830  macroParams ? macroParams : "");
831  } else {
832  ipcExecutor->StartProcessIpc(processWorkers, nProcesses);
833  }
834  }
835 
836  std::map<std::string, std::vector<Long64_t>>
837  defIdMapProcessedRemoved; // Map to track which ids belong to which definition
838  // for (auto & name : defNames) {
839  // auto binningDef = binningIn->GetDefinition(name);
840  // if (!binningDef) {
841  // NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
842  // return false;
843  // }
844  // for (auto & id : binningDef->GetIds()) {
845  // defIdMap[name].push_back(id);
846  // }
847  // }
848 
849  try {
850  for (auto & name : defNames) {
851  auto binningDef = binningIn->GetDefinition(name);
852  if (!binningDef) {
853  NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
854  return false;
855  }
856 
857  if (binningDef->GetIds().size() == 0) {
858  NLogWarning("NGnTree::Process: Binning definition '%s' has no entries, skipping ...", name.c_str());
859  continue;
860  }
861 
862  const std::vector<Long64_t> originalDefinitionIds = binningDef->GetIds();
863 
864  std::vector<int> mins, maxs;
865  mins.push_back(0);
866  maxs.push_back(binningDef->GetIds().size() - 1);
867  NLogDebug("NGnTree::Process: Processing with binning definition '%s' with %zu entries", name.c_str(),
868  binningDef->GetIds().size());
869 
871  NLogInfo("NGnTree::Process: Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
872  }
873  else {
874  Printf("Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
875  }
876  start_par = std::chrono::high_resolution_clock::now();
877  processedEntries = 0;
878  totalEntries = maxs[0] + 1;
879  const size_t activeWorkers = useProcessIpc ? std::max<size_t>(1, std::min(nProcesses, processWorkers.size()))
880  : threadDataVector.size();
882  NUtils::ProgressBar(processedEntries, totalEntries, start_par,
883  TString::Format("R%4zu", activeWorkers).Data());
884 
885  // binningIn->SetCurrentDefinitionName(name);
886  for (size_t i = 0; i < threadDataVector.size(); ++i) {
887  // threadDataVector[i].SetCurrentDefinitionName(name);
888  threadDataVector[i].GetHnSparseBase()->GetBinning()->SetCurrentDefinitionName(name);
889  }
890 
891  Ndmspc::NDimensionalExecutor executorMT(mins, maxs);
892 
893  if (!useProcessIpc) {
894  // Disable ROOT's RecursiveRemove during the parallel phase.
895  // Without this, concurrent threads' object deletions trigger RecursiveRemove
896  // which iterates pad->fPrimitives without per-object locks → TObjLink corruption.
897  Bool_t prevMustClean = gROOT->MustClean();
898  gROOT->SetMustClean(kFALSE);
899 
900  // Enable batch mode during the parallel phase so that ROOT drawing calls
901  // (e.g. TH1::Fit drawing the fit function via f1->Draw) are no-ops.
902  // Without this, concurrent threads each trigger ROOT canvas creation ("c1"),
903  // the second canvas creation deletes the first → double-free / heap corruption,
904  // which then causes TPad::RecursiveRemove to crash when closing the per-thread TTree.
905  Bool_t prevBatch = gROOT->IsBatch();
906  gROOT->SetBatch(kTRUE);
907 
908  executorMT.ExecuteParallel<Ndmspc::NGnThreadData>(task, threadDataVector);
909 
910  // Restore both flags before flushing deferred deletes, so each object's destructor
911  // properly calls gROOT->RecursiveRemove and removes itself from ROOT's global lists.
912  // This prevents dangling pointers that would crash the RecursiveRemove cascade
913  // triggered by TTree::~TTree when the per-thread storage files are closed below.
914  // It is safe here because all worker threads have already finished.
915  gROOT->SetMustClean(prevMustClean);
916  gROOT->SetBatch(prevBatch);
917 
918  // Flush deferred deletes single-threaded with MustClean and batch mode restored.
919  for (size_t i = 0; i < threadDataVector.size(); ++i) {
920  threadDataVector[i].FlushDeferredDeletes();
921  }
922 
923  for (size_t i = 0; i < threadDataVector.size(); ++i) {
924  threadDataVector[i].ExecuteEndFunction();
925  }
926  }
927  else {
928  ipcExecutor->SetBounds(mins, maxs);
929  // Capture final active worker count reported by the IPC executor so
930  // we can deterministically rebuild per-worker counters for only the
931  // workers that actually connected.
932  size_t finalActiveWorkers = 0;
933  size_t acked = ipcExecutor->ExecuteCurrentBoundsProcessIpc(
934  name, &originalDefinitionIds,
935  [&, activeWorkers](const ExecutionProgress& progress) {
936  processedEntries = progress.tasksAcked;
937  finalActiveWorkers = progress.activeWorkers;
938  if (!NLogger::GetConsoleOutput()) {
939  size_t nRunning = std::min(progress.activeWorkers, activeWorkers);
940  NUtils::ProgressBar(processedEntries, totalEntries, start_par,
941  TString::Format("R%4zu", nRunning).Data());
942  }
943  });
944  processedEntries = acked;
945 
946  // Child processes update their own worker-object copies. Rebuild parent-side
947  // per-worker counters and processed-id vectors deterministically from task assignment.
948  // Use the number of workers that actually connected (finalActiveWorkers) if available;
949  // otherwise fall back to the configured process count / processWorkers size.
950  size_t connected = finalActiveWorkers > 0 ? finalActiveWorkers : std::min(nProcesses, processWorkers.size());
951  const size_t processesToUse = std::max<size_t>(1, std::min(nProcesses, connected));
952  for (size_t i = 0; i < threadDataVector.size(); ++i) {
953  auto * workerDef = threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name);
954  if (workerDef) {
955  workerDef->GetIds().clear();
956  }
957  }
958 
959  for (size_t taskIndex = 0; taskIndex < originalDefinitionIds.size(); ++taskIndex) {
960  const size_t workerIndex = taskIndex % processesToUse;
961  threadDataVector[workerIndex].SetNProcessed(threadDataVector[workerIndex].GetNProcessed() + 1);
962 
963  auto * workerDef = threadDataVector[workerIndex].GetHnSparseBase()->GetBinning()->GetDefinition(name);
964  if (workerDef) {
965  workerDef->GetIds().push_back(originalDefinitionIds[taskIndex]);
966  }
967  }
968 
969  if (!NLogger::GetConsoleOutput() && processedEntries < totalEntries) {
970  NUtils::ProgressBar(processedEntries, totalEntries, start_par, "R 0");
971  }
972  }
973 
975  Printf("Finished processing binning definition '%s'. Post-processing results ...", name.c_str());
976  // Update hnsbBinningIn with the processed ids
977  NLogDebug("NGnTree::Process: [BEGIN] ------------------------------------------------");
978  sumIds += binningIn->GetDefinition(name)->GetIds().size();
979  binningIn->GetDefinition(name)->GetIds().clear();
980  for (size_t i = 0; i < threadDataVector.size(); ++i) {
981  NLogDebug("NGnTree::Process: -> Thread %zu processed %lld entries", i, threadDataVector[i].GetNProcessed());
982  // threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->Print();
983  binningIn->GetDefinition(name)->GetIds().insert(
984  binningIn->GetDefinition(name)->GetIds().end(),
985  threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().begin(),
986  threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().end());
987  sort(binningIn->GetDefinition(name)->GetIds().begin(), binningIn->GetDefinition(name)->GetIds().end());
988  }
989  // hnsbBinningIn->GetDefinition(name)->Print();
990  // remove entries present in hnsbBinningIn from other definitions
991  for (size_t i = 0; i < defNames.size(); i++) {
992 
993  std::string other_name = defNames[i];
994  auto otherDef = binningIn->GetDefinition(other_name);
995  if (i <= iDef) {
996  continue;
997  }
998  if (!otherDef) {
999  NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", other_name.c_str());
1000  return false;
1001  }
1002  // remove entries that has value less then sumIds
1003  for (auto it = otherDef->GetIds().begin(); it != otherDef->GetIds().end();) {
1004  NLogTrace("NGnTree::Process: Checking entry %lld from definition '%s' against sumIds=%d", *it,
1005  other_name.c_str(), sumIds);
1006  if (*it < sumIds) {
1007  NLogTrace("NGnTree::Process: Removing entry %lld from definition '%s'", *it, other_name.c_str());
1008  defIdMapProcessedRemoved[other_name].push_back(*it);
1009  it = otherDef->GetIds().erase(it);
1010  }
1011  else {
1012  ++it;
1013  }
1014  }
1015 
1016  binningIn->GetDefinition(other_name)->Print();
1017  }
1018  // hnsbBinningIn->GetDefinition(name)->Print();
1019  iDef++;
1020 
1021  NLogDebug("NGnTree::Process: [END] ------------------------------------------------");
1022  }
1023  }
1024  catch (const std::exception & ex) {
1025  if (ipcExecutor) {
1026  ipcExecutor->FinishProcessIpc(/*abort=*/true);
1027  ipcExecutor.reset();
1028  }
1029 
1030  TString what(ex.what());
1031  if (what.Contains("Interrupted by user")) {
1032  if (gROOT) {
1033  gROOT->SetInterrupt(kFALSE);
1034  }
1035  NLogWarning("NGnTree::Process: Interrupted by user, stopping processing.");
1036  }
1037  else {
1038  NLogError("NGnTree::Process: Processing failed: %s", ex.what());
1039  }
1040  return false;
1041  }
1042 
1043  // return true; // For testing, skip merging and post-processing
1044 
1045  auto end_par = std::chrono::high_resolution_clock::now();
1046  std::chrono::duration<double, std::milli> par_duration = end_par - start_par_job;
1047 
1048  if (ipcExecutor) {
1049  ipcExecutor->FinishProcessIpc();
1050  }
1051 
1052  // For TCP mode, only merge results from workers that actually connected.
1053  // For IPC/fork and thread modes, all indices are valid.
1054  const std::set<size_t> registeredWorkers =
1055  (ipcExecutor && useTcp) ? ipcExecutor->GetRegisteredWorkerIndices() : std::set<size_t>{};
1056  const bool filterByRegistered = !registeredWorkers.empty();
1057 
1058  if (!NLogger::GetConsoleOutput()) {
1059  Printf("NGnTree::Process: Execution completed and it took %s .",
1060  NUtils::FormatTime(par_duration.count() / 1000).c_str());
1061  }
1062  else {
1063  NLogInfo("NGnTree::Process: Execution completed and it took %s .",
1064  NUtils::FormatTime(par_duration.count() / 1000).c_str());
1065  }
1066 
1067  NLogInfo("NGnTree::Process: Post processing %zu results ...", threadDataVector.size());
1068  for (auto & data : threadDataVector) {
1069  if (useProcessIpc) {
1070  NLogTrace("NGnTree::Process: Releasing parent handle for worker %zu file without writing",
1071  data.GetAssignedIndex());
1072  // data.GetHnSparseBase()->GetStorageTree()->Close(false);
1073  }
1074  else {
1075  NLogTrace("NGnTree::Process: Closing file from thread %zu with write", data.GetAssignedIndex());
1076  data.GetHnSparseBase()->GetStorageTree()->Close(true);
1077  }
1078  }
1079 
1080  NLogDebug("NGnTree::Process: Merging %zu results ...", threadDataVector.size());
1081  if (!NLogger::GetConsoleOutput()) {
1082  Printf("NGnTree::Process: [phase] merge start (%zu workers)", threadDataVector.size());
1083  }
1084  const auto mergeStart = std::chrono::high_resolution_clock::now();
1085  TList * mergeList = new TList();
1086  Ndmspc::NGnThreadData * outputData = new Ndmspc::NGnThreadData();
1087  outputData->Init(0, func, nullptr, nullptr, this, binningIn);
1088  outputData->SetCfg(cfg);
1089  // outputData->Init(0, func, this);
1090 
1091  for (auto & data : threadDataVector) {
1092  if (filterByRegistered && registeredWorkers.find(data.GetAssignedIndex()) == registeredWorkers.end()) {
1093  NLogInfo("NGnTree::Process: Skipping worker %zu — never connected", data.GetAssignedIndex());
1094  continue;
1095  }
1096  NLogTrace("NGnTree::Process: Adding thread data %zu to merge list ...", data.GetAssignedIndex());
1097  mergeList->Add(&data);
1098  }
1099 
1100  Long64_t nmerged = outputData->Merge(mergeList);
1101  const auto mergeEnd = std::chrono::high_resolution_clock::now();
1102  if (!NLogger::GetConsoleOutput()) {
1103  const auto mergeSec = std::chrono::duration_cast<std::chrono::duration<double>>(mergeEnd - mergeStart).count();
1104  Printf("NGnTree::Process: [phase] merge done (%lld outputs, %.2f s)", nmerged, mergeSec);
1105  }
1106  if (nmerged <= 0) {
1107  NLogError("NGnTree::Process: Failed to merge thread data, exiting ...");
1108  delete mergeList;
1109  return false;
1110  }
1111  NLogInfo("NGnTree::Process: Merged %lld outputs successfully", nmerged);
1112  // delete all temporary files
1113  // for (auto & data : threadDataVector) {
1114  // std::string filename = data.GetHnSparseBase()->GetStorageTree()->GetFileName();
1115  // NLogTrace("NGnTree::Process: Deleting temporary file '%s' ...", filename.c_str());
1116  // gSystem->Exec(TString::Format("rm -f %s", filename.c_str()));
1117  // }
1118  //
1119 
1120  // binningIn= outputData->GetHnSparseBase()->GetBinning();
1121 
1122  auto * mergedBinning = outputData->GetHnSparseBase()->GetBinning();
1123  std::set<Long64_t> mergedContentIds;
1124  std::vector<std::pair<Long64_t, std::vector<int>>> mergedContentCoords;
1125 
1126  // add missing entries to definitions based on defIdMapProcessedRemoved
1127  for (size_t i = 0; i < defNames.size(); i++) {
1128  std::string name = defNames[i];
1129  // auto def = binningIn->GetDefinition(name);
1130  auto def = mergedBinning->GetDefinition(name);
1131  if (!def) {
1132  NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1133  return false;
1134  }
1135  for (auto & [other_name, removedIds] : defIdMapProcessedRemoved) {
1136  if (other_name.compare(name) != 0) {
1137  continue;
1138  }
1139  for (auto & id : removedIds) {
1140  if (std::find(def->GetIds().begin(), def->GetIds().end(), id) == def->GetIds().end()) {
1141  NLogTrace("NGnTree::Process: Adding missing entry %lld to definition '%s'", id, name.c_str());
1142  def->GetIds().push_back(id);
1143  }
1144  }
1145  }
1146  sort(def->GetIds().begin(), def->GetIds().end());
1147  // outputData->GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds() = def->GetIds();
1148 
1149  // Modify content in binning definitions based on def->GetIds()
1150  def->GetContent()->Reset();
1151  for (auto id : def->GetIds()) {
1152  NBinningPoint point(def->GetBinning());
1153  def->GetBinning()->GetContent()->GetBinContent(id, point.GetCoords());
1154  point.RecalculateStorageCoords();
1155  Long64_t bin = def->GetContent()->GetBin(point.GetStorageCoords());
1156  NLogTrace("NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld", name.c_str(), id, bin);
1157  def->GetContent()->SetBinContent(bin, id);
1158 
1159  if (mergedContentIds.insert(id).second) {
1160  mergedContentCoords.emplace_back(
1161  id, NUtils::ArrayToVector(point.GetCoords(), point.GetNDimensionsContent()));
1162  }
1163  }
1164  }
1165 
1166  // Rebuild the merged top-level content from final definition ids. In IPC/TCP mode
1167  // the merge setup may still carry sparse source-bin content; resetting here keeps
1168  // only the bins that correspond to actual merged tree entries.
1169  mergedBinning->GetContent()->Reset();
1170  for (const auto & entry : mergedContentCoords) {
1171  Long64_t bin = mergedBinning->GetContent()->GetBin(entry.second.data());
1172  mergedBinning->GetContent()->SetBinContent(bin, entry.first);
1173  }
1174 
1175  // print final binning definitions
1176  NLogDebug("NGnTree::Process: Final binning definitions after processing:");
1177  for (auto & name : defNames) {
1178  // auto binningDef = binningIn->GetDefinition(name);
1179  auto binningDef = outputData->GetHnSparseBase()->GetBinning()->GetDefinition(name);
1180  if (!binningDef) {
1181  NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1182  return false;
1183  }
1184  binningDef->Print();
1185  }
1186 
1187  fTreeStorage = outputData->GetHnSparseBase()->GetStorageTree();
1188  fOutputs = outputData->GetHnSparseBase()->GetOutputs();
1189  fBinning = outputData->GetHnSparseBase()->GetBinning(); // Update binning to the merged one
1190  fParameters = outputData->GetHnSparseBase()->GetParameters();
1191 
1192  if (NLogger::GetConsoleOutput()) {
1193  NLogInfo("NGnTree::Process: Processing completed successfully. Output was stored in '%s'.",
1194  fTreeStorage->GetFileName().c_str());
1195  }
1196  else {
1197  Printf("Processing completed successfully. Output was stored in '%s'.", fTreeStorage->GetFileName().c_str());
1198  }
1199 
1200  // Close the final output file
1201  if (!NLogger::GetConsoleOutput()) {
1202  Printf("NGnTree::Process: [phase] final close start (%s)",
1203  outputData->GetHnSparseBase()->GetStorageTree()->GetFileName().c_str());
1204  }
1205  const auto closeStart = std::chrono::high_resolution_clock::now();
1206  outputData->GetHnSparseBase()->Close(true);
1207  const auto closeEnd = std::chrono::high_resolution_clock::now();
1208  if (!NLogger::GetConsoleOutput()) {
1209  const auto closeSec = std::chrono::duration_cast<std::chrono::duration<double>>(closeEnd - closeStart).count();
1210  Printf("NGnTree::Process: [phase] final close done (%.2f s)", closeSec);
1211  }
1212 
1213  if (!NLogger::GetConsoleOutput()) {
1214  Printf("NGnTree::Process: [phase] cleanup start (%s)", jobDir.c_str());
1215  }
1216  const auto cleanupStart = std::chrono::high_resolution_clock::now();
1217  gSystem->Exec(TString::Format("rm -fr %s", jobDir.c_str()));
1218  const auto cleanupEnd = std::chrono::high_resolution_clock::now();
1219  if (!NLogger::GetConsoleOutput()) {
1220  const auto cleanupSec = std::chrono::duration_cast<std::chrono::duration<double>>(cleanupEnd - cleanupStart).count();
1221  Printf("NGnTree::Process: [phase] cleanup done (%.2f s)", cleanupSec);
1222  }
1223  gROOT->SetBatch(batch); // Restore ROOT batch mode
1224  return true;
1225 }
1226 
1227 TList * NGnTree::GetOutput(std::string name)
1228 {
1232 
1233  if (name.empty()) {
1235  }
1236  if (fOutputs.find(name) == fOutputs.end()) {
1237  fOutputs[name] = new TList();
1238  fOutputs[name]->SetName(name.c_str());
1239  }
1240  return fOutputs[name];
1241 }
1242 
1243 NGnTree * NGnTree::Open(const std::string & filename, const std::string & branches, const std::string & treename)
1244 {
1248 
1249  NLogDebug("Opening '%s' with branches='%s' and treename='%s' ...", filename.c_str(), branches.c_str(),
1250  treename.c_str());
1251 
1252  TFile * file = TFile::Open(filename.c_str());
1253  if (!file) {
1254  NLogError("NGnTree::Open: Cannot open file '%s'", filename.c_str());
1255  return nullptr;
1256  }
1257 
1258  TTree * tree = (TTree *)file->Get(treename.c_str());
1259  if (!tree) {
1260  NLogError("NGnTree::Open: Cannot get tree '%s' from file '%s'", treename.c_str(), filename.c_str());
1261  return nullptr;
1262  }
1263 
1264  return Open(tree, branches, file);
1265 }
1266 
1267 NGnTree * NGnTree::Open(TTree * tree, const std::string & branches, TFile * file)
1268 {
1272 
1273  NBinning * hnstBinning = (NBinning *)tree->GetUserInfo()->At(0);
1274  if (!hnstBinning) {
1275  NLogError("NGnTree::Open: Cannot get binning from tree '%s'", tree->GetName());
1276  return nullptr;
1277  }
1278  NStorageTree * hnstStorageTree = (NStorageTree *)tree->GetUserInfo()->At(1);
1279  if (!hnstStorageTree) {
1280  NLogError("NGnTree::Open: Cannot get tree storage info from tree '%s'", tree->GetName());
1281  return nullptr;
1282  }
1283 
1284  std::map<std::string, TList *> outputs;
1285  TDirectory * dir = nullptr;
1286  if (file) {
1287  dir = (TDirectory *)file->Get("outputs");
1288  auto l = dir->GetListOfKeys();
1289  for (auto kv : *l) {
1290  TObject * obj = dir->Get(kv->GetName());
1291  if (!obj) continue;
1292  TList * l = dynamic_cast<TList *>(obj);
1293  if (!l) continue;
1294  outputs[l->GetName()] = l;
1295  NLogDebug("Imported output list for binning '%s' with %d object(s) from file '%s'", l->GetName(), l->GetEntries(),
1296  file->GetName());
1297  }
1298  }
1299  // TDirectory * dir = (TDirectory *)tree->GetUserInfo()->FindObject("outputs");
1300  // if (dir) {
1301  // dir->Print();
1302  // }
1303 
1304  NGnTree * ngnt = new NGnTree(hnstBinning, hnstStorageTree);
1305 
1306  if (!hnstStorageTree->SetFileTree(file, tree)) return nullptr;
1307  // if (!ngnt->InitBinnings({})) return nullptr;
1308  // ngnt->Print();
1309  // Get list of branches
1310  std::vector<std::string> enabledBranches;
1311  if (!branches.empty()) {
1312  enabledBranches = Ndmspc::NUtils::Tokenize(branches, ',');
1313  NLogTrace("NGnTree::Open: Enabled branches: %s", NUtils::GetCoordsString(enabledBranches, -1).c_str());
1314  hnstStorageTree->SetEnabledBranches(enabledBranches);
1315  }
1316  else {
1317  // loop over all branches and set address
1318  for (auto & kv : hnstStorageTree->GetBranchesMap()) {
1319  NLogTrace("NGnTree::Open: Enabled branches: %s", kv.first.c_str());
1320  }
1321  }
1322  // Set all branches to be read
1323  hnstStorageTree->SetBranchAddresses();
1324  ngnt->SetOutputs(outputs);
1325 
1326  NGnNavigator * nav = new NGnNavigator();
1327  nav->SetGnTree(ngnt);
1328  ngnt->SetNavigator(nav);
1329 
1330  return ngnt;
1331 }
1332 
1334 {
1338 
1339  if (fNavigator) {
1340  NLogTrace("NGnTree::SetNavigator: Replacing existing navigator ...");
1341  SafeDelete(fNavigator);
1342  }
1343 
1344  fNavigator = navigator;
1345 }
1346 
1347 bool NGnTree::Close(bool write)
1348 {
1352 
1353  if (!fTreeStorage) {
1354  NLogError("NGnTree::Close: Storage tree is not initialized in NGnTree !!!");
1355  return false;
1356  }
1357 
1358  return fTreeStorage->Close(write, fOutputs);
1359 }
1360 
1361 Int_t NGnTree::GetEntry(Long64_t entry, bool checkBinningDef)
1362 {
1366  if (!fTreeStorage) {
1367  NLogError("NGnTree::GetEntry: Storage tree is not initialized in NGnTree !!!");
1368  return -1;
1369  }
1370 
1371  int bytes =
1372  fTreeStorage->GetEntry(entry, fBinning->GetPoint(0, fBinning->GetCurrentDefinitionName()), checkBinningDef);
1373  if (fTreeStorage->GetBranch("_params")) fParameters = (NParameters *)fTreeStorage->GetBranch("_params")->GetObject();
1374  return bytes;
1375 }
1376 Int_t NGnTree::GetEntry(std::vector<std::vector<int>> /*range*/, bool checkBinningDef)
1377 {
1381 
1382  return GetEntry(0, checkBinningDef);
1383 }
1384 
1385 void NGnTree::Play(int timeout, std::string binning, std::vector<int> outputPointIds,
1386  std::vector<std::vector<int>> ranges, Option_t * option)
1387 {
1391  TString opt = option;
1392  opt.ToUpper();
1393 
1394  std::string annimationTempDir =
1395  TString::Format("%s/.ndmspc/animation/%d", gSystem->Getenv("HOME"), gSystem->GetPid()).Data();
1396  gSystem->Exec(TString::Format("mkdir -p %s", annimationTempDir.c_str()));
1397 
1398  if (binning.empty()) {
1399  binning = fBinning->GetCurrentDefinitionName();
1400  }
1401 
1402  NBinningDef * binningDef = fBinning->GetDefinition(binning);
1403  if (!binningDef) {
1404  NLogError("NGnTree::Play: Binning definition '%s' not found in NGnTree !!!", binning.c_str());
1405  NLogError("Available binning definitions:");
1406  for (auto & name : fBinning->GetDefinitionNames()) {
1407  if (name == fBinning->GetCurrentDefinitionName())
1408  NLogError(" [*] %s", name.c_str());
1409  else
1410  NLogError(" [ ] %s", name.c_str());
1411  }
1412  return;
1413  }
1414 
1415  THnSparse * bdContent = (THnSparse *)binningDef->GetContent()->Clone();
1416 
1417  std::string bdContentName = TString::Format("bdContent_%s", binning.c_str()).Data();
1418  // Set axis ranges if provided
1419  if (!ranges.empty()) NUtils::SetAxisRanges(bdContent, ranges, false, true);
1420 
1421  Long64_t linBin = 0;
1422  std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{bdContent->CreateIter(true /*use axis range*/)};
1423  std::vector<Long64_t> ids;
1424  // std::vector<Long64_t> ids = binningDef->GetIds();
1425  while ((linBin = iter->Next()) >= 0) {
1426  // NLogDebug("Original content bin %lld: %f", linBin, bdContentOrig->GetBinContent(linBin));
1427  ids.push_back(linBin);
1428  }
1429  if (ids.empty()) {
1430  NLogWarning("NGnTree::Play: No entries found in binning definition '%s' !!!", binning.c_str());
1431  return;
1432  }
1433  // return;
1434 
1435  TCanvas * c1 = nullptr;
1436 
1437  c1 = (TCanvas *)gROOT->GetListOfCanvases()->FindObject("c1");
1438  if (c1 == nullptr) c1 = new TCanvas("c1", "NGnTree::Play", 800, 600);
1439  c1->Clear();
1440  c1->cd();
1441  c1->DivideSquare(outputPointIds.size() > 0 ? outputPointIds.size() + 1 : 1);
1442  gSystem->ProcessEvents();
1443 
1444  binningDef->Print();
1445  bdContent->Reset();
1446 
1447  // loop over all ids and print them
1448  for (auto id : ids) {
1449  // for (int id = 0; id < GetEntries(); id++) {
1450  GetEntry(id);
1451  fBinning->GetPoint()->Print();
1452  TList * l = (TList *)fTreeStorage->GetBranch("_outputPoint")->GetObject();
1453  if (!l || l->IsEmpty()) {
1454  NLogWarning("NGnTree::Play: No 'outputPoint' for entry %lld !!!", id);
1455  continue;
1456  }
1457  else {
1458  // NLogInfo("Output for entry %lld:", id);
1459  // l->Print(opt.Data());
1460 
1461  if (outputPointIds.empty()) {
1462  outputPointIds.resize(l->GetEntries());
1463  for (int i = 0; i < l->GetEntries(); i++) {
1464  outputPointIds[i] = i;
1465  }
1466  }
1467  int n = outputPointIds.size();
1468 
1469  Double_t v = 1.0;
1470  for (int i = 0; i < n; i++) {
1471  // NLogDebug("Drawing output object id %d (list index %d) on pad %d", outputPointIds[i], i, i + 1);
1472 
1473  c1->cd(i + 2);
1474  TObject * obj = l->At(outputPointIds[i]);
1475  if (obj) {
1476  if (obj->InheritsFrom(TH1::Class())) {
1477  TH1 * h = (TH1 *)obj;
1478  h->SetDirectory(nullptr);
1479  // Draw a clone to avoid transferring ownership or modifying
1480  // the original object stored in the TList (can cause
1481  // TPad/TList removal during drawing and lead to crashes).
1482  TH1 * hclone = (TH1 *)h->Clone();
1483  if (hclone) {
1484  hclone->SetDirectory(nullptr);
1485  hclone->Draw();
1486  }
1487  }
1488  // obj->Print();
1489  }
1490  if (obj->InheritsFrom(TH1::Class()) && i == 0) {
1491  TH1 * h = (TH1 *)obj;
1492  v = h->GetMean();
1493  NLogDebug("Mean value from histogram [%s]: %f", h->GetName(), v);
1494  }
1495  }
1496  bdContent->SetBinContent(fBinning->GetPoint()->GetStorageCoords(), 1);
1497  c1->cd(1);
1498  TH1 * bdProj = (TH1 *)gROOT->FindObjectAny("bdProj");
1499  if (bdProj) {
1500  delete bdProj;
1501  bdProj = nullptr;
1502  }
1503  if (bdContent->GetNdimensions() == 1) {
1504  bdProj = bdContent->Projection(0, "O");
1505  }
1506  else if (bdContent->GetNdimensions() == 2) {
1507  bdProj = bdContent->Projection(0, 1, "O");
1508  }
1509  else if (bdContent->GetNdimensions() == 3) {
1510  bdProj = bdContent->Projection(0, 1, 2, "O");
1511  }
1512  else {
1513  NLogError("NGnTree::Play: Cannot project THnSparse with %d dimensions", bdContent->GetNdimensions());
1514  }
1515  if (bdProj) {
1516  bdProj->SetName("bdProj");
1517  bdProj->SetTitle(TString::Format("Binning '%s' content projection", binning.c_str()).Data());
1518  bdProj->SetMinimum(0);
1519  // bdProj->SetDirectory(nullptr);
1520  bdProj->Draw("colz");
1521  // c1->ModifiedUpdate();
1522  }
1523  }
1524  if (c1) {
1525  c1->ModifiedUpdate();
1526  c1->SaveAs(TString::Format("%s/ndmspc_play_%06lld.png", annimationTempDir.c_str(), bdContent->GetNbins()).Data());
1527  }
1528  gSystem->ProcessEvents();
1529  if (timeout > 0) gSystem->Sleep(timeout);
1530  NLogInfo("%d", id);
1531  }
1532 
1533  NLogInfo("Creating animation gif from %s/ndmspc_play_*.png ...", annimationTempDir.c_str());
1534  gSystem->Exec(
1535  TString::Format("magick -delay 20 -loop 0 %s/ndmspc_play_*.png ndmspc_play.gif", annimationTempDir.c_str()));
1536  gSystem->Exec(TString::Format("rm -fr %s", annimationTempDir.c_str()));
1537  NLogInfo("Animation saved to ndmspc_play.gif");
1538 
1539  delete bdContent;
1540 }
1541 
1542 TList * NGnTree::Projection(const json & cfg, std::string binningName)
1543 {
1547 
1548  // SetInput(); // Set input to selfp
1550  Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * output, TList * /*outputPoint*/,
1551  int /*threadId*/) {
1552  // NLogInfo("Thread ID: %d", threadId);
1553  TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
1554  point->Print();
1555  json cfg = point->GetCfg();
1556 
1557  Printf("Processing THnSparse projection with configuration: %s", cfg.dump().c_str());
1558 
1559  Ndmspc::NGnTree * ngntIn = point->GetInput();
1560  // ngntIn->Print();
1561  // ngntIn->GetEntry(0);
1562  ngntIn->GetEntry(point->GetEntryNumber());
1563 
1564  // loop over all cfg["objects"]
1565  for (auto & [objName, objCfg] : cfg["objects"].items()) {
1566  NLogInfo("Processing object '%s' ...", objName.c_str());
1567 
1568  THnSparse * hns = (THnSparse *)(ngntIn->GetStorageTree()->GetBranchObject(objName));
1569  if (hns == nullptr) {
1570  NLogError("NGnTree::Projection: THnSparse 'hns' not found in storage tree !!!");
1571  return;
1572  }
1573  // hns->Print("all");
1574  // loop over cfg["objects"][objName] array of projection dimension names
1575  for (size_t i = 0; i < objCfg.size(); i++) {
1576 
1577  NLogInfo("Processing projection %zu for object '%s' ...", i, objName.c_str());
1578  std::vector<int> dims;
1579  std::vector<std::string> dimNames = cfg["objects"][objName][i].get<std::vector<std::string>>();
1580  for (const auto & dimName : dimNames) {
1581  NLogDebug("Looking for dimension name '%s' in THnSparse ...", dimName.c_str());
1582  int dim = -1;
1583  for (int i = 0; i < hns->GetNdimensions(); i++) {
1584  if (dimName == hns->GetAxis(i)->GetName()) {
1585  dim = i;
1586  break;
1587  }
1588  }
1589  if (dim >= 0)
1590  dims.push_back(dim);
1591  else {
1592  NLogError("NGnTree::Projection: Dimension name '%s' not found in THnSparse !!!", dimName.c_str());
1593  }
1594  }
1595  // Print dims
1596  NLogInfo("Projecting THnSparse on dimensions: %s", NUtils::GetCoordsString(dims, -1).c_str());
1597  TH1 * hPrev = (TH1 *)output->At(i);
1598  TH1 * hProj = NUtils::ProjectTHnSparse(hns, dims, "O");
1599  hProj->SetName(TString::Format("%s_proj_%s", objName.c_str(), NUtils::Join(dims, '_').c_str()).Data());
1600  if (hPrev) {
1601  hPrev->Add(hProj);
1602  }
1603  else {
1604  output->Add(hProj);
1605  }
1606  }
1607  }
1608  output->Print();
1609  };
1610 
1611  // NBinningDef * binningDef = fInput->GetBinning()->GetDefinition(binningName);
1612  NBinningDef * binningDef = GetBinning()->GetDefinition(binningName);
1613  THnSparse * hnsIn = binningDef->GetContent();
1614  // std::vector<std::vector<int>> ranges{{0, 2, 2}, {2, 1, 1}};
1615  std::vector<std::vector<int>> ranges = cfg["ranges"].get<std::vector<std::vector<int>>>();
1616  NUtils::SetAxisRanges(hnsIn, ranges); // Set the ranges for the axes
1617  Long64_t linBin = 0;
1618  std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{hnsIn->CreateIter(true /*use axis range*/)};
1619  std::vector<Long64_t> ids;
1620  // std::vector<Long64_t> ids = binningDef->GetIds();
1621  while ((linBin = iter->Next()) >= 0) {
1622  ids.push_back(linBin);
1623  }
1624  if (ids.empty()) {
1625  NLogWarning("NGnTree::Projection: No entries found in binning definition '%s' !!!", binningDef->GetName());
1626  binningDef->RefreshIdsFromContent();
1627  return nullptr;
1628  }
1629 
1630  binningDef->GetIds() = ids;
1631 
1632  // NUtils::SetAxisRanges(, std::vector<std::vector<int>> ranges)
1633  Process(processFunc, cfg);
1634 
1635  // Refresh binning definition ids from content after processing
1636  binningDef->RefreshIdsFromContent();
1637 
1638  // GetInput()->Close(false);
1640 
1641  // Close(false);
1642 }
1643 
1644 NGnNavigator * NGnTree::Reshape(std::string binningName, std::vector<std::vector<int>> levels, int level,
1645  std::map<int, std::vector<int>> ranges, std::map<int, std::vector<int>> rangesBase)
1646 {
1650 
1651  NGnNavigator navigator;
1652  navigator.SetGnTree(this);
1653 
1654  return navigator.Reshape(binningName, levels, level, ranges, rangesBase);
1655 }
1656 
1657 NGnNavigator * NGnTree::GetResourceStatisticsNavigator(std::string binningName, std::vector<std::vector<int>> levels,
1658  int level, std::map<int, std::vector<int>> ranges,
1659  std::map<int, std::vector<int>> rangesBase)
1660 {
1664 
1665  if (binningName.empty()) {
1666  binningName = fBinning->GetCurrentDefinitionName();
1667  }
1668 
1669  THnSparse * hns = (THnSparse *)fOutputs[binningName]->FindObject("resource_monitor");
1670  if (!hns) {
1671  NLogError("NGnTree::Draw: Resource monitor THnSparse not found in outputs !!!");
1672  return nullptr;
1673  }
1674  hns->Print("all");
1675  // return nullptr;
1676 
1677  auto ngnt = new NGnTree(hns, "stat", "/tmp/hnst_imported_for_drawing.root");
1678  if (ngnt->IsZombie()) {
1679  NLogError("NGnTree::GetResourceStatisticsNavigator: Failed to import resource monitor THnSparse !!!");
1680  return nullptr;
1681  }
1682  ngnt->Print();
1683  ngnt->Close();
1684 
1685  // return nullptr;
1686  auto ngnt2 = NGnTree::Open("/tmp/hnst_imported_for_drawing.root");
1687  auto nav = ngnt2->Reshape("default", levels, level, ranges, rangesBase);
1688  // // nav->Export("/tmp/hnst_imported_for_drawing.json", {}, "ws://localhost:8080/ws/root.websocket");
1689  // // nav->Draw();
1690  return nav;
1691 }
1692 
1693 bool NGnTree::InitParameters(const std::vector<std::string> & paramNames)
1694 {
1698 
1699  if (fParameters) {
1700  NLogTrace("NGnTree::InitParameters: Replacing existing parameters ...");
1701  delete fParameters;
1702  }
1703 
1704  if (paramNames.empty()) {
1705  NLogTrace("NGnTree::InitParameters: No parameter names provided, skipping ...");
1706  return false;
1707  }
1708 
1709  fParameters = new NParameters(paramNames, "results", "Results");
1710 
1711  return true;
1712 }
1713 
1714 NGnTree * NGnTree::Import(const std::string & findPath, const std::string & fileName,
1715  const std::vector<std::string> & headers, const std::string & outputFile, bool close)
1716 {
1720 
1721  // remove trailing slash from findPath if exists
1722  std::string findPathClean = findPath;
1723  if (!findPathClean.empty() && findPathClean.back() == '/') {
1724  findPathClean.pop_back();
1725  }
1726 
1727  std::vector<std::string> paths = NUtils::Find(findPathClean, fileName);
1728  NLogInfo("NGnTree::Import: Found %zu files to import ...", paths.size());
1729 
1730  TObjArray * ngntArray = NUtils::AxesFromDirectory(paths, findPathClean, fileName, headers);
1731  int nDirAxes = ngntArray->GetEntries();
1732 
1733  NGnTree * ngntFirst = NGnTree::Open(paths[0]);
1734  // Add all axes from ngntFirst to ngntArray
1735  for (const auto & axis : ngntFirst->GetBinning()->GetAxes()) {
1736  ngntArray->Add(axis->Clone());
1737  }
1738  ngntFirst->Close(false);
1739 
1740  std::map<std::string, std::vector<std::vector<int>>> b;
1741 
1742  for (int i = 0; i < ngntArray->GetEntries(); i++) {
1743  TAxis * axis = (TAxis *)ngntArray->At(i);
1744  b[axis->GetName()].push_back({1});
1745  }
1746 
1747  NGnTree * ngnt = new NGnTree(ngntArray, outputFile);
1748  ngnt->SetIsPureCopy(true);
1749 
1750  // return nullptr;
1751  ngnt->GetBinning()->AddBinningDefinition("default", b);
1752 
1753  json cfg;
1754  cfg["basedir"] = findPathClean;
1755  cfg["filename"] = fileName;
1756  cfg["nDirAxes"] = nDirAxes;
1757  cfg["headers"] = headers;
1758  // cfg["ndmspc"]["shared"]["currentFileName"] = "";
1759  Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * /*output*/, TList * outputPoint,
1760  int /*threadId*/) {
1761  // point->Print();
1762 
1763  json cfg = point->GetCfg();
1764  std::string filename = cfg["basedir"].get<std::string>();
1765  filename += "/";
1766  for (auto & header : cfg["headers"]) {
1767  filename += point->GetBinLabel(header.get<std::string>());
1768  filename += "/";
1769  }
1770  // filename += "/";
1771  // filename += point->GetBinLabel("c");
1772  // filename += point->GetBinLabel("year");
1773  // filename += "/";
1774  filename += cfg["filename"].get<std::string>();
1775  NGnTree * ngnt = (NGnTree *)point->GetTempObject("file");
1776  if (!ngnt || filename.compare(ngnt->GetStorageTree()->GetFileName()) != 0) {
1777  NLogInfo("NGnTree::Import: Opening file '%s' ...", filename.c_str());
1778  if (ngnt) {
1779  NLogDebug("NGnTree::Import: Closing previously opened file '%s' ...",
1780  ngnt->GetStorageTree()->GetFileName().c_str());
1781  ngnt->Close(false);
1782  // delete ngnt;
1783  point->SetTempObject("file", nullptr);
1784  }
1785  ngnt = NGnTree::Open(filename.c_str());
1786  if (!ngnt || ngnt->IsZombie()) {
1787  NLogError("NGnTree::Import: Cannot open file '%s'", filename.c_str());
1788  return;
1789  }
1790  point->SetTempObject("file", ngnt);
1791  }
1792 
1793  int nDirAxes = cfg["nDirAxes"].get<int>();
1794  Int_t * coords = point->GetCoords();
1795  std::string coordsStr = NUtils::GetCoordsString(NUtils::ArrayToVector(coords, point->GetNDimensionsContent()));
1796  NLogInfo("NGnTree::Import: Processing point with coords %s ...", coordsStr.c_str());
1797 
1798  Long64_t entryNumber =
1799  ngnt->GetBinning()->GetContent()->GetBin(&coords[3 * nDirAxes], kFALSE); // skip first 3 dir axes
1800  NLogInfo("NGnTree::Import: Corresponding entry number in file '%s' is %lld", filename.c_str(), entryNumber);
1801 
1802  ngnt->GetEntry(entryNumber);
1803 
1804  // // add outputPoint content to outputPoint list
1805  // TList * inputOutputPoint = (TList *)ngnt->GetStorageTree()->GetBranch("_outputPoint")->GetObject();
1806  // for (int i = 0; i < inputOutputPoint->GetEntries(); i++) {
1807  // outputPoint->Add(inputOutputPoint->At(i));
1808  // }
1809 
1810  // set all branches from ngnt to branch addresses in current object
1811  for (const auto & kv : ngnt->GetStorageTree()->GetBranchesMap()) {
1812  // check if branch exists in current storage tree
1813  if (point->GetStorageTree()->GetBranch(kv.first) == nullptr) {
1814  NLogTrace("NGnTree::Import: Adding branch '%s' to storage tree ...", kv.first.c_str());
1815  point->GetStorageTree()->AddBranch(kv.first, nullptr, kv.second.GetObjectClassName());
1816  }
1817  NLogTrace("NGnTree::Import: Setting branch address for branch '%s' ...", kv.first.c_str());
1818  point->GetTreeStorage()->GetBranch(kv.first)->SetAddress(kv.second.GetObject());
1819  }
1820  outputPoint->Add(new TNamed("source_file", filename));
1821 
1822  // ngnt->Print();
1823 
1824  // NLogInfo("NGnTree::Import: nDirAxes=%d ...", cfg["nDirAxes"].get<int>());
1825 
1826  // json & tempCfg = point->GetTempCfg();
1827  // if (tempCfg["test"].is_null()) {
1828  // NLogInfo("Setting temp cfg test value to 42");
1829  // tempCfg["test"] = 42;
1830  // }
1831  // NLogInfo("Temp cfg test value: %d", tempCfg["test"].get<int>());
1832 
1833  // f->ls();
1834  };
1835  Ndmspc::NGnBeginFuncPtr beginFunc = [](Ndmspc::NBinningPoint * /*point*/, int /*threadId*/) {
1836  TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
1837  };
1838 
1839  Ndmspc::NGnEndFuncPtr endFunc = [](Ndmspc::NBinningPoint * point, int /*threadId*/) {
1840  NGnTree * ngnt = (NGnTree *)point->GetTempObject("file");
1841  if (ngnt) {
1842  NLogDebug("NGnTree::Import: Closing last file '%s' ...", ngnt->GetStorageTree()->GetFileName().c_str());
1843  // ngnt->Close(false);
1844  // delete ngnt;
1845  point->SetTempObject("file", nullptr);
1846  }
1847  };
1848 
1849  ngnt->Process(processFunc, cfg, "", beginFunc, endFunc);
1850  if (close) {
1851  ngnt->Close(true);
1852  delete ngnt;
1853  ngnt = NGnTree::Open(outputFile.c_str());
1854  }
1855  return ngnt;
1856 }
1857 
1858 } // namespace Ndmspc
Defines binning mapping and content for NDMSPC histograms.
Definition: NBinningDef.h:26
THnSparse * GetContent() const
Get the template content histogram.
Definition: NBinningDef.h:118
virtual void Print(Option_t *option="") const
Print binning definition information.
void RefreshIdsFromContent()
Refresh bin IDs from content histogram.
std::vector< Long64_t > GetIds() const
Get list of bin IDs.
Definition: NBinningDef.h:93
Represents a single point in multi-dimensional binning.
Definition: NBinningPoint.h:21
Double_t GetBinMax(std::string axis) const
Get the maximum value for a specific axis.
std::string GetString(const std::string &prefix="", bool all=false) const
Returns a string representation of the binning point.
void SetTreeStorage(NStorageTree *s)
Set storage tree object pointer.
std::string GetBinLabel(std::string axis) const
Get the label for a specific axis.
bool RecalculateStorageCoords(Long64_t entry=-1, bool useBinningDefCheck=false)
Recalculate storage coordinates for the point.
Long64_t GetEntryNumber() const
Get entry number for the point.
NParameters * GetParameters() const
Get the parameters associated with this binning point.
Int_t * GetStorageCoords() const
Get pointer to storage coordinates array.
Definition: NBinningPoint.h:67
Double_t GetBinCenter(std::string axis) const
Returns the center value of the bin along the specified axis.
NStorageTree * GetTreeStorage() const
Get pointer to storage tree object.
void SetTempObject(const std::string &name, TObject *obj)
Set a temporary object with the given name.
NStorageTree * GetStorageTree() const
Returns a pointer to the associated storage tree.
Definition: NBinningPoint.h:90
virtual void Print(Option_t *option="") const
Print binning point information.
TObject * GetTempObject(const std::string &name) const
Retrieve a temporary object by name.
NGnTree * GetInput() const
Get pointer to input NGnTree object.
Int_t GetBin(std::string axis) const
Returns the bin index for the specified axis.
Int_t * GetCoords() const
Get pointer to content coordinates array.
Definition: NBinningPoint.h:55
json & GetCfg()
Get reference to configuration JSON object.
Int_t GetNDimensionsContent() const
Get number of dimensions in content histogram.
Definition: NBinningPoint.h:49
Double_t GetBinMin(std::string axis) const
Get the minimum value for a specific axis.
NBinning object for managing multi-dimensional binning and axis definitions.
Definition: NBinning.h:45
NBinningDef * GetDefinition(const std::string &name="")
Get binning definition by name.
Definition: NBinning.cxx:1024
std::vector< std::string > GetDefinitionNames() const
Get all definition names.
Definition: NBinning.h:270
std::string GetCurrentDefinitionName() const
Get current definition name.
Definition: NBinning.h:276
NBinningPoint * GetPoint()
Get the current binning point.
Definition: NBinning.cxx:1128
virtual void Print(Option_t *option="") const
Print binning information.
Definition: NBinning.cxx:245
std::vector< TAxis * > GetAxes() const
Get vector of axis pointers.
Definition: NBinning.h:223
bool SetCfg(const json &cfg)
Set configuration from JSON.
Definition: NBinning.cxx:1174
void AddBinningDefinition(std::string name, std::map< std::string, std::vector< std::vector< int >>> binning, bool forceDefault=false)
Add a binning definition.
Definition: NBinning.cxx:1053
void Reset()
Reset the binning object to initial state.
Definition: NBinning.cxx:78
Executes a function over all points in an N-dimensional space, optionally in parallel.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
Navigator object for managing hierarchical data structures and projections.
Definition: NGnNavigator.h:22
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int >> levels, size_t level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Reshape navigator using binning name and levels.
void SetGnTree(NGnTree *tree)
Set NGnTree object pointer.
Definition: NGnNavigator.h:184
Thread-local data object for NDMSPC processing.
Definition: NGnThreadData.h:20
bool Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc, NGnTree *ngnt, NBinning *binningIn, NGnTree *input=nullptr, const std::string &filename="", const std::string &treename="ngnt")
Initialize thread data for processing.
void SetResultsFilename(const std::string &filename)
Set the results filename for TCP mode (shared filesystem path).
Definition: NGnThreadData.h:99
NGnTree * GetHnSparseBase() const
Get pointer to base NGnTree object.
Definition: NGnThreadData.h:66
virtual Long64_t Merge(TCollection *list)
Merge thread data from a collection (virtual).
void FlushDeferredDeletes()
Delete deferred ROOT objects, skipping TCanvas/TPad (leaked safely).
void SetCfg(const json &cfg)
Set configuration JSON object.
Definition: NGnThreadData.h:84
NDMSPC tree object for managing multi-dimensional data storage and processing.
Definition: NGnTree.h:75
NBinning * GetBinning() const
Get pointer to binning object.
Definition: NGnTree.h:161
std::map< std::string, TList * > GetOutputs() const
Get outputs map.
Definition: NGnTree.h:179
virtual void Draw(Option_t *option="") override
Draws the tree object.
Definition: NGnTree.cxx:492
void SetIsPureCopy(bool val)
Sets the pure copy status of the tree.
Definition: NGnTree.h:230
bool Close(bool write=false)
Close the tree, optionally writing data.
Definition: NGnTree.cxx:1347
bool fOwnsTreeStorage
True when fTreeStorage is owned by this instance.
Definition: NGnTree.h:393
virtual ~NGnTree()
Destructor.
Definition: NGnTree.cxx:453
Int_t GetEntry(Long64_t entry, bool checkBinningDef=true)
Get entry by index.
Definition: NGnTree.cxx:1361
virtual void Print(Option_t *option="") const override
Print tree information.
Definition: NGnTree.cxx:468
void SetInput(NGnTree *input)
Set input NGnTree pointer.
Definition: NGnTree.h:204
NGnTree()
Default constructor.
Definition: NGnTree.cxx:157
static NGnTree * Import(const std::string &findPath, const std::string &fileName, const std::vector< std::string > &headers, const std::string &outFileName="/tmp/ngnt_imported.root", bool close=true)
Imports all NGnTree files with the given name found under a directory into a single new NGnTree.
Definition: NGnTree.cxx:1714
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int >> levels, int level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Reshape navigator using binning name and levels.
Definition: NGnTree.cxx:1644
NBinning * fBinning
Binning object.
Definition: NGnTree.h:386
TList * GetOutput(std::string name="")
Get output list by name.
Definition: NGnTree.cxx:1227
NParameters * GetParameters() const
Returns the parameters associated with this tree.
Definition: NGnTree.h:329
NGnTree * fInput
Input NGnTree for processing.
Definition: NGnTree.h:389
bool fOwnsBinning
True when fBinning is owned by this instance.
Definition: NGnTree.h:392
NStorageTree * GetStorageTree() const
Get pointer to storage tree object.
Definition: NGnTree.h:173
NGnTree * GetInput() const
Get pointer to input NGnTree.
Definition: NGnTree.h:198
std::string fWorkerMacroList
Comma-separated macro paths sent to TCP workers.
Definition: NGnTree.h:395
void Play(int timeout=0, std::string binning="", std::vector< int > outputPointIds={0}, std::vector< std::vector< int >> ranges={}, Option_t *option="")
Play tree data with optional binning and output point IDs.
Definition: NGnTree.cxx:1385
void SetOutputs(std::map< std::string, TList * > outputs)
Set outputs map.
Definition: NGnTree.h:192
bool InitParameters(const std::vector< std::string > &paramNames)
Initializes the parameters for the tree using the provided parameter names.
Definition: NGnTree.cxx:1693
std::map< std::string, TList * > fOutputs
Outputs.
Definition: NGnTree.h:388
NGnNavigator * fNavigator
! Navigator object
Definition: NGnTree.h:390
NParameters * fParameters
Parameters object.
Definition: NGnTree.h:391
TList * Projection(const json &cfg, std::string binningName="")
Project tree data using configuration and binning name.
Definition: NGnTree.cxx:1542
NGnNavigator * GetResourceStatisticsNavigator(std::string binningName, std::vector< std::vector< int >> levels, int level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Returns a navigator for resource statistics based on binning and levels.
Definition: NGnTree.cxx:1657
NStorageTree * fTreeStorage
Tree storage.
Definition: NGnTree.h:387
static std::string BuildObjectPath(const json &cfg, const json &objCfg, const NBinningPoint *point)
Helper: build object path string from configuration and a binning point.
Definition: NGnTree.cxx:43
void SetNavigator(NGnNavigator *navigator)
Sets the navigator for this tree.
Definition: NGnTree.cxx:1333
static NGnTree * Open(const std::string &filename, const std::string &branches="", const std::string &treename="ngnt")
Open NGnTree from file.
Definition: NGnTree.cxx:1243
bool Process(NGnProcessFuncPtr func, const json &cfg=json::object(), std::string binningName="", NGnBeginFuncPtr beginFunc=nullptr, NGnEndFuncPtr endFunc=nullptr)
Process tree data using a function pointer and configuration.
Definition: NGnTree.cxx:501
static bool GetConsoleOutput()
Get console output flag.
Definition: NLogger.h:548
Container of parameter values and errors, addressable by bin index.
Definition: NParameters.h:13
bool SetParameter(int bin, Double_t value, Double_t error=0.)
Set the value and error of a parameter by bin index.
Definition: NParameters.cxx:55
NDMSPC storage tree object for managing ROOT TTree-based data storage.
Definition: NStorageTree.h:22
void SetBranchAddresses()
Set addresses for all branches.
void SetEnabledBranches(std::vector< std::string > branches, int status=1)
Set enabled/disabled status for branches.
bool SetFileTree(TFile *file, TTree *tree)
Set the TFile and TTree used by this storage tree.
std::string GetFileName() const
Get file name.
Definition: NStorageTree.h:194
bool AddBranch(const std::string &name, void *address, const std::string &className)
Add a branch to the tree.
Long64_t GetEntry(Long64_t entry, NBinningPoint *point=nullptr, bool checkBinningDef=false)
Get entry by index and fill NBinningPoint.
std::string GetPrefix() const
Get prefix path.
Definition: NStorageTree.h:200
TObject * GetBranchObject(const std::string &name)
Get pointer to branch object by name.
NTreeBranch * GetBranch(const std::string &name)
Get pointer to NTreeBranch by name.
bool InitTree(const std::string &filename="", const std::string &treename="ngnt")
Initialize tree from file and tree name.
std::string GetPostfix() const
Get postfix path.
Definition: NStorageTree.h:206
bool Close(bool write=false, std::map< std::string, TList * > outputs={})
Close the storage tree, optionally writing outputs.
TTree * GetTree() const
Get pointer to TTree object.
Definition: NStorageTree.h:188
virtual void Print(Option_t *option="") const
Print storage tree information.
void SetBinning(NBinning *binning)
Set binning object pointer.
Definition: NStorageTree.h:182
std::map< std::string, NTreeBranch > GetBranchesMap() const
Get map of branch names to NTreeBranch objects.
Definition: NStorageTree.h:129
void SetAddress(void *address, bool deleteExisting=false)
Set address for branch data.
Definition: NTreeBranch.cxx:59
TObject * GetObject() const
Get object pointer.
Definition: NTreeBranch.h:81
static bool SetAxisRanges(THnSparse *sparse, std::vector< std::vector< int >> ranges={}, bool withOverflow=false, bool modifyTitle=false, bool reset=true)
Set axis ranges for THnSparse using vector of ranges.
Definition: NUtils.cxx:1224
static TH1 * ProjectTHnSparse(THnSparse *hns, const std::vector< int > &axes, Option_t *option="")
Project a THnSparse histogram onto specified axes.
Definition: NUtils.cxx:1171
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.
Definition: NUtils.cxx:1077
static std::string FormatTime(long long seconds)
Format time in seconds to human-readable string.
Definition: NUtils.cxx:1664
static bool EnableMT(Int_t numthreads=-1)
Enable multi-threading with specified number of threads.
Definition: NUtils.cxx:46
static std::string Join(const std::vector< std::string > &values, const char delim=',')
Join vector of strings into a single string with delimiter.
Definition: NUtils.cxx:1115
static void ProgressBar(int current, int total, std::string prefix="", std::string suffix="", int barWidth=50)
Display progress bar.
Definition: NUtils.cxx:1677
static std::string GetCoordsString(const std::vector< int > &coords, int index=-1, int width=0)
Get string representation of coordinates.
Definition: NUtils.cxx:1592
static std::vector< int > ArrayToVector(Int_t *v1, int size)
Convert array to vector.
Definition: NUtils.cxx:1555
static TObjArray * AxesFromDirectory(const std::vector< std::string > paths, const std::string &findPath, const std::string &fileName, const std::vector< std::string > &axesNames)
Creates an array of axes objects from files in specified directories.
Definition: NUtils.cxx:1400
static std::vector< std::string > Find(std::string path, std::string filename="")
Find files in a path matching filename.
Definition: NUtils.cxx:967
Execution progress metrics for IPC-based distributed processing.
size_t activeWorkers
Number of workers currently active.
size_t tasksAcked
Tasks completed and ACKed by workers.