salsa  0.3.0
 All Classes Functions Variables Enumerations Pages
Feeder.cc
1 #include "Feeder.hh"
2 #include <Job.hh>
3 #include <NodeManager.hh>
4 
5 using namespace fmt::literals;
6 
7 namespace Salsa {
8 Feeder::Feeder(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * pNM) : Distributor(uuid, pipe, pNM)
9 {
13  mpNodeInfo->set_uuid(mUUID);
14 }
15 
17 {
21 }
22 
23 void Feeder::onEnter(Message * /*pInMsg*/, std::vector<std::string> & out, std::string type)
24 {
28 
29  if (type == "WORKER" && mpNodeManager->hasJobs()) {
30  out.push_back("SUB");
31  }
32 }
33 
// Feeder::onExit: called when a peer leaves.
// If the peer's uuid matches one of the hosts in mpNodeInfo, that host is
// removed and the slot total is recomputed from the remaining hosts. Every
// task recorded for the peer in mWorkerTasks is then moved back to
// Job::pending (from running or assigned). Unless SALSA_FAST is set, the job's
// consumer is also sent a "JOBRESUBMITED" whisper carrying the serialized task.
// NOTE(review): original source lines 91 and 94 are absent from this listing,
// so the exact conditions around subscribe() and the final SPD_INFO cannot be
// confirmed from here.
34 void Feeder::onExit(Message * pInMsg, std::vector<std::string> & /*out*/)
35 {
39 
40  SPD_TRACE("::onExit inMSG [{}]", pInMsg->uuid());
41 
// slots: sum of slots of every host whose uuid differs from the exiting peer.
// iPos:  number of non-matching hosts before the first match, i.e. the index
//        of the exiting host in hosts().
42  uint32_t slots = 0;
43  uint32_t iPos = 0;
44  bool found = false;
45  for (auto & node : mpNodeInfo->hosts()) {
46  if (pInMsg->uuid() == node.uuid()) {
47  found = true;
48  }
49  else {
50  slots += node.slots();
51  if (!found) {
52  iPos++;
53  }
54  }
55  }
56 
57  // TODO: Continue if worker (Better check is needed)
58  if (found) {
// Only the first matching host entry is deleted.
59  mpNodeInfo->mutable_hosts()->DeleteSubrange(iPos, 1);
60  mpNodeInfo->set_slots(slots);
61 
// Requeue every task this worker held.
62  for (auto pTask : mWorkerTasks[pInMsg->uuid()]) {
63  SPD_WARN("WORKER [{}] exit. Moving task [{}:{}] to Job::pending", pInMsg->uuid(), pTask->jobid(),
64  pTask->taskid());
65  Job * pJob = mpNodeManager->job(pTask->jobid());
66  if (pJob) {
67  if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::running)) {
68  pJob->moveTask(pTask->taskid(), Salsa::Job::running, Salsa::Job::pending);
69  }
70  else if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
71  pJob->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::pending);
72  }
73 
// Consumer notification is skipped when SALSA_FAST is set. It is sent even
// if the task was found in neither queue above.
74  if (!getenv("SALSA_FAST")) {
75  if (!pJob->consumer().empty()) {
// NOTE(review): pConsumer is dereferenced below without a null check;
// consumer() returns a shared_ptr that may be empty for an unknown uuid. Confirm.
76  std::shared_ptr<Consumer> pConsumer = mpNodeManager->consumer(pJob->consumer());
77  std::vector<std::string> outData;
78  outData.push_back("JOBRESUBMITED");
79  std::string payload;
80  pTask->SerializeToString(&payload);
81  outData.push_back(payload);
82  mpNodeManager->sendWhisper(pConsumer->pipe().get(), pJob->feeder(), outData);
83  }
84  }
85  }
86  }
// Forget the worker's bookkeeping (the clear() is redundant with the erase()).
87  mWorkerTasks[pInMsg->uuid()].clear();
88  mWorkerTasks.erase(pInMsg->uuid());
89  mClients.erase(pInMsg->uuid());
90 
92  subscribe(pInMsg->uuid());
93  }
95  SPD_INFO("Workers [{}] slots [{}]", mpNodeInfo->hosts_size(), mpNodeInfo->slots());
96  }
97 }
98 void Feeder::onWhisper(Message * pInMsg, std::vector<std::string> & out)
99 {
103 
104  std::vector<std::string> inContent = pInMsg->content();
105 
106  SPD_TRACE("::onWhisper inMSG [{}]", inContent[0]);
107  if (inContent[0] == "FREESLOT") {
108  SPD_TRACE("Searching for task in one of jobs");
109  TaskInfo * pTask = mpNodeManager->getNextTask();
110  if (pTask == nullptr) {
111  SPD_TRACE("Sending back NOMORETASKS");
112  out.push_back("NOMORETASKS");
113  out.push_back(inContent[1]);
114  }
115  else {
116  out.push_back("TASK");
117  std::string payload;
118  pTask->SerializeToString(&payload);
119  out.push_back(payload);
120  out.push_back(inContent[1]);
121  mWorkerTasks[pInMsg->uuid()].push_back(pTask);
122  SPD_TRACE("mWorkerTasks[{}] vector size [{}]", pInMsg->uuid(), mWorkerTasks[pInMsg->uuid()].size());
123  }
124  }
125  else if (inContent[0] == "TASK_IS_RUNNING") {
126  SPD_TRACE("TASK_IS_RUNNING");
127 
128  std::string payload = inContent[1];
129  TaskInfo * pTask = new TaskInfo();
130  {
131  if (!pTask->ParseFromString(payload)) {
132  SPD_ERROR("Message does not contain ProtoBuf message!");
133  for (auto s : inContent) {
134  SPD_ERROR("::onWhisper inMSG [{}]", s);
135  }
136  return;
137  }
138  }
139 
140  Job * job = mpNodeManager->job(pTask->jobid());
141  if (job) {
142  if (job->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
143  job->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::running);
144  }
145 
146  if (!job->consumer().empty()) {
147  std::shared_ptr<Consumer> pConsumer = mpNodeManager->consumer(job->consumer());
148  std::vector<std::string> output;
149  output.push_back(inContent[0]);
150  output.push_back(inContent[1]);
151  mpNodeManager->sendWhisper(pConsumer->pipe().get(), job->feeder(), output);
152  }
153  else {
154  delete pTask;
155  }
156  }
157  mpNodeManager->print();
158  }
159  else if (inContent[0] == "TASK_RESULT") {
160  std::string payload = inContent[1];
161  TaskInfo * pTask = new TaskInfo();
162  {
163  if (!pTask->ParseFromString(payload)) {
164  SPD_ERROR("Message does not contain ProtoBuf message!");
165  for (auto s : inContent) {
166  SPD_ERROR("::onWhisper inMSG [{}]", s);
167  }
168  return;
169  }
170  }
171 
172  removeWorkerTask(pTask, pInMsg->uuid());
173 
174  // task is deleted in next line
175  mpNodeManager->resultTask(pTask);
176  // TODO possible memleak? @mvala
177  }
178  else if (inContent[0] == "NODEINFO") {
179  std::string payload = inContent[1];
180  Salsa::NodeInfo * pNI = mpNodeInfo->add_hosts();
181  if (!pNI->ParseFromString(payload)) {
182  SPD_ERROR("[NodeInfo] Message does not contain ProtoBuf message!");
183  }
184  uint32_t slots = 0;
185 
186  for (auto nodeInfo : mpNodeInfo->hosts()) {
187  slots += nodeInfo.slots();
188  }
189  mpNodeInfo->set_slots(slots);
190  SPD_INFO("Workers [{}] slots [{}]", mpNodeInfo->hosts_size(), mpNodeInfo->slots());
191  }
192 }
193 
194 void Feeder::subscribe(std::string uuid)
195 {
199 
200  SPD_INFO("Client [{}] started", uuid);
201  SPD_TRACE("Feeders -> [{}]", mClients.size());
202  for (auto client : mClients) {
203  std::vector<std::string> out;
204  out.push_back("SUB");
205  mpNodeManager->sendWhisper(pipe().get(), client.first, out);
206  }
207 }
208 
209 void Feeder::removeWorkerTask(TaskInfo * pTaskInfo)
210 {
214 
215  for (auto wkTask : mWorkerTasks) {
216  removeWorkerTask(pTaskInfo, wkTask.first);
217  }
218 }
219 
220 void Feeder::removeWorkerTask(TaskInfo * pTaskInfo, std::string uuid)
221 {
225  SPD_TRACE("mWorkerTasks[{}].size() [{}]", uuid, mWorkerTasks[uuid].size());
226  if (mWorkerTasks[uuid].size() == 0) return;
227 
228  int iPos = 0;
229  for (auto pTask : mWorkerTasks[uuid]) {
230  if (pTask->taskid() == pTaskInfo->taskid()) {
231  mWorkerTasks[uuid].erase(mWorkerTasks[uuid].begin() + iPos);
232  return;
233  }
234  iPos++;
235  }
236 }
237 
238 void Feeder::terminateJob(std::string uuid)
239 {
243 
244  auto job = mpNodeManager->job(uuid);
245 
246  std::vector<Salsa::TaskInfo *> tasks;
247  job->tasks(tasks, Job::pending, false);
248  job->tasks(tasks, Job::assigned, false);
249  job->tasks(tasks, Job::running, false);
250  for (auto pTask : tasks) {
251  SPD_TRACE("removeWorkerTask [{}]", pTask->taskid());
252  removeWorkerTask(pTask);
253  }
254 
255  for (auto client : mClients) {
256  std::vector<std::string> out;
257  out.push_back("TERMINATEJOB");
258  out.push_back(uuid);
259  mpNodeManager->sendWhisper(pipe().get(), client.first, out);
260  }
261  SPD_INFO("JOB [{}] has finished", uuid);
262 }
263 
264 } // namespace Salsa
std::string mUUID
Self UUID.
Definition: Distributor.hh:48
std::map< std::string, std::vector< TaskInfo * > > mWorkerTasks
Worker tasks.
Definition: Feeder.hh:33
Base Message class.
Definition: Message.hh:15
Base Distributor class.
Definition: Distributor.hh:17
void consumer(std::string uuid)
Definition: Job.cc:189
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
Definition: Feeder.cc:34
void terminateJob(std::string uuid)
Definition: Feeder.cc:238
NodeManager * mpNodeManager
Node Manager.
Definition: Distributor.hh:52
NodeManager class.
Definition: NodeManager.hh:20
Job class.
Definition: Job.hh:16
std::shared_ptr< Socket > pipe() const
TODO Returns distributor's pipe?
Definition: Distributor.cc:77
NodeInfo * nodeInfo() const
Definition: Distributor.cc:92
void resultTask(TaskInfo *task)
Definition: NodeManager.cc:278
void feeder(std::string uuid)
Definition: Job.cc:205
virtual std::string uuid() const =0
Returns node uuid.
virtual ~Feeder()
Definition: Feeder.cc:16
Job * job(std::string uuid)
Definition: NodeManager.cc:418
std::string uuid() const
Returns distributor's UUID.
Definition: Distributor.cc:84
void tasks(std::vector< TaskInfo * > &v, QueueType type, bool clear=true)
Definition: Job.cc:105
void subscribe(std::string uuid)
Definition: Feeder.cc:194
std::map< std::string, std::string > mClients
List of clients.
Definition: Distributor.hh:50
TaskInfo * getNextTask()
Definition: NodeManager.cc:252
bool isTaskInQueue(uint32_t id, QueueType type) const
Check task presence in certain queue.
Definition: Job.cc:119
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Three horsemen of apocalypse.
Definition: Feeder.cc:23
NodeInfo * mpNodeInfo
Node Info.
Definition: Distributor.hh:53
std::shared_ptr< Consumer > consumer(std::string uuid) const
Definition: NodeManager.cc:393
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
Definition: NodeManager.cc:211
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
Definition: Feeder.cc:98
virtual std::vector< std::string > & content()=0
Returns vector of partial messages as strings.
void print(std::string opt="") const
Definition: NodeManager.cc:30
bool moveTask(uint32_t id, QueueType from, QueueType to)
Definition: Job.cc:45
bool hasJobs() const
Definition: NodeManager.cc:457
void removeWorkerTask(TaskInfo *pTI)
Definition: Feeder.cc:209