3 #include <NodeManager.hh> 5 using namespace fmt::literals;
40 SPD_TRACE(
"::onExit inMSG [{}]", pInMsg->
uuid());
46 if (pInMsg->
uuid() == node.uuid()) {
50 slots += node.slots();
59 mpNodeInfo->mutable_hosts()->DeleteSubrange(iPos, 1);
63 SPD_WARN(
"WORKER [{}] exit. Moving task [{}:{}] to Job::pending", pInMsg->
uuid(), pTask->jobid(),
67 if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::running)) {
68 pJob->
moveTask(pTask->taskid(), Salsa::Job::running, Salsa::Job::pending);
70 else if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
71 pJob->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::pending);
74 if (!getenv(
"SALSA_FAST")) {
75 if (!pJob->consumer().empty()) {
77 std::vector<std::string> outData;
78 outData.push_back(
"JOBRESUBMITED");
80 pTask->SerializeToString(&payload);
81 outData.push_back(payload);
105 std::vector<std::string> inContent = pInMsg->
content();
107 SPD_TRACE(
"::onWhisper inMSG [{}]", inContent[0]);
108 if (inContent[0] ==
"FREESLOT") {
110 SPD_TRACE(
"Searching for task in one of jobs");
113 if (pTask ==
nullptr) {
114 SPD_TRACE(
"Sending back NOMORETASKS");
115 out.push_back(
"NOMORETASKS");
116 out.push_back(inContent[1]);
119 out.push_back(
"TASK");
121 pTask->SerializeToString(&payload);
122 out.push_back(payload);
123 out.push_back(inContent[1]);
125 SPD_TRACE(
"mWorkerTasks[{}] vector size [{}]", pInMsg->
uuid(),
mWorkerTasks[pInMsg->
uuid()].size());
128 else if (inContent[0] ==
"TASK_IS_RUNNING") {
129 SPD_TRACE(
"TASK_IS_RUNNING");
131 std::string payload = inContent[1];
132 TaskInfo * pTask =
new TaskInfo();
134 if (!pTask->ParseFromString(payload)) {
135 SPD_ERROR(
"Message does not contain ProtoBuf message!");
136 for (
auto s : inContent) {
137 SPD_ERROR(
"::onWhisper inMSG [{}]", s);
145 if (job->
isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
146 job->
moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::running);
151 std::vector<std::string> output;
152 output.push_back(inContent[0]);
153 output.push_back(inContent[1]);
162 else if (inContent[0] ==
"TASK_RESULT") {
163 std::string payload = inContent[1];
164 TaskInfo * pTask =
new TaskInfo();
166 if (!pTask->ParseFromString(payload)) {
167 SPD_ERROR(
"Message does not contain ProtoBuf message!");
168 for (
auto line : inContent) {
169 SPD_ERROR(
"::onWhisper inMSG [{}]", line);
181 else if (inContent[0] ==
"NODEINFO") {
182 std::string payload = inContent[1];
183 Salsa::NodeInfo * pNI =
mpNodeInfo->add_hosts();
184 if (!pNI->ParseFromString(payload)) {
185 SPD_ERROR(
"[NodeInfo] Message does not contain ProtoBuf message!");
203 SPD_INFO(
"Client [{}] started",
uuid);
204 SPD_TRACE(
"Feeders -> [{}]",
mClients.size());
206 std::vector<std::string> out;
207 out.push_back(
"SUB");
233 if (pTask->taskid() == pTaskInfo->taskid()) {
249 std::vector<Salsa::TaskInfo *> tasks;
250 job->
tasks(tasks, Job::pending,
false);
251 job->tasks(tasks, Job::assigned,
false);
252 job->tasks(tasks, Job::running,
false);
253 for (
auto pTask : tasks) {
254 SPD_TRACE(
"removeWorkerTask [{}]", pTask->taskid());
259 std::vector<std::string> out;
260 out.push_back(
"TERMINATEJOB");
264 SPD_INFO(
"JOB [{}] has finished",
uuid);
std::string mUUID
Self UUID.
NodeInfo * nodeInfo() const
std::map< std::string, std::vector< TaskInfo * > > mWorkerTasks
Worker tasks.
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
void consumer(std::string uuid)
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
void terminateJob(std::string uuid)
NodeManager * mpNodeManager
Node Manager.
virtual bool haveMoreTasks()
virtual void resultTask(TaskInfo *task)
void feeder(std::string uuid)
virtual std::string uuid() const =0
Returns node uuid.
Job * job(std::string uuid)
void subscribe(std::string uuid)
void print(std::string opt="") const
std::map< std::string, std::string > mClients
List of clients.
std::shared_ptr< Socket > pipe() const
TODO Returns distributor's pipe?
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Three horsemen of apocalypse.
NodeInfo * mpNodeInfo
Node Info.
std::string uuid() const
Returns distributor's UUID.
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
virtual std::vector< std::string > & content()=0
Retursn vector of partial messages as strings.
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
void removeWorkerTask(TaskInfo *pTI)
std::shared_ptr< Consumer > consumer(std::string uuid) const