9 #include <TDirectory.h>
13 #include <THnSparse.h>
18 #include <TObjString.h>
20 #include <TBufferJSON.h>
23 #include "NParameters.h"
24 #include "NStorageTree.h"
26 #include "NBinningDef.h"
27 #include "NDimensionalExecutor.h"
28 #include "NDimensionalIpcRunner.h"
29 #include "NGnThreadData.h"
31 #include "NTreeBranch.h"
33 #include "NStorageTree.h"
34 #include "NGnNavigator.h"
45 std::string objPath =
"";
46 if (objCfg.contains(
"prefix") && objCfg[
"prefix"].is_string()) {
47 objPath = objCfg[
"prefix"].get<std::string>();
50 std::string axisObjectDefaultFormat =
51 cfg[
"axisObjectDefaultFormat"].is_string() ? cfg[
"axisObjectDefaultFormat"].get<std::string>() :
"%.2f_%.2f";
52 std::string axisDefaultSeparator =
53 cfg[
"axisDefaultSeparator"].is_string() ? cfg[
"axisDefaultSeparator"].get<std::string>() :
"/";
56 for (
auto & axisEntry : cfg[
"axes"]) {
61 if (axisEntry.is_string()) {
62 axisName = axisEntry.get<std::string>();
63 if (axisObjectDefaultFormat.empty()) {
68 format = axisObjectDefaultFormat;
71 else if (axisEntry.is_object()) {
72 if (axisEntry.contains(
"name") && axisEntry[
"name"].is_string()) {
73 axisName = axisEntry[
"name"].get<std::string>();
78 if (axisEntry.contains(
"mode") && axisEntry[
"mode"].is_string()) {
79 mode = axisEntry[
"mode"].get<std::string>();
81 if (axisEntry.contains(
"format") && axisEntry[
"format"].is_string()) {
82 format = axisEntry[
"format"].get<std::string>();
90 if (axisObjectDefaultFormat.empty())
97 format = axisObjectDefaultFormat.empty() ?
"%.2f_%.2f" : axisObjectDefaultFormat;
98 else if (mode ==
"bin")
104 if (mode ==
"minmax") {
107 objPath += TString::Format(format.c_str(), min, max).Data();
109 else if (mode ==
"min") {
111 objPath += TString::Format(format.c_str(), min).Data();
113 else if (mode ==
"max") {
115 objPath += TString::Format(format.c_str(), max).Data();
117 else if (mode ==
"center") {
119 objPath += TString::Format(format.c_str(), c).Data();
121 else if (mode ==
"label") {
125 else if (mode ==
"bin") {
126 objPath += std::to_string(point->
GetBin(axisName));
129 objPath += std::to_string(point->
GetBin(axisName));
132 std::string sep = axisDefaultSeparator;
133 if (axisEntry.is_object() && axisEntry.contains(
"sufix") && axisEntry[
"sufix"].is_string()) {
134 sep = axisEntry[
"sufix"].get<std::string>();
140 if (!lastSep.empty() && objPath.size() >= lastSep.size()) {
141 objPath = objPath.substr(0, objPath.size() - lastSep.size());
143 if (objCfg.contains(
"sufix") && objCfg[
"sufix"].is_string()) {
144 objPath += objCfg[
"sufix"].get<std::string>();
159 NGnTree::NGnTree(std::vector<TAxis *> axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
165 NLogError(
"NGnTree::NGnTree: No axes provided, binning is nullptr.");
176 NGnTree::NGnTree(TObjArray * axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
182 if (axes ==
nullptr) {
183 NLogError(
"NGnTree::NGnTree: Axes TObjArray is nullptr.");
188 if (axes ==
nullptr && axes->GetEntries() == 0) {
189 NLogError(
"NGnTree::NGnTree: No axes provided, binning is nullptr.");
205 if (ngnt ==
nullptr) {
206 NLogError(
"NGnTree::NGnTree: NGnTree is nullptr.");
212 NLogError(
"NGnTree::NGnTree: Binning in NGnTree is nullptr.");
226 : TObject(), fBinning(b), fTreeStorage(s), fInput(nullptr), fOwnsBinning(false), fOwnsTreeStorage(false)
232 NLogError(
"NGnTree::NGnTree: Binning is nullptr.");
242 NLogError(
"NGnTree::NGnTree: Storage tree is nullptr.");
254 NGnTree::NGnTree(THnSparse * hns, std::string parameterAxis,
const std::string & outFileName, json cfg)
255 : TObject(), fInput(nullptr)
261 std::map<std::string, std::vector<std::vector<int>>> b;
262 TObjArray * axes =
new TObjArray();
263 int parameterAxisIdx = -1;
264 std::vector<std::string> labels;
265 for (
int i = 0; i < hns->GetNdimensions(); i++) {
266 TAxis * axisIn = (TAxis *)hns->GetAxis(i);
267 TAxis * axis = (TAxis *)axisIn->Clone();
270 if (parameterAxis.compare(axis->GetName()) == 0) {
271 parameterAxisIdx = i;
272 TAxis * axis = hns->GetAxis(parameterAxisIdx);
273 for (
int bin = 1; bin <= axis->GetNbins(); bin++) {
275 labels.push_back(axis->GetBinLabel(bin));
281 if (axisIn->IsAlphanumeric()) {
282 NLogTrace(
"Setting axis '%s' labels from input THnSparse", axis->GetName());
283 for (
int bin = 1; bin <= axisIn->GetNbins(); bin++) {
284 std::string label = axisIn->GetBinLabel(bin);
285 if (!labels.empty()) axis->SetBinLabel(bin, axisIn->GetBinLabel(bin));
290 b[axis->GetName()] = {{1}};
294 cfg[
"_parameterAxis"] = parameterAxisIdx;
295 cfg[
"_labels"] = labels;
298 NLogDebug(
"Importing THnSparse as NGnTree with parameter axis '%s' (index %d) ...", parameterAxis.c_str(),
301 NLogDebug(
"Created NGnTree for THnSparse import ...");
302 if (ngnt->IsZombie()) {
303 NLogError(
"NGnTree::Import: Failed to create NGnTree !!!");
309 const char * tmpDirStr = gSystem->Getenv(
"NDMSPC_TMP_DIR");
313 if (!tmpDirStr || tmpDir.empty()) {
316 std::string tmpFilename = tmpDir +
"/ngnt_imported_input" + std::to_string(gSystem->GetPid()) +
".root";
318 if (ngntIn->IsZombie()) {
319 NLogError(
"NGnTree::Import: Failed to create NGnTree for input !!!");
325 ngntIn->
GetOutput(
"default")->Add(hns->Clone(
"test"));
336 ngnt->
InitParameters(cfg[
"_labels"].get<std::vector<std::string>>());
338 Ndmspc::NGnProcessFuncPtr processFunc = [](
Ndmspc::NBinningPoint * point, TList * , TList * outputPoint,
341 TH1::AddDirectory(kFALSE);
343 json cfg = point->
GetCfg();
347 NLogError(
"NGnTree::Import: Input NGnTree is nullptr !!!");
352 THnSparse * hns = (THnSparse *)ngntIn->
GetOutput(
"default")->At(0);
353 if (hns ==
nullptr) {
354 NLogError(
"NGnTree::Import: THnSparse 'hns' not found in storage tree !!!");
358 int axisIdx = cfg[
"_parameterAxis"].get<
int>();
359 std::vector<std::vector<int>> ranges;
362 for (
int i = 0; i < hns->GetNdimensions(); i++) {
363 if (i == axisIdx)
continue;
365 ranges.push_back({i, coord, coord});
370 TH1 * h = hns->Projection(axisIdx,
"O");
372 NLogError(
"NGnTree::Import: Projection of THnSparse failed !!!");
375 if (h->GetEntries() > 0) {
378 for (
int bin = 1; bin <= h->GetNbinsX(); bin++) {
379 params->
SetParameter(bin, h->GetBinContent(bin), h->GetBinError(bin));
385 std::string filename = cfg[
"filename"].get<std::string>();
387 if (!f || filename.compare(f->GetName()) != 0) {
389 NLogDebug(
"NGnTree::Import: Closing previously opened file '%s' ...", f->GetName());
392 NLogDebug(
"NGnTree::Import: Opening file '%s' ...", filename.c_str());
393 f = TFile::Open(filename.c_str());
394 if (!f || f->IsZombie()) {
395 NLogError(
"NGnTree::Import: Cannot open file '%s' !!!", filename.c_str());
401 std::string axisObjectDefaultFormat =
402 cfg[
"axisObjectDefaultFormat"].is_string() ? cfg[
"axisObjectDefaultFormat"].get<std::string>() :
"%.2f_%.2f";
403 std::string axisDefaultSeparator =
404 cfg[
"axisDefaultSeparator"].is_string() ? cfg[
"axisDefaultSeparator"].get<std::string>() :
"/";
406 if (cfg.contains(
"dryrun") && cfg[
"dryrun"].is_boolean()) {
407 dryrun = cfg[
"dryrun"].get<
bool>();
411 NLogInfo(
"NGnTree::Import (dryrun): '%s' ...", point->
GetString().c_str());
415 for (
auto & [objName, objCfg] : cfg[
"objects"].items()) {
419 NLogInfo(
"NGnTree::Import (dryrun): would retrieve object '%s'", objPath.c_str());
424 NLogInfo(
"NGnTree::Import: Retrieving object '%s' from file '%s' ...", objPath.c_str(),
425 cfg[
"filename"].get<std::string>().c_str());
427 TObject * obj = f->Get(objPath.c_str());
430 NLogWarning(
"NGnTree::Import: Cannot get object '%s' from file '%s' !!!", objPath.c_str(),
431 cfg[
"filename"].get<std::string>().c_str());
436 if (obj->InheritsFrom(TCanvas::Class())) {
437 TCanvas * cObj = (TCanvas *)obj;
438 cObj->SetName(objName.c_str());
440 outputPoint->Add(obj->Clone(objName.c_str()));
447 ngnt->
Process(processFunc, cfg);
450 gSystem->Exec(TString::Format(
"rm -f %s", tmpFilename.c_str()));
474 TString opt = option;
477 NLogInfo(
"NGnTree::Print: Printing NGnTree object [ALL] ...");
482 NLogError(
"Binning is not initialized in NGnTree !!!");
488 NLogError(
"Storage tree is not initialized in NGnTree !!!");
498 NLogInfo(
"NGnTree::Draw: Drawing NGnTree object [not implemented yet]...");
501 bool NGnTree::Process(NGnProcessFuncPtr func,
const json & cfg, std::string binningName, NGnBeginFuncPtr beginFunc,
502 NGnEndFuncPtr endFunc)
509 NLogError(
"Binning is not initialized in NGnTree !!!");
516 if (!binningName.empty()) {
518 if (std::find(defNames.begin(), defNames.end(), binningName) == defNames.end()) {
519 NLogError(
"Binning definition '%s' not found in NGnTree !!!", binningName.c_str());
523 defNames.push_back(binningName);
528 bool rc =
Process(func, defNames, cfg, binningIn, beginFunc, endFunc);
530 NLogError(
"NGnTree::Process: Processing failed !!!");
537 bool NGnTree::Process(NGnProcessFuncPtr func,
const std::vector<std::string> & defNames,
const json & cfg,
538 NBinning * binningIn, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc)
544 NLogInfo(
"NGnTree::Process: Starting processing with %zu definitions ...", defNames.size());
545 bool batch = gROOT->IsBatch();
546 gROOT->SetBatch(kTRUE);
547 TH1::AddDirectory(kFALSE);
552 if (!storageFileName.empty()) {
553 storagePostfix = gSystem->BaseName(storageFileName.c_str());
556 if (storagePostfix.empty()) {
557 storagePostfix =
"ndmspc.root";
561 if (
const char * workerEndpoint = gSystem->Getenv(
"NDMSPC_WORKER_ENDPOINT")) {
562 size_t workerIndex = 0;
563 if (
const char * envIdx = gSystem->Getenv(
"NDMSPC_WORKER_INDEX")) {
564 try { workerIndex =
static_cast<size_t>(std::stoul(envIdx)); }
catch (...) {}
566 NLogInfo(
"NGnTree::Process: Worker mode — connecting to %s as worker %zu", workerEndpoint, workerIndex);
573 void * ctx = zmq_ctx_new();
574 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
575 const std::string identity = Ndmspc::NDimensionalIpcRunner::BuildWorkerIdentity(workerIndex);
576 zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size());
577 int timeoutMs = 1000;
578 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs,
sizeof(timeoutMs));
579 zmq_connect(dealer, workerEndpoint);
580 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {
"READY"});
583 const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(30);
586 std::vector<std::string> frames;
587 if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
588 if (errno == EAGAIN || errno == EWOULDBLOCK) {
589 if (std::chrono::steady_clock::now() > initDeadline)
break;
595 if (frames.size() >= 1 && frames[0] ==
"STOP") {
596 NLogPrint(
"NGnTree::Process: Worker received STOP before INIT — session already finished, exiting.");
599 gROOT->SetBatch(batch);
602 if (frames.size() >= 5 && frames[0] ==
"INIT") {
603 workerIndex =
static_cast<size_t>(std::stoul(frames[1]));
604 const std::string & sessionId = frames[2];
605 const std::string & initResultsDir = frames[3];
606 const std::string & initTreeName = frames[4];
609 if (frames.size() >= 7) {
610 if (!frames[5].empty())
611 gSystem->Setenv(
"NDMSPC_TMP_DIR", frames[5].c_str());
612 if (!frames[6].empty())
613 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", frames[6].c_str());
616 if (!gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR")[0] ==
'\0') {
617 const char * tmpDirEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
618 if (tmpDirEnv && tmpDirEnv[0] !=
'\0')
619 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
623 const char * localTmpEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
624 const std::string localBase = localTmpEnv ? localTmpEnv :
"/tmp";
625 const std::string localFile = localBase +
"/.ndmspc/tmp/" + sessionId +
"/" +
626 std::to_string(workerIndex) +
"/" + storagePostfix;
629 const std::string resultsFile = initResultsDir +
"/" + std::to_string(workerIndex) +
"/" +
632 bool rc = workerData.
Init(workerIndex, func, beginFunc, endFunc,
this, binningIn,
fInput, localFile, initTreeName);
634 NLogError(
"NGnTree::Process: Worker failed to initialize NGnThreadData");
640 if (resultsFile != localFile) {
644 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {
"ACK"});
649 NLogError(
"NGnTree::Process: Worker did not receive INIT from supervisor");
655 Ndmspc::NDimensionalIpcRunner::TaskLoop(dealer, workerIndex, &workerData);
658 gROOT->SetBatch(batch);
665 int nThreads = ROOT::GetThreadPoolSize();
666 if (nThreads < 1) nThreads = 1;
668 std::string executionMode =
"thread";
669 const char * envMode = gSystem->Getenv(
"NDMSPC_EXECUTION_MODE");
670 const bool modeExplicit = (envMode && envMode[0] !=
'\0');
672 executionMode = envMode;
675 std::string normalizedMode = executionMode;
676 std::transform(normalizedMode.begin(), normalizedMode.end(), normalizedMode.begin(),
677 [](
unsigned char c) { return static_cast<char>(std::tolower(c)); });
678 if (normalizedMode ==
"process") normalizedMode =
"ipc";
680 bool useProcessIpc = (normalizedMode ==
"ipc" || normalizedMode ==
"tcp");
681 bool useTcp = (normalizedMode ==
"tcp");
682 size_t nProcesses =
static_cast<size_t>(nThreads);
683 bool ndmspcNProcExplicit =
false;
685 if (
const char * envNdmspcNProc = gSystem->Getenv(
"NDMSPC_MAX_PROCESSES")) {
686 ndmspcNProcExplicit =
true;
688 nProcesses = std::max<size_t>(1,
static_cast<size_t>(std::stoll(envNdmspcNProc)));
691 NLogWarning(
"NGnTree::Process: Invalid NDMSPC_MAX_PROCESSES='%s', using default=%zu", envNdmspcNProc,
695 else if (
const char * envNProc = gSystem->Getenv(
"ROOT_MAX_THREADS")) {
698 nProcesses = std::max<size_t>(1,
static_cast<size_t>(std::stoll(envNProc)));
701 NLogWarning(
"NGnTree::Process: Invalid ROOT_MAX_THREADS='%s', using default=%zu", envNProc, nProcesses);
708 if (normalizedMode ==
"thread") {
709 useProcessIpc =
false;
712 else if (normalizedMode ==
"tcp") {
713 useProcessIpc =
true;
716 else if (normalizedMode ==
"ipc") {
717 useProcessIpc =
true;
721 NLogWarning(
"NGnTree::Process: Unknown NDMSPC_EXECUTION_MODE='%s', falling back to auto mode selection.",
722 executionMode.c_str());
723 useProcessIpc = (nProcesses > 1);
727 else if (nProcesses > 1) {
728 useProcessIpc =
true;
730 executionMode =
"ipc";
731 normalizedMode =
"ipc";
734 if (ndmspcNProcExplicit && normalizedMode ==
"thread" && nProcesses > 1) {
735 NLogWarning(
"NGnTree::Process: NDMSPC_MAX_PROCESSES=%zu is set, but NDMSPC_EXECUTION_MODE=thread disables IPC.",
739 const size_t workerObjectCount = useProcessIpc ? std::max(
static_cast<size_t>(nThreads), nProcesses)
740 :
static_cast<size_t>(nThreads);
741 std::vector<Ndmspc::NGnThreadData> threadDataVector(workerObjectCount);
743 NLogInfo(
"NGnTree::Process: executionMode='%s', useProcessIpc=%d, ROOT threads=%d, ipcProcesses=%zu, workerObjects=%zu",
744 executionMode.c_str(), useProcessIpc ? 1 : 0, nThreads, nProcesses, workerObjectCount);
746 const char * tmpDirEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
748 if (tmpDirEnv && tmpDirEnv[0] !=
'\0') {
753 if (!(tmpDirPrefix.BeginsWith(
"root://") || tmpDirPrefix.BeginsWith(
"http://") ||
754 tmpDirPrefix.BeginsWith(
"https://"))) {
755 tmpDir = tmpDirPrefix.Data();
757 if (tmpDir.empty()) tmpDir =
"/tmp";
760 std::string jobDir = tmpDir +
"/.ndmspc/tmp/" + std::to_string(gSystem->GetPid());
764 const char * resultsDirEnv = gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR");
765 const bool sameDir = !resultsDirEnv || std::string(resultsDirEnv) == tmpDir;
766 std::string resultsDir = sameDir ? jobDir
767 : (std::string(resultsDirEnv) +
"/" +
768 std::to_string(gSystem->GetPid()));
770 std::string filePrefix = jobDir;
771 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
772 std::string filename = filePrefix +
"/" + std::to_string(i) +
"/" + storagePostfix;
773 bool rc = threadDataVector[i].Init(i, func, beginFunc, endFunc,
this, binningIn,
fInput, filename,
776 NLogError(
"Failed to initialize thread data %zu, exiting ...", i);
779 threadDataVector[i].SetCfg(cfg);
784 std::string resultsFile = resultsDir +
"/" + std::to_string(i) +
"/" + storagePostfix;
785 threadDataVector[i].SetResultsFilename(resultsFile);
788 size_t processedEntries = 0;
789 size_t totalEntries = 0;
790 auto start_par = std::chrono::high_resolution_clock::now();
791 auto start_par_job = std::chrono::high_resolution_clock::now();
796 thread_obj.Process(coords);
799 size_t nRunning = (totalEntries - processedEntries >= threadDataVector.size()) ? threadDataVector.size()
800 : totalEntries - processedEntries;
801 NUtils::ProgressBar(processedEntries, totalEntries, start_par, TString::Format(
"R%4zu", nRunning).Data());
808 std::vector<Ndmspc::NThreadData *> processWorkers;
809 std::unique_ptr<Ndmspc::NDimensionalExecutor> ipcExecutor;
811 processWorkers.reserve(threadDataVector.size());
812 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
813 processWorkers.push_back(&threadDataVector[i]);
815 ipcExecutor = std::make_unique<Ndmspc::NDimensionalExecutor>(std::vector<int>{0}, std::vector<int>{0});
817 const char * tcpPort = gSystem->Getenv(
"NDMSPC_TCP_PORT");
818 std::string tcpEndpoint = std::string(
"tcp://0.0.0.0:") + (tcpPort ? tcpPort :
"5555");
819 const char * resultsDirBase = gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR");
820 const char * macroParams = gSystem->Getenv(
"NDMSPC_MACRO_PARAMS");
824 if (workerMacro.empty()) {
825 if (
const char * envMacro = gSystem->Getenv(
"NDMSPC_MACRO")) workerMacro = envMacro;
827 ipcExecutor->StartProcessIpc(processWorkers, nProcesses, tcpEndpoint, resultsDir,
829 resultsDirBase ? resultsDirBase :
"",
830 macroParams ? macroParams :
"");
832 ipcExecutor->StartProcessIpc(processWorkers, nProcesses);
836 std::map<std::string, std::vector<Long64_t>>
837 defIdMapProcessedRemoved;
850 for (
auto & name : defNames) {
853 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
857 if (binningDef->GetIds().size() == 0) {
858 NLogWarning(
"NGnTree::Process: Binning definition '%s' has no entries, skipping ...", name.c_str());
862 const std::vector<Long64_t> originalDefinitionIds = binningDef->GetIds();
864 std::vector<int> mins, maxs;
866 maxs.push_back(binningDef->GetIds().size() - 1);
867 NLogDebug(
"NGnTree::Process: Processing with binning definition '%s' with %zu entries", name.c_str(),
868 binningDef->GetIds().size());
871 NLogInfo(
"NGnTree::Process: Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
874 Printf(
"Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
876 start_par = std::chrono::high_resolution_clock::now();
877 processedEntries = 0;
878 totalEntries = maxs[0] + 1;
879 const size_t activeWorkers = useProcessIpc ? std::max<size_t>(1, std::min(nProcesses, processWorkers.size()))
880 : threadDataVector.size();
883 TString::Format(
"R%4zu", activeWorkers).Data());
886 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
888 threadDataVector[i].GetHnSparseBase()->GetBinning()->SetCurrentDefinitionName(name);
893 if (!useProcessIpc) {
897 Bool_t prevMustClean = gROOT->MustClean();
898 gROOT->SetMustClean(kFALSE);
905 Bool_t prevBatch = gROOT->IsBatch();
906 gROOT->SetBatch(kTRUE);
915 gROOT->SetMustClean(prevMustClean);
916 gROOT->SetBatch(prevBatch);
919 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
923 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
924 threadDataVector[i].ExecuteEndFunction();
928 ipcExecutor->SetBounds(mins, maxs);
932 size_t finalActiveWorkers = 0;
933 size_t acked = ipcExecutor->ExecuteCurrentBoundsProcessIpc(
934 name, &originalDefinitionIds,
939 size_t nRunning = std::min(progress.
activeWorkers, activeWorkers);
941 TString::Format(
"R%4zu", nRunning).Data());
944 processedEntries = acked;
950 size_t connected = finalActiveWorkers > 0 ? finalActiveWorkers : std::min(nProcesses, processWorkers.size());
951 const size_t processesToUse = std::max<size_t>(1, std::min(nProcesses, connected));
952 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
953 auto * workerDef = threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name);
955 workerDef->GetIds().clear();
959 for (
size_t taskIndex = 0; taskIndex < originalDefinitionIds.size(); ++taskIndex) {
960 const size_t workerIndex = taskIndex % processesToUse;
961 threadDataVector[workerIndex].SetNProcessed(threadDataVector[workerIndex].GetNProcessed() + 1);
963 auto * workerDef = threadDataVector[workerIndex].GetHnSparseBase()->GetBinning()->GetDefinition(name);
965 workerDef->GetIds().push_back(originalDefinitionIds[taskIndex]);
975 Printf(
"Finished processing binning definition '%s'. Post-processing results ...", name.c_str());
977 NLogDebug(
"NGnTree::Process: [BEGIN] ------------------------------------------------");
980 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
981 NLogDebug(
"NGnTree::Process: -> Thread %zu processed %lld entries", i, threadDataVector[i].GetNProcessed());
985 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().begin(),
986 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().end());
991 for (
size_t i = 0; i < defNames.size(); i++) {
993 std::string other_name = defNames[i];
999 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", other_name.c_str());
1003 for (
auto it = otherDef->GetIds().begin(); it != otherDef->GetIds().end();) {
1004 NLogTrace(
"NGnTree::Process: Checking entry %lld from definition '%s' against sumIds=%d", *it,
1005 other_name.c_str(), sumIds);
1007 NLogTrace(
"NGnTree::Process: Removing entry %lld from definition '%s'", *it, other_name.c_str());
1008 defIdMapProcessedRemoved[other_name].push_back(*it);
1009 it = otherDef->GetIds().erase(it);
1021 NLogDebug(
"NGnTree::Process: [END] ------------------------------------------------");
1024 catch (
const std::exception & ex) {
1026 ipcExecutor->FinishProcessIpc(
true);
1027 ipcExecutor.reset();
1030 TString what(ex.what());
1031 if (what.Contains(
"Interrupted by user")) {
1033 gROOT->SetInterrupt(kFALSE);
1035 NLogWarning(
"NGnTree::Process: Interrupted by user, stopping processing.");
1038 NLogError(
"NGnTree::Process: Processing failed: %s", ex.what());
1045 auto end_par = std::chrono::high_resolution_clock::now();
1046 std::chrono::duration<double, std::milli> par_duration = end_par - start_par_job;
1049 ipcExecutor->FinishProcessIpc();
1054 const std::set<size_t> registeredWorkers =
1055 (ipcExecutor && useTcp) ? ipcExecutor->GetRegisteredWorkerIndices() : std::set<size_t>{};
1056 const bool filterByRegistered = !registeredWorkers.empty();
1059 Printf(
"NGnTree::Process: Execution completed and it took %s .",
1063 NLogInfo(
"NGnTree::Process: Execution completed and it took %s .",
1067 NLogInfo(
"NGnTree::Process: Post processing %zu results ...", threadDataVector.size());
1068 for (
auto & data : threadDataVector) {
1069 if (useProcessIpc) {
1070 NLogTrace(
"NGnTree::Process: Releasing parent handle for worker %zu file without writing",
1071 data.GetAssignedIndex());
1075 NLogTrace(
"NGnTree::Process: Closing file from thread %zu with write", data.GetAssignedIndex());
1076 data.GetHnSparseBase()->GetStorageTree()->Close(
true);
1080 NLogDebug(
"NGnTree::Process: Merging %zu results ...", threadDataVector.size());
1082 Printf(
"NGnTree::Process: [phase] merge start (%zu workers)", threadDataVector.size());
1084 const auto mergeStart = std::chrono::high_resolution_clock::now();
1085 TList * mergeList =
new TList();
1087 outputData->
Init(0, func,
nullptr,
nullptr,
this, binningIn);
1091 for (
auto & data : threadDataVector) {
1092 if (filterByRegistered && registeredWorkers.find(data.GetAssignedIndex()) == registeredWorkers.end()) {
1093 NLogInfo(
"NGnTree::Process: Skipping worker %zu — never connected", data.GetAssignedIndex());
1096 NLogTrace(
"NGnTree::Process: Adding thread data %zu to merge list ...", data.GetAssignedIndex());
1097 mergeList->Add(&data);
1100 Long64_t nmerged = outputData->
Merge(mergeList);
1101 const auto mergeEnd = std::chrono::high_resolution_clock::now();
1103 const auto mergeSec = std::chrono::duration_cast<std::chrono::duration<double>>(mergeEnd - mergeStart).count();
1104 Printf(
"NGnTree::Process: [phase] merge done (%lld outputs, %.2f s)", nmerged, mergeSec);
1107 NLogError(
"NGnTree::Process: Failed to merge thread data, exiting ...");
1111 NLogInfo(
"NGnTree::Process: Merged %lld outputs successfully", nmerged);
1123 std::set<Long64_t> mergedContentIds;
1124 std::vector<std::pair<Long64_t, std::vector<int>>> mergedContentCoords;
1127 for (
size_t i = 0; i < defNames.size(); i++) {
1128 std::string name = defNames[i];
1132 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1135 for (
auto & [other_name, removedIds] : defIdMapProcessedRemoved) {
1136 if (other_name.compare(name) != 0) {
1139 for (
auto &
id : removedIds) {
1140 if (std::find(def->GetIds().begin(), def->GetIds().end(),
id) == def->GetIds().end()) {
1141 NLogTrace(
"NGnTree::Process: Adding missing entry %lld to definition '%s'",
id, name.c_str());
1142 def->GetIds().push_back(
id);
1146 sort(def->GetIds().begin(), def->GetIds().end());
1150 def->GetContent()->Reset();
1151 for (
auto id : def->GetIds()) {
1153 def->GetBinning()->GetContent()->GetBinContent(
id, point.
GetCoords());
1156 NLogTrace(
"NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld", name.c_str(),
id, bin);
1157 def->GetContent()->SetBinContent(bin,
id);
1159 if (mergedContentIds.insert(
id).second) {
1160 mergedContentCoords.emplace_back(
1169 mergedBinning->GetContent()->Reset();
1170 for (
const auto & entry : mergedContentCoords) {
1171 Long64_t bin = mergedBinning->GetContent()->GetBin(entry.second.data());
1172 mergedBinning->GetContent()->SetBinContent(bin, entry.first);
1176 NLogDebug(
"NGnTree::Process: Final binning definitions after processing:");
1177 for (
auto & name : defNames) {
1181 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1184 binningDef->Print();
1193 NLogInfo(
"NGnTree::Process: Processing completed successfully. Output was stored in '%s'.",
1202 Printf(
"NGnTree::Process: [phase] final close start (%s)",
1205 const auto closeStart = std::chrono::high_resolution_clock::now();
1207 const auto closeEnd = std::chrono::high_resolution_clock::now();
1209 const auto closeSec = std::chrono::duration_cast<std::chrono::duration<double>>(closeEnd - closeStart).count();
1210 Printf(
"NGnTree::Process: [phase] final close done (%.2f s)", closeSec);
1214 Printf(
"NGnTree::Process: [phase] cleanup start (%s)", jobDir.c_str());
1216 const auto cleanupStart = std::chrono::high_resolution_clock::now();
1217 gSystem->Exec(TString::Format(
"rm -fr %s", jobDir.c_str()));
1218 const auto cleanupEnd = std::chrono::high_resolution_clock::now();
1220 const auto cleanupSec = std::chrono::duration_cast<std::chrono::duration<double>>(cleanupEnd - cleanupStart).count();
1221 Printf(
"NGnTree::Process: [phase] cleanup done (%.2f s)", cleanupSec);
1223 gROOT->SetBatch(batch);
1238 fOutputs[name]->SetName(name.c_str());
1249 NLogDebug(
"Opening '%s' with branches='%s' and treename='%s' ...", filename.c_str(), branches.c_str(),
1252 TFile * file = TFile::Open(filename.c_str());
1254 NLogError(
"NGnTree::Open: Cannot open file '%s'", filename.c_str());
1258 TTree * tree = (TTree *)file->Get(treename.c_str());
1260 NLogError(
"NGnTree::Open: Cannot get tree '%s' from file '%s'", treename.c_str(), filename.c_str());
1264 return Open(tree, branches, file);
1275 NLogError(
"NGnTree::Open: Cannot get binning from tree '%s'", tree->GetName());
1279 if (!hnstStorageTree) {
1280 NLogError(
"NGnTree::Open: Cannot get tree storage info from tree '%s'", tree->GetName());
1284 std::map<std::string, TList *> outputs;
1285 TDirectory * dir =
nullptr;
1287 dir = (TDirectory *)file->Get(
"outputs");
1288 auto l = dir->GetListOfKeys();
1289 for (
auto kv : *l) {
1290 TObject * obj = dir->Get(kv->GetName());
1292 TList * l =
dynamic_cast<TList *
>(obj);
1294 outputs[l->GetName()] = l;
1295 NLogDebug(
"Imported output list for binning '%s' with %d object(s) from file '%s'", l->GetName(), l->GetEntries(),
1306 if (!hnstStorageTree->
SetFileTree(file, tree))
return nullptr;
1310 std::vector<std::string> enabledBranches;
1311 if (!branches.empty()) {
1319 NLogTrace(
"NGnTree::Open: Enabled branches: %s", kv.first.c_str());
1340 NLogTrace(
"NGnTree::SetNavigator: Replacing existing navigator ...");
1354 NLogError(
"NGnTree::Close: Storage tree is not initialized in NGnTree !!!");
1367 NLogError(
"NGnTree::GetEntry: Storage tree is not initialized in NGnTree !!!");
1382 return GetEntry(0, checkBinningDef);
1385 void NGnTree::Play(
int timeout, std::string binning, std::vector<int> outputPointIds,
1386 std::vector<std::vector<int>> ranges, Option_t * option)
1391 TString opt = option;
1394 std::string annimationTempDir =
1395 TString::Format(
"%s/.ndmspc/animation/%d", gSystem->Getenv(
"HOME"), gSystem->GetPid()).Data();
1396 gSystem->Exec(TString::Format(
"mkdir -p %s", annimationTempDir.c_str()));
1398 if (binning.empty()) {
1404 NLogError(
"NGnTree::Play: Binning definition '%s' not found in NGnTree !!!", binning.c_str());
1405 NLogError(
"Available binning definitions:");
1408 NLogError(
" [*] %s", name.c_str());
1410 NLogError(
" [ ] %s", name.c_str());
1415 THnSparse * bdContent = (THnSparse *)binningDef->
GetContent()->Clone();
1417 std::string bdContentName = TString::Format(
"bdContent_%s", binning.c_str()).Data();
1421 Long64_t linBin = 0;
1422 std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{bdContent->CreateIter(
true )};
1423 std::vector<Long64_t> ids;
1425 while ((linBin = iter->Next()) >= 0) {
1427 ids.push_back(linBin);
1430 NLogWarning(
"NGnTree::Play: No entries found in binning definition '%s' !!!", binning.c_str());
1435 TCanvas * c1 =
nullptr;
1437 c1 = (TCanvas *)gROOT->GetListOfCanvases()->FindObject(
"c1");
1438 if (c1 ==
nullptr) c1 =
new TCanvas(
"c1",
"NGnTree::Play", 800, 600);
1441 c1->DivideSquare(outputPointIds.size() > 0 ? outputPointIds.size() + 1 : 1);
1442 gSystem->ProcessEvents();
1444 binningDef->
Print();
1448 for (
auto id : ids) {
1453 if (!l || l->IsEmpty()) {
1454 NLogWarning(
"NGnTree::Play: No 'outputPoint' for entry %lld !!!",
id);
1461 if (outputPointIds.empty()) {
1462 outputPointIds.resize(l->GetEntries());
1463 for (
int i = 0; i < l->GetEntries(); i++) {
1464 outputPointIds[i] = i;
1467 int n = outputPointIds.size();
1470 for (
int i = 0; i < n; i++) {
1474 TObject * obj = l->At(outputPointIds[i]);
1476 if (obj->InheritsFrom(TH1::Class())) {
1477 TH1 * h = (TH1 *)obj;
1478 h->SetDirectory(
nullptr);
1482 TH1 * hclone = (TH1 *)h->Clone();
1484 hclone->SetDirectory(
nullptr);
1490 if (obj->InheritsFrom(TH1::Class()) && i == 0) {
1491 TH1 * h = (TH1 *)obj;
1493 NLogDebug(
"Mean value from histogram [%s]: %f", h->GetName(), v);
1498 TH1 * bdProj = (TH1 *)gROOT->FindObjectAny(
"bdProj");
1503 if (bdContent->GetNdimensions() == 1) {
1504 bdProj = bdContent->Projection(0,
"O");
1506 else if (bdContent->GetNdimensions() == 2) {
1507 bdProj = bdContent->Projection(0, 1,
"O");
1509 else if (bdContent->GetNdimensions() == 3) {
1510 bdProj = bdContent->Projection(0, 1, 2,
"O");
1513 NLogError(
"NGnTree::Play: Cannot project THnSparse with %d dimensions", bdContent->GetNdimensions());
1516 bdProj->SetName(
"bdProj");
1517 bdProj->SetTitle(TString::Format(
"Binning '%s' content projection", binning.c_str()).Data());
1518 bdProj->SetMinimum(0);
1520 bdProj->Draw(
"colz");
1525 c1->ModifiedUpdate();
1526 c1->SaveAs(TString::Format(
"%s/ndmspc_play_%06lld.png", annimationTempDir.c_str(), bdContent->GetNbins()).Data());
1528 gSystem->ProcessEvents();
1529 if (timeout > 0) gSystem->Sleep(timeout);
1533 NLogInfo(
"Creating animation gif from %s/ndmspc_play_*.png ...", annimationTempDir.c_str());
1535 TString::Format(
"magick -delay 20 -loop 0 %s/ndmspc_play_*.png ndmspc_play.gif", annimationTempDir.c_str()));
1536 gSystem->Exec(TString::Format(
"rm -fr %s", annimationTempDir.c_str()));
1537 NLogInfo(
"Animation saved to ndmspc_play.gif");
1553 TH1::AddDirectory(kFALSE);
1555 json cfg = point->
GetCfg();
1557 Printf(
"Processing THnSparse projection with configuration: %s", cfg.dump().c_str());
1565 for (
auto & [objName, objCfg] : cfg[
"objects"].items()) {
1566 NLogInfo(
"Processing object '%s' ...", objName.c_str());
1569 if (hns ==
nullptr) {
1570 NLogError(
"NGnTree::Projection: THnSparse 'hns' not found in storage tree !!!");
1575 for (
size_t i = 0; i < objCfg.size(); i++) {
1577 NLogInfo(
"Processing projection %zu for object '%s' ...", i, objName.c_str());
1578 std::vector<int> dims;
1579 std::vector<std::string> dimNames = cfg[
"objects"][objName][i].get<std::vector<std::string>>();
1580 for (
const auto & dimName : dimNames) {
1581 NLogDebug(
"Looking for dimension name '%s' in THnSparse ...", dimName.c_str());
1583 for (
int i = 0; i < hns->GetNdimensions(); i++) {
1584 if (dimName == hns->GetAxis(i)->GetName()) {
1590 dims.push_back(dim);
1592 NLogError(
"NGnTree::Projection: Dimension name '%s' not found in THnSparse !!!", dimName.c_str());
1597 TH1 * hPrev = (TH1 *)output->At(i);
1599 hProj->SetName(TString::Format(
"%s_proj_%s", objName.c_str(),
NUtils::Join(dims,
'_').c_str()).Data());
1613 THnSparse * hnsIn = binningDef->
GetContent();
1615 std::vector<std::vector<int>> ranges = cfg[
"ranges"].get<std::vector<std::vector<int>>>();
1617 Long64_t linBin = 0;
1618 std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{hnsIn->CreateIter(
true )};
1619 std::vector<Long64_t> ids;
1621 while ((linBin = iter->Next()) >= 0) {
1622 ids.push_back(linBin);
1625 NLogWarning(
"NGnTree::Projection: No entries found in binning definition '%s' !!!", binningDef->GetName());
1630 binningDef->
GetIds() = ids;
1645 std::map<
int, std::vector<int>> ranges, std::map<
int, std::vector<int>> rangesBase)
1654 return navigator.
Reshape(binningName, levels, level, ranges, rangesBase);
1658 int level, std::map<
int, std::vector<int>> ranges,
1659 std::map<
int, std::vector<int>> rangesBase)
1665 if (binningName.empty()) {
1669 THnSparse * hns = (THnSparse *)
fOutputs[binningName]->FindObject(
"resource_monitor");
1671 NLogError(
"NGnTree::Draw: Resource monitor THnSparse not found in outputs !!!");
1677 auto ngnt =
new NGnTree(hns,
"stat",
"/tmp/hnst_imported_for_drawing.root");
1678 if (ngnt->IsZombie()) {
1679 NLogError(
"NGnTree::GetResourceStatisticsNavigator: Failed to import resource monitor THnSparse !!!");
1686 auto ngnt2 =
NGnTree::Open(
"/tmp/hnst_imported_for_drawing.root");
1687 auto nav = ngnt2->Reshape(
"default", levels, level, ranges, rangesBase);
1700 NLogTrace(
"NGnTree::InitParameters: Replacing existing parameters ...");
1704 if (paramNames.empty()) {
1705 NLogTrace(
"NGnTree::InitParameters: No parameter names provided, skipping ...");
1715 const std::vector<std::string> & headers,
const std::string & outputFile,
bool close)
1722 std::string findPathClean = findPath;
1723 if (!findPathClean.empty() && findPathClean.back() ==
'/') {
1724 findPathClean.pop_back();
1727 std::vector<std::string> paths =
NUtils::Find(findPathClean, fileName);
1728 NLogInfo(
"NGnTree::Import: Found %zu files to import ...", paths.size());
1731 int nDirAxes = ngntArray->GetEntries();
1736 ngntArray->Add(axis->Clone());
1738 ngntFirst->
Close(
false);
1740 std::map<std::string, std::vector<std::vector<int>>> b;
1742 for (
int i = 0; i < ngntArray->GetEntries(); i++) {
1743 TAxis * axis = (TAxis *)ngntArray->At(i);
1744 b[axis->GetName()].push_back({1});
1754 cfg[
"basedir"] = findPathClean;
1755 cfg[
"filename"] = fileName;
1756 cfg[
"nDirAxes"] = nDirAxes;
1757 cfg[
"headers"] = headers;
1759 Ndmspc::NGnProcessFuncPtr processFunc = [](
Ndmspc::NBinningPoint * point, TList * , TList * outputPoint,
1763 json cfg = point->
GetCfg();
1764 std::string filename = cfg[
"basedir"].get<std::string>();
1766 for (
auto & header : cfg[
"headers"]) {
1767 filename += point->
GetBinLabel(header.get<std::string>());
1774 filename += cfg[
"filename"].get<std::string>();
1776 if (!ngnt || filename.compare(ngnt->GetStorageTree()->GetFileName()) != 0) {
1777 NLogInfo(
"NGnTree::Import: Opening file '%s' ...", filename.c_str());
1779 NLogDebug(
"NGnTree::Import: Closing previously opened file '%s' ...",
1780 ngnt->GetStorageTree()->GetFileName().c_str());
1786 if (!ngnt || ngnt->IsZombie()) {
1787 NLogError(
"NGnTree::Import: Cannot open file '%s'", filename.c_str());
1793 int nDirAxes = cfg[
"nDirAxes"].get<
int>();
1796 NLogInfo(
"NGnTree::Import: Processing point with coords %s ...", coordsStr.c_str());
1798 Long64_t entryNumber =
1799 ngnt->GetBinning()->GetContent()->GetBin(&coords[3 * nDirAxes], kFALSE);
1800 NLogInfo(
"NGnTree::Import: Corresponding entry number in file '%s' is %lld", filename.c_str(), entryNumber);
1802 ngnt->GetEntry(entryNumber);
1811 for (
const auto & kv : ngnt->GetStorageTree()->GetBranchesMap()) {
1814 NLogTrace(
"NGnTree::Import: Adding branch '%s' to storage tree ...", kv.first.c_str());
1817 NLogTrace(
"NGnTree::Import: Setting branch address for branch '%s' ...", kv.first.c_str());
1820 outputPoint->Add(
new TNamed(
"source_file", filename));
1836 TH1::AddDirectory(kFALSE);
1849 ngnt->
Process(processFunc, cfg,
"", beginFunc, endFunc);
Defines binning mapping and content for NDMSPC histograms.
THnSparse * GetContent() const
Get the template content histogram.
virtual void Print(Option_t *option="") const
Print binning definition information.
void RefreshIdsFromContent()
Refresh bin IDs from content histogram.
std::vector< Long64_t > GetIds() const
Get list of bin IDs.
Represents a single point in multi-dimensional binning.
Double_t GetBinMax(std::string axis) const
Get the maximum value for a specific axis.
std::string GetString(const std::string &prefix="", bool all=false) const
Returns a string representation of the binning point.
void SetTreeStorage(NStorageTree *s)
Set storage tree object pointer.
std::string GetBinLabel(std::string axis) const
Get the label for a specific axis.
bool RecalculateStorageCoords(Long64_t entry=-1, bool useBinningDefCheck=false)
Recalculate storage coordinates for the point.
Long64_t GetEntryNumber() const
Get entry number for the point.
NParameters * GetParameters() const
Get the parameters associated with this binning point.
Int_t * GetStorageCoords() const
Get pointer to storage coordinates array.
Double_t GetBinCenter(std::string axis) const
Returns the center value of the bin along the specified axis.
NStorageTree * GetTreeStorage() const
Get pointer to storage tree object.
void SetTempObject(const std::string &name, TObject *obj)
Set a temporary object with the given name.
NStorageTree * GetStorageTree() const
Returns a pointer to the associated storage tree.
virtual void Print(Option_t *option="") const
Print binning point information.
TObject * GetTempObject(const std::string &name) const
Retrieve a temporary object by name.
NGnTree * GetInput() const
Get pointer to input NGnTree object.
Int_t GetBin(std::string axis) const
Returns the bin index for the specified axis.
Int_t * GetCoords() const
Get pointer to content coordinates array.
json & GetCfg()
Get reference to configuration JSON object.
Int_t GetNDimensionsContent() const
Get number of dimensions in content histogram.
Double_t GetBinMin(std::string axis) const
Get the minimum value for a specific axis.
NBinning object for managing multi-dimensional binning and axis definitions.
NBinningDef * GetDefinition(const std::string &name="")
Get binning definition by name.
std::vector< std::string > GetDefinitionNames() const
Get all definition names.
std::string GetCurrentDefinitionName() const
Get current definition name.
NBinningPoint * GetPoint()
Get the current binning point.
virtual void Print(Option_t *option="") const
Print binning information.
std::vector< TAxis * > GetAxes() const
Get vector of axis pointers.
bool SetCfg(const json &cfg)
Set configuration from JSON.
void AddBinningDefinition(std::string name, std::map< std::string, std::vector< std::vector< int >>> binning, bool forceDefault=false)
Add a binning definition.
void Reset()
Reset the binning object to initial state.
Executes a function over all points in an N-dimensional space, optionally in parallel.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
Navigator object for managing hierarchical data structures and projections.
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int >> levels, size_t level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Reshape navigator using binning name and levels.
void SetGnTree(NGnTree *tree)
Set NGnTree object pointer.
Thread-local data object for NDMSPC processing.
bool Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc, NGnTree *ngnt, NBinning *binningIn, NGnTree *input=nullptr, const std::string &filename="", const std::string &treename="ngnt")
Initialize thread data for processing.
void SetResultsFilename(const std::string &filename)
Set the results filename for TCP mode (shared filesystem path).
NGnTree * GetHnSparseBase() const
Get pointer to base NGnTree object.
virtual Long64_t Merge(TCollection *list)
Merge thread data from a collection (virtual).
void FlushDeferredDeletes()
Delete deferred ROOT objects, skipping TCanvas/TPad (leaked safely).
void SetCfg(const json &cfg)
Set configuration JSON object.
NDMSPC tree object for managing multi-dimensional data storage and processing.
NBinning * GetBinning() const
Get pointer to binning object.
std::map< std::string, TList * > GetOutputs() const
Get outputs map.
virtual void Draw(Option_t *option="") override
Draws the tree object.
void SetIsPureCopy(bool val)
Sets the pure copy status of the tree.
bool Close(bool write=false)
Close the tree, optionally writing data.
bool fOwnsTreeStorage
True when fTreeStorage is owned by this instance.
virtual ~NGnTree()
Destructor.
Int_t GetEntry(Long64_t entry, bool checkBinningDef=true)
Get entry by index.
virtual void Print(Option_t *option="") const override
Print tree information.
void SetInput(NGnTree *input)
Set input NGnTree pointer.
NGnTree()
Default constructor.
static NGnTree * Import(const std::string &findPath, const std::string &fileName, const std::vector< std::string > &headers, const std::string &outFileName="/tmp/ngnt_imported.root", bool close=true)
Imports an NGnTree from a specified file.
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int >> levels, int level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Reshape navigator using binning name and levels.
NBinning * fBinning
Binning object.
TList * GetOutput(std::string name="")
Get output list by name.
NParameters * GetParameters() const
Returns the parameters associated with this tree.
NGnTree * fInput
Input NGnTree for processing.
bool fOwnsBinning
True when fBinning is owned by this instance.
NStorageTree * GetStorageTree() const
Get pointer to storage tree object.
NGnTree * GetInput() const
Get pointer to input NGnTree.
std::string fWorkerMacroList
Comma-separated macro paths sent to TCP workers.
void Play(int timeout=0, std::string binning="", std::vector< int > outputPointIds={0}, std::vector< std::vector< int >> ranges={}, Option_t *option="")
Play tree data with optional binning and output point IDs.
void SetOutputs(std::map< std::string, TList * > outputs)
Set outputs map.
bool InitParameters(const std::vector< std::string > ¶mNames)
Initializes the parameters for the tree using the provided parameter names.
std::map< std::string, TList * > fOutputs
Outputs.
NGnNavigator * fNavigator
! Navigator object
NParameters * fParameters
Parameters object.
TList * Projection(const json &cfg, std::string binningName="")
Project tree data using configuration and binning name.
NGnNavigator * GetResourceStatisticsNavigator(std::string binningName, std::vector< std::vector< int >> levels, int level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Returns a navigator for resource statistics based on binning and levels.
NStorageTree * fTreeStorage
Tree storage.
static std::string BuildObjectPath(const json &cfg, const json &objCfg, const NBinningPoint *point)
Helper: build object path string from configuration and a binning point.
void SetNavigator(NGnNavigator *navigator)
Sets the navigator for this tree.
static NGnTree * Open(const std::string &filename, const std::string &branches="", const std::string &treename="ngnt")
Open NGnTree from file.
bool Process(NGnProcessFuncPtr func, const json &cfg=json::object(), std::string binningName="", NGnBeginFuncPtr beginFunc=nullptr, NGnEndFuncPtr endFunc=nullptr)
Process tree data using a function pointer and configuration.
static bool GetConsoleOutput()
Get console output flag.
bool SetParameter(int bin, Double_t value, Double_t error=0.)
Set the value and error of a parameter by bin index.
NDMSPC storage tree object for managing ROOT TTree-based data storage.
void SetBranchAddresses()
Set addresses for all branches.
void SetEnabledBranches(std::vector< std::string > branches, int status=1)
Set enabled/disabled status for branches.
bool SetFileTree(TFile *file, TTree *tree)
Tree handling.
std::string GetFileName() const
Get file name.
bool AddBranch(const std::string &name, void *address, const std::string &className)
Add a branch to the tree.
Long64_t GetEntry(Long64_t entry, NBinningPoint *point=nullptr, bool checkBinningDef=false)
Get entry by index and fill NBinningPoint.
std::string GetPrefix() const
Get prefix path.
TObject * GetBranchObject(const std::string &name)
Get pointer to branch object by name.
NTreeBranch * GetBranch(const std::string &name)
Get pointer to NTreeBranch by name.
bool InitTree(const std::string &filename="", const std::string &treename="ngnt")
Initialize tree from file and tree name.
std::string GetPostfix() const
Get postfix path.
bool Close(bool write=false, std::map< std::string, TList * > outputs={})
Close the storage tree, optionally writing outputs.
TTree * GetTree() const
Get pointer to TTree object.
virtual void Print(Option_t *option="") const
Print storage tree information.
void SetBinning(NBinning *binning)
Set binning object pointer.
std::map< std::string, NTreeBranch > GetBranchesMap() const
Get map of branch names to NTreeBranch objects.
void SetAddress(void *address, bool deleteExisting=false)
Set address for branch data.
TObject * GetObject() const
Get object pointer.
static bool SetAxisRanges(THnSparse *sparse, std::vector< std::vector< int >> ranges={}, bool withOverflow=false, bool modifyTitle=false, bool reset=true)
Set axis ranges for THnSparse using vector of ranges.
static TH1 * ProjectTHnSparse(THnSparse *hns, const std::vector< int > &axes, Option_t *option="")
Project a THnSparse histogram onto specified axes.
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.
static std::string FormatTime(long long seconds)
Format time in seconds to human-readable string.
static bool EnableMT(Int_t numthreads=-1)
Enable multi-threading with specified number of threads.
static std::string Join(const std::vector< std::string > &values, const char delim=',')
Join vector of strings into a single string with delimiter.
static void ProgressBar(int current, int total, std::string prefix="", std::string suffix="", int barWidth=50)
Display progress bar.
static std::string GetCoordsString(const std::vector< int > &coords, int index=-1, int width=0)
Get string representation of coordinates.
static std::vector< int > ArrayToVector(Int_t *v1, int size)
Convert array to vector.
static TObjArray * AxesFromDirectory(const std::vector< std::string > paths, const std::string &findPath, const std::string &fileName, const std::vector< std::string > &axesNames)
Creates an array of axes objects from files in specified directories.
static std::vector< std::string > Find(std::string path, std::string filename="")
Find files in a path matching filename.
Execution progress metrics for IPC-based distributed processing.
size_t activeWorkers
Number of workers currently active.
size_t tasksAcked
Tasks completed and ACKed by workers.