2 #include <NStorageTree.h>
7 #include "NBinningPoint.h"
10 #include "NGnThreadData.h"
19 bool NGnThreadData::Init(
size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr funcBegin, NGnEndFuncPtr endFunc,
21 const std::string & treename)
29 TH1::AddDirectory(kFALSE);
39 if (ngnt ==
nullptr) {
40 NLogError(
"NGnThreadData::Init: NGnTree is nullptr !!!");
49 NLogError(
"NGnThreadData::Init: Binning Source is nullptr !!!");
59 NLogError(
"NGnThreadData::InitStorage: Binning is not set !!!");
64 NLogError(
"NGnThreadData::InitStorage: Storage tree is not set !!!");
76 ts->
InitTree(filename.empty() ? fn : filename, treename);
81 NLogTrace(
"NGnThreadData::Init: Adding branch '%s' to thread %zu", kv.first.c_str(),
id);
88 ts->
AddBranch(kv.first,
nullptr, kv.second.GetObjectClassName());
91 if (!b) ts->
AddBranch(
"_outputPoint",
nullptr,
"TList");
94 NLogTrace(
"NGnThreadData::Init: Setting parameters branch for thread %zu",
id);
96 if (!b) ts->
AddBranch(
"_params",
nullptr,
"Ndmspc::NParameters");
123 NLogTrace(
"NGnThreadData::Init: Setting input NGnTree for thread %zu '%s'",
id,
136 ExecuteBeginFunction();
146 TH1::AddDirectory(kFALSE);
156 static std::mutex sPadMutex;
157 std::lock_guard<std::mutex> lk(sPadMutex);
161 auto * c =
new TCanvas(cname, cname, 1, 1);
172 NLogError(
"NGnThreadData::Process: NGnTree is not set in NGnThreadData !!!");
177 NLogError(
"NGnThreadData::Process: Process function is not set in NGnThreadData !!!");
183 if (binningDef ==
nullptr) {
184 NLogError(
"NGnThreadData::Process: Binning definition is not set in NGnThreadData !!!");
195 if (entry < fHnSparseBase->GetBinning()->GetContent()->GetNbins()) {
199 NLogDebug(
"NGnThreadData::Process: [%zu] Skipping entry=%lld, because it was already process !!!",
227 TList * outputPoint =
new TList();
236 if (!point->
GetCfg()[
"_ndmspc"].is_null()) {
244 if (outputPoint->GetEntries() > 0) {
246 "NGnThreadData::Process: [%zu] Entry '%lld' was accepted. %s",
GetAssignedIndex(), entry,
254 Int_t bytes = ts->
Fill(point,
nullptr,
false, {},
false);
265 NLogTrace(
"NGnThreadData::Process: [%zu] Entry '%lld' Fill was done with 0 bytes. Skipping ...",
276 "NGnThreadData::Process: [%zu] Entry '%lld' No output %s. Skipping ...",
GetAssignedIndex(), entry,
287 NLogTrace(
"NGnThreadData::Process: [%zu] Cleaning output list with %d entries for entry '%lld' ...",
290 TObject * obj =
nullptr;
291 while ((obj = outputPoint->First())) {
292 outputPoint->Remove(obj);
304 Long64_t nmerged = 0;
306 NLogTrace(
"NGnThreadData::Merge: BEGIN ------------------------------------------------");
307 NLogTrace(
"NGnThreadData::Merge: Merging thread data from %zu threads ...", list->GetEntries());
310 std::map<std::string, TList *> listOutputs;
313 TList * listTreeStorage =
new TList();
315 for (
auto obj : *list) {
316 if (obj->IsA() == NGnThreadData::Class()) {
318 NLogDebug(
"NGnThreadData::Merge: Merging thread %zu processed %lld ...", hnsttd->
GetAssignedIndex(),
322 NLogError(
"NGnThreadData::Merge: Storage tree is not set in NGnTree !!!");
328 NLogTrace(
"NGnThreadData::Merge: Found output list '%s' with %d objects", kv.first.c_str(),
329 kv.second ? kv.second->GetEntries() : 0);
330 if (kv.second && !kv.second->IsEmpty()) {
331 NLogTrace(
"NGnThreadData::Merge: Merging output list '%s' with %d objects", kv.first.c_str(),
332 kv.second->GetEntries());
333 if (listOutputs.find(kv.first) == listOutputs.end()) {
335 listOutputs[kv.first] =
new TList();
340 listOutputs[kv.first]->Add(kv.second);
354 NLogError(
"NGnThreadData::Merge: Failed to open NGnTree from file '%s' !!!", ts->
GetFileName().c_str());
372 for (
auto id : targetBinningDef->
GetIds()) {
376 NLogTrace(
"NGnThreadData::Merge: Adding def_id=%lld to content_bin=%lld",
id, bin);
385 NLogDebug(
"NGnThreadData::Merge: Total entries to merge: %lld", nmerged);
390 NLogError(
"NGnThreadData::Merge: Binning definition '%s' not found in NGnTree !!!", name.c_str());
396 NLogTrace(
"NGnThreadData::Merge: Final IDs in definition '%s': %s", name.c_str(),
404 NLogTrace(
"NGnThreadData::Merge: Merging %d storage trees ...", listTreeStorage->GetEntries());
414 for (
auto & kv : listOutputs) {
415 if (kv.second && !kv.second->IsEmpty()) {
416 NLogTrace(
"NGnThreadData::Merge: Merging output list '%s' with %d objects", kv.first.c_str(),
417 kv.second->GetEntries() + 1);
428 NLogError(
"NGnThreadData::Merge: Binning definition '%s' not found in NGnTree !!!", name.c_str());
434 for (
auto id : binningDef->
GetIds()) {
437 binningDef->
GetContent()->SetBinContent(bin,
id);
439 NLogTrace(
"NGnThreadData::Merge: -> Setting content bin %lld to id %lld", bin,
id);
457 NLogTrace(
"NGnThreadData::Merge: END ------------------------------------------------");
465 void NGnThreadData::ExecuteBeginFunction()
472 void NGnThreadData::ExecuteEndFunction()
483 NLogTrace(
"NGnThreadData::FlushDeferredDeletes: [%zu] Deleting %zu deferred objects ...",
Defines binning mapping and content for NDMSPC histograms.
THnSparse * GetContent() const
Get the template content histogram.
Long64_t GetId(size_t index) const
Get bin ID at specified index.
std::vector< Long64_t > GetIds() const
Get list of bin IDs.
Represents a single point in multi-dimensional binning.
void SetTreeStorage(NStorageTree *s)
Set storage tree object pointer.
bool RecalculateStorageCoords(Long64_t entry=-1, bool useBinningDefCheck=false)
Recalculate storage coordinates for the point.
Int_t * GetStorageCoords() const
Get pointer to storage coordinates array.
void SetCfg(json cfg)
Set configuration JSON object.
Int_t * GetCoords() const
Get pointer to content coordinates array.
void SetParameters(NParameters *params)
Set the parameters for this binning point.
void SetInput(NGnTree *input)
Set input NGnTree object pointer.
virtual void Reset()
Reset the binning point to initial state.
json & GetCfg()
Get reference to configuration JSON object.
Int_t GetNDimensionsContent() const
Get number of dimensions in content histogram.
NBinning object for managing multi-dimensional binning and axis definitions.
NBinningDef * GetDefinition(const std::string &name="")
Get binning definition by name.
std::vector< std::string > GetDefinitionNames() const
Get all definition names.
NBinningPoint * GetPoint()
Get the current binning point.
THnSparse * GetContent() const
Get the content histogram.
void SetCurrentDefinitionName(const std::string &name)
Set current definition name.
Thread-local data object for NDMSPC processing.
NGnEndFuncPtr fEndFunc
Function pointer to the end function.
NGnBeginFuncPtr fBeginFunc
Function pointer to the begin function.
bool Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc, NGnTree *ngnt, NBinning *binningIn, NGnTree *input=nullptr, const std::string &filename="", const std::string &treename="ngnt")
Initialize thread data for processing.
NGnThreadData()
Constructor.
NGnTree * GetHnSparseBase() const
Get pointer to base NGnTree object.
NBinning * fBiningSource
Pointer to the source binning (from the original NGnTree)
virtual void Process(const std::vector< int > &coords)
Process coordinates (virtual).
Long64_t GetNProcessed() const
Get number of processed entries.
virtual Long64_t Merge(TCollection *list)
Merge thread data from a collection (virtual).
virtual ~NGnThreadData()
Destructor.
json fCfg
Configuration object.
void FlushDeferredDeletes()
Delete deferred ROOT objects, skipping TCanvas/TPad (leaked safely).
std::vector< TObject * > fDeferredDeletes
Objects deferred for single-threaded deletion.
NGnTree * fHnSparseBase
Pointer to the base class.
Long64_t fNProcessed
Number of processed entries.
bool fIsPureCopy
Flag indicating pure copy mode.
NGnProcessFuncPtr fProcessFunc
Function pointer to the processing function.
NDMSPC tree object for managing multi-dimensional data storage and processing.
NBinning * GetBinning() const
Get pointer to binning object.
std::map< std::string, TList * > GetOutputs() const
Get outputs map.
bool Close(bool write=false)
Close the tree, optionally writing data.
bool IsPureCopy() const
Checks if the tree is a pure copy.
Int_t GetEntry(Long64_t entry, bool checkBinningDef=true)
Get entry by index.
void SetInput(NGnTree *input)
Set input NGnTree pointer.
TList * GetOutput(std::string name="")
Get output list by name.
NParameters * GetParameters() const
Returns the parameters associated with this tree.
void SetWsClient(NWsClient *client)
Sets the NWsClient instance for this object.
NStorageTree * GetStorageTree() const
Get pointer to storage tree object.
NGnTree * GetInput() const
Get pointer to input NGnTree.
static NGnTree * Open(const std::string &filename, const std::string &branches="", const std::string &treename="ngnt")
Open NGnTree from file.
NWsClient * GetWsClient() const
Returns the associated NWsClient instance.
Monitors and records resource usage (CPU, memory, wall time) for processes or threads.
void Start()
Records the starting resource usage and wall time.
void End()
Records the ending resource usage and wall time.
void Fill(Int_t *coords, int threadId)
Fills resource usage data into the histogram.
THnSparse * GetHnSparse() const
Returns the pointer to the THnSparse histogram.
THnSparse * Initialize(THnSparse *hns)
Initializes the THnSparse histogram for resource data.
NDMSPC storage tree object for managing ROOT TTree-based data storage.
Long64_t Merge(TCollection *list)
Merge storage trees from a collection.
void SetEnabledBranches(std::vector< std::string > branches, int status=1)
Set enabled/disabled status for branches.
virtual void Clear(Option_t *option="")
Clear storage tree data.
std::string GetFileName() const
Get file name.
bool AddBranch(const std::string &name, void *address, const std::string &className)
Add a branch to the tree.
std::vector< std::string > GetBrancheNames(bool onlyEnabled=false) const
Branches handling.
NTreeBranch * GetBranch(const std::string &name)
Get pointer to NTreeBranch by name.
bool InitTree(const std::string &filename="", const std::string &treename="ngnt")
Initialize tree from file and tree name.
Int_t Fill(NBinningPoint *point, NStorageTree *hnstIn=nullptr, bool ignoreFilledCheck=false, std::vector< std::vector< int >> ranges={}, bool useProjection=false)
Fill tree with NBinningPoint and optional input tree.
void SetBinning(NBinning *binning)
Set binning object pointer.
std::map< std::string, NTreeBranch > GetBranchesMap() const
Get map of branch names to NTreeBranch objects.
Thread-local data object for NDMSPC processing.
size_t GetAssignedIndex() const
Get the assigned index for the thread.
void SetAssignedIndex(size_t assignedIndex)
Set the assigned index for the thread.
NResourceMonitor * fResourceMonitor
Pointer to resource monitor.
void SetThreadId(std::thread::id threadId)
Set the thread's unique identifier.
NDMSPC tree branch object for managing ROOT TBranch and associated data.
void SetAddress(void *address, bool deleteExisting=false)
Set address for branch data.
static std::string Join(const std::vector< std::string > &values, const char delim=',')
Join vector of strings into a single string with delimiter.
static std::string GetCoordsString(const std::vector< int > &coords, int index=-1, int width=0)
Get string representation of coordinates.
static void SafeDeleteObjects(std::vector< TObject * > &objects)
Safely delete a vector of ROOT objects, bypassing GarbageCollect.
static std::vector< int > ArrayToVector(Int_t *v1, int size)
Convert array to vector.