2 #include <NStorageTree.h>
7 #include "NBinningPoint.h"
10 #include "NGnThreadData.h"
19 bool NGnThreadData::Init(
size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr funcBegin, NGnEndFuncPtr endFunc,
21 const std::string & treename)
29 TH1::AddDirectory(kFALSE);
39 if (ngnt ==
nullptr) {
40 NLogError(
"NGnThreadData::Init: NGnTree is nullptr !!!");
49 NLogError(
"NGnThreadData::Init: Binning Source is nullptr !!!");
59 NLogError(
"NGnThreadData::InitStorage: Binning is not set !!!");
64 NLogError(
"NGnThreadData::InitStorage: Storage tree is not set !!!");
76 ts->
InitTree(filename.empty() ? fn : filename, treename);
81 NLogTrace(
"NGnThreadData::Init: Adding branch '%s' to thread %zu", kv.first.c_str(),
id);
88 ts->
AddBranch(kv.first,
nullptr, kv.second.GetObjectClassName());
91 if (!b) ts->
AddBranch(
"_outputPoint",
nullptr,
"TList");
94 NLogTrace(
"NGnThreadData::Init: Setting parameters branch for thread %zu",
id);
96 if (!b) ts->
AddBranch(
"_params",
nullptr,
"Ndmspc::NParameters");
123 NLogTrace(
"NGnThreadData::Init: Setting input NGnTree for thread %zu '%s'",
id,
132 ExecuteBeginFunction();
142 TH1::AddDirectory(kFALSE);
152 static std::mutex sPadMutex;
153 std::lock_guard<std::mutex> lk(sPadMutex);
157 auto * c =
new TCanvas(cname, cname, 1, 1);
168 NLogError(
"NGnThreadData::Process: NGnTree is not set in NGnThreadData !!!");
173 NLogError(
"NGnThreadData::Process: Process function is not set in NGnThreadData !!!");
179 if (binningDef ==
nullptr) {
180 NLogError(
"NGnThreadData::Process: Binning definition is not set in NGnThreadData !!!");
192 NLogDebug(
"NGnThreadData::Process: [%zu] Skipping entry=%lld, because it was already process !!!",
221 TList * outputPoint =
new TList();
230 if (!point->
GetCfg()[
"_ndmspc"].is_null()) {
238 if (outputPoint->GetEntries() > 0) {
240 "NGnThreadData::Process: [%zu] Entry '%lld' was accepted. %s",
GetAssignedIndex(), entry,
248 Int_t bytes = ts->
Fill(point,
nullptr,
false, {},
false);
259 NLogTrace(
"NGnThreadData::Process: [%zu] Entry '%lld' Fill was done with 0 bytes. Skipping ...",
270 "NGnThreadData::Process: [%zu] Entry '%lld' No output %s. Skipping ...",
GetAssignedIndex(), entry,
281 NLogTrace(
"NGnThreadData::Process: [%zu] Cleaning output list with %d entries for entry '%lld' ...",
284 TObject * obj =
nullptr;
285 while ((obj = outputPoint->First())) {
286 outputPoint->Remove(obj);
293 void NGnThreadData::SetCurrentDefinitionName(
const std::string & name)
303 void NGnThreadData::SyncCurrentDefinitionIds(
const std::vector<Long64_t> & ids)
311 def->GetIds().clear();
328 Long64_t nmerged = 0;
330 NLogTrace(
"NGnThreadData::Merge: BEGIN ------------------------------------------------");
331 NLogTrace(
"NGnThreadData::Merge: Merging thread data from %zu threads ...", list->GetEntries());
334 std::map<std::string, TList *> listOutputs;
337 TList * listTreeStorage =
new TList();
339 for (
auto obj : *list) {
340 if (obj->IsA() == NGnThreadData::Class()) {
342 NLogDebug(
"NGnThreadData::Merge: Merging thread %zu processed %lld ...", hnsttd->
GetAssignedIndex(),
346 NLogError(
"NGnThreadData::Merge: Storage tree is not set in NGnTree !!!");
352 NLogTrace(
"NGnThreadData::Merge: Found output list '%s' with %d objects", kv.first.c_str(),
353 kv.second ? kv.second->GetEntries() : 0);
354 if (kv.second && !kv.second->IsEmpty()) {
355 NLogTrace(
"NGnThreadData::Merge: Merging output list '%s' with %d objects", kv.first.c_str(),
356 kv.second->GetEntries());
357 if (listOutputs.find(kv.first) == listOutputs.end()) {
359 listOutputs[kv.first] =
new TList();
364 listOutputs[kv.first]->Add(kv.second);
376 const std::string mergeFilename =
380 NLogError(
"NGnThreadData::Merge: Failed to open NGnTree from file '%s' !!!", mergeFilename.c_str());
415 mergedDef->
GetIds().clear();
417 for (
auto id : targetBinningDef->
GetIds()) {
421 NLogTrace(
"NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld",name.c_str(),
id, bin);
427 NLogDebug(
"NGnThreadData::Merge: Total entries to merge: %lld", nmerged);
432 NLogError(
"NGnThreadData::Merge: Binning definition '%s' not found in NGnTree !!!", name.c_str());
438 NLogTrace(
"NGnThreadData::Merge: Final IDs in definition '%s': %s", name.c_str(),
446 NLogTrace(
"NGnThreadData::Merge: Merging %d storage trees ...", listTreeStorage->GetEntries());
456 for (
auto & kv : listOutputs) {
457 if (kv.second && !kv.second->IsEmpty()) {
458 NLogTrace(
"NGnThreadData::Merge: Merging output list '%s' with %d objects", kv.first.c_str(),
459 kv.second->GetEntries() + 1);
470 NLogError(
"NGnThreadData::Merge: Binning definition '%s' not found in NGnTree !!!", name.c_str());
476 for (
auto id : binningDef->
GetIds()) {
479 binningDef->
GetContent()->SetBinContent(bin,
id);
481 NLogTrace(
"NGnThreadData::Merge: -> Setting content bin %lld to id %lld", bin,
id);
499 NLogTrace(
"NGnThreadData::Merge: END ------------------------------------------------");
507 void NGnThreadData::ExecuteBeginFunction()
514 void NGnThreadData::ExecuteEndFunction()
525 NLogTrace(
"NGnThreadData::FlushDeferredDeletes: [%zu] Deleting %zu deferred objects ...",
Defines binning mapping and content for NDMSPC histograms.
THnSparse * GetContent() const
Get the template content histogram.
Long64_t GetId(size_t index) const
Get bin ID at specified index.
std::vector< Long64_t > GetIds() const
Get list of bin IDs.
Represents a single point in multi-dimensional binning.
void SetTreeStorage(NStorageTree *s)
Set storage tree object pointer.
bool RecalculateStorageCoords(Long64_t entry=-1, bool useBinningDefCheck=false)
Recalculate storage coordinates for the point.
Int_t * GetStorageCoords() const
Get pointer to storage coordinates array.
void SetCfg(json cfg)
Set configuration JSON object.
Int_t * GetCoords() const
Get pointer to content coordinates array.
void SetParameters(NParameters *params)
Set the parameters for this binning point.
void SetInput(NGnTree *input)
Set input NGnTree object pointer.
virtual void Reset()
Reset the binning point to initial state.
json & GetCfg()
Get reference to configuration JSON object.
Int_t GetNDimensionsContent() const
Get number of dimensions in content histogram.
NBinning object for managing multi-dimensional binning and axis definitions.
NBinningDef * GetDefinition(const std::string &name="")
Get binning definition by name.
std::vector< std::string > GetDefinitionNames() const
Get all definition names.
NBinningPoint * GetPoint()
Get the current binning point.
THnSparse * GetContent() const
Get the content histogram.
void SetCurrentDefinitionName(const std::string &name)
Set current definition name.
Thread-local data object for NDMSPC processing.
NGnEndFuncPtr fEndFunc
Function pointer to the end function.
NGnBeginFuncPtr fBeginFunc
Function pointer to the begin function.
bool Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc, NGnTree *ngnt, NBinning *binningIn, NGnTree *input=nullptr, const std::string &filename="", const std::string &treename="ngnt")
Initialize thread data for processing.
NGnThreadData()
Constructor.
NGnTree * GetHnSparseBase() const
Get pointer to base NGnTree object.
NBinning * fBiningSource
Pointer to the source binning (from the original NGnTree)
std::unordered_set< Long64_t > fProcessedBinIds
Set of already-processed global bin IDs (duplicate guard)
virtual void Process(const std::vector< int > &coords)
Process coordinates (virtual).
Long64_t GetNProcessed() const
Get number of processed entries.
virtual Long64_t Merge(TCollection *list)
Merge thread data from a collection (virtual).
virtual ~NGnThreadData()
Destructor.
json fCfg
Configuration object.
void FlushDeferredDeletes()
Delete deferred ROOT objects, skipping TCanvas/TPad (leaked safely).
std::vector< TObject * > fDeferredDeletes
Objects deferred for single-threaded deletion.
NGnTree * fHnSparseBase
Pointer to the base class.
Long64_t fNProcessed
Number of processed entries.
const std::string & GetResultsFilename() const
Get the results filename for TCP mode.
bool fIsPureCopy
Flag indicating pure copy mode.
NGnProcessFuncPtr fProcessFunc
Function pointer to the processing function.
NDMSPC tree object for managing multi-dimensional data storage and processing.
NBinning * GetBinning() const
Get pointer to binning object.
std::map< std::string, TList * > GetOutputs() const
Get outputs map.
bool Close(bool write=false)
Close the tree, optionally writing data.
bool IsPureCopy() const
Checks if the tree is a pure copy.
Int_t GetEntry(Long64_t entry, bool checkBinningDef=true)
Get entry by index.
void SetInput(NGnTree *input)
Set input NGnTree pointer.
TList * GetOutput(std::string name="")
Get output list by name.
NParameters * GetParameters() const
Returns the parameters associated with this tree.
NStorageTree * GetStorageTree() const
Get pointer to storage tree object.
NGnTree * GetInput() const
Get pointer to input NGnTree.
static NGnTree * Open(const std::string &filename, const std::string &branches="", const std::string &treename="ngnt")
Open NGnTree from file.
Monitors and records resource usage (CPU, memory, wall time) for processes or threads.
void Start()
Records the starting resource usage and wall time.
void End()
Records the ending resource usage and wall time.
void Fill(Int_t *coords, int threadId)
Fills resource usage data into the histogram.
THnSparse * GetHnSparse() const
Returns the pointer to the THnSparse histogram.
THnSparse * Initialize(THnSparse *hns)
Initializes the THnSparse histogram for resource data.
NDMSPC storage tree object for managing ROOT TTree-based data storage.
Long64_t Merge(TCollection *list)
Merge storage trees from a collection.
void SetEnabledBranches(std::vector< std::string > branches, int status=1)
Set enabled/disabled status for branches.
virtual void Clear(Option_t *option="")
Clear storage tree data.
std::string GetFileName() const
Get file name.
bool AddBranch(const std::string &name, void *address, const std::string &className)
Add a branch to the tree.
std::vector< std::string > GetBrancheNames(bool onlyEnabled=false) const
Branches handling.
NTreeBranch * GetBranch(const std::string &name)
Get pointer to NTreeBranch by name.
bool InitTree(const std::string &filename="", const std::string &treename="ngnt")
Initialize tree from file and tree name.
Int_t Fill(NBinningPoint *point, NStorageTree *hnstIn=nullptr, bool ignoreFilledCheck=false, std::vector< std::vector< int >> ranges={}, bool useProjection=false)
Fill tree with NBinningPoint and optional input tree.
void SetBinning(NBinning *binning)
Set binning object pointer.
std::map< std::string, NTreeBranch > GetBranchesMap() const
Get map of branch names to NTreeBranch objects.
Thread-local data object for NDMSPC processing.
size_t GetAssignedIndex() const
Get the assigned index for the thread.
void SetAssignedIndex(size_t assignedIndex)
Set the assigned index for the thread.
NResourceMonitor * fResourceMonitor
Pointer to resource monitor.
void SetThreadId(std::thread::id threadId)
Set the thread's unique identifier.
NDMSPC tree branch object for managing ROOT TBranch and associated data.
void SetAddress(void *address, bool deleteExisting=false)
Set address for branch data.
static std::string Join(const std::vector< std::string > &values, const char delim=',')
Join vector of strings into a single string with delimiter.
static std::string GetCoordsString(const std::vector< int > &coords, int index=-1, int width=0)
Get string representation of coordinates.
static void SafeDeleteObjects(std::vector< TObject * > &objects)
Safely delete a vector of ROOT objects, bypassing GarbageCollect.
static std::vector< int > ArrayToVector(Int_t *v1, int size)
Convert array to vector.