salsa 0.7.1
Loading...
Searching...
No Matches
NodeManager.hh
1#pragma once
2#include "Consumer.hh"
3#include "Feeder.hh"
4#include "Job.hh"
5#include "Object.hh"
6#include "Publisher.hh"
7#include "Socket.hh"
8#include "TaskPool.hh"
9#include "Worker.hh"
10#include "TaskInfo.pb.h"
11
12namespace Salsa {
21
22class NodeManager : public Object {
23public:
25 virtual ~NodeManager();
26
27 void print(std::string opt = "") const;
28
29 // TODO Annotate add ops
30 void addConsumer(std::string uuid, std::shared_ptr<Socket> s);
31 void addFeeder(std::string uuid, std::shared_ptr<Socket> s);
32 void addWorker(std::string uuid, std::shared_ptr<Socket> s);
33 void addTask(TaskInfo * taskInfo, std::string cuuid, std::string fuuid,
34 Salsa::Job::EQueueType t = Salsa::Job::pending);
35
36 // TODO Yet again...
37 virtual Socket * onEnter(std::string self, std::string fromType, Message * msg, std::vector<std::string> & out);
38 virtual Socket * onExit(std::string self, Message * msg, std::vector<std::string> & out);
39 virtual Socket * onWhisper(std::string self, Message * msg, std::vector<std::string> & out);
40
41 // TODO Return ops
42 std::shared_ptr<Feeder> feeder(std::string uuid) const;
43 std::shared_ptr<Consumer> consumer(std::string uuid) const;
44 std::shared_ptr<Worker> worker(std::string uuid) const;
45 Job * job(std::string uuid);
46
49
50 // TODO Annotate
51 virtual void addTaskSlot();
52 bool hasJobs() const;
53 virtual bool terminateFinishedJobs();
55 uint64_t finishedJobTimeout() const { return mFinishedJobTimeout; }
57 void finishedJobTimeout(uint64_t t) { mFinishedJobTimeout = t; }
58
59 int32_t nSlots(double mult = 1.0) const;
60 void jobs(std::string clientUUID, std::vector<std::string> & jobs) const; // TODO ehm, what?
61
62 // TODO Task ops
63 TaskInfo * getNextTask();
64 virtual void resultTask(TaskInfo * task);
66 virtual void resultTaskToExternal(Job *, TaskInfo *){};
67 virtual void noMoreTasks(std::string jobUUID);
68 virtual bool haveMoreTasks();
69 virtual bool haveMoreTasks(std::string jobUUID);
70
72 virtual void runTask(TaskState * ts, std::string wk, std::string upstream) = 0;
73
74 // TODO Terminate ops
75 virtual void terminateJob(std::string uuid);
76 virtual void terminateAllJobs(bool finishedonly = false);
77
78 virtual bool handleTaskPool(void * p);
79 virtual bool sendWhisper(Socket * s, std::string to, std::vector<std::string> & v);
80
81 // TODO annotate ops
82 virtual void publisher(Publisher * p);
83 virtual Publisher * publisher() const;
84 virtual bool publish(std::string id, bool force = false) const;
86 void clusterAlias(std::string n) { mClusterAlias = n; }
88 std::string clusterAlias() { return mClusterAlias; }
89
90protected:
91 std::string mClusterAlias{"local"};
92 std::map<std::string, Job *> mJobs{};
93 std::vector<std::string> mActiveJobs{};
94 std::vector<std::string> mFinishedJobs{};
96 std::map<std::string, std::shared_ptr<Worker>> mWorkers{};
97 std::map<std::string, std::shared_ptr<Consumer>> mConsumers{};
98 std::map<std::string, std::shared_ptr<Feeder>> mFeeders{};
99 TaskPool * mpTaskPool = nullptr;
100 Publisher * mpPublisher = nullptr;
101};
102} // namespace Salsa
Job class.
Definition Job.hh:16
EQueueType
Queue types.
Definition Job.hh:19
Base Message class.
Definition Message.hh:15
NodeManager class.
virtual void resultTaskToExternal(Job *, TaskInfo *)
Handle return of task and send it to external client.
Publisher * mpPublisher
Publisher.
virtual bool publish(std::string id, bool force=false) const
virtual bool terminateFinishedJobs()
std::vector< std::string > mActiveJobs
List of active jobs.
std::shared_ptr< Feeder > feeder(std::string uuid) const
std::string mClusterAlias
Cluster alias.
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
std::string clusterAlias()
Returns Cluster alias.
virtual void terminateAllJobs(bool finishedonly=false)
std::shared_ptr< Worker > worker(std::string uuid) const
Job * job(std::string uuid)
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
std::map< std::string, std::shared_ptr< Consumer > > mConsumers
List of Consumers.
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
virtual bool haveMoreTasks()
TaskPool * taskPool()
Get NM's task pool.
virtual void terminateJob(std::string uuid)
virtual bool handleTaskPool(void *p)
void clusterAlias(std::string n)
Sets Cluster alias.
virtual Publisher * publisher() const
uint64_t mFinishedJobTimeout
Finished job timeout in seconds.
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
std::shared_ptr< Consumer > consumer(std::string uuid) const
std::map< std::string, std::shared_ptr< Worker > > mWorkers
List of Workers.
virtual void resultTask(TaskInfo *task)
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
std::map< std::string, Job * > mJobs
List of jobs.
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
bool hasJobs() const
std::map< std::string, std::shared_ptr< Feeder > > mFeeders
List of Feeders.
virtual void runTask(TaskState *ts, std::string wk, std::string upstream)=0
Run task interface.
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
virtual void addTaskSlot()
std::vector< std::string > mFinishedJobs
List of finished jobs.
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
virtual ~NodeManager()
int32_t nSlots(double mult=1.0) const
virtual void noMoreTasks(std::string jobUUID)
TaskInfo * getNextTask()
uint64_t finishedJobTimeout() const
Returns finished job timeout.
TaskPool * mpTaskPool
Task pool.
void print(std::string opt="") const
void finishedJobTimeout(uint64_t t)
Sets finished job timeout.
Base Salsa Object class.
Definition Object.hh:15
Base Publisher class.
Definition Publisher.hh:14
Base Socket class.
Definition Socket.hh:15
Base salsa TaskPool class.
Definition TaskPool.hh:18
Base salsa TaskState class.
Definition TaskState.hh:16