salsa  0.4.0
Feeder.cc
1 #include "Feeder.hh"
2 #include <Job.hh>
3 #include <NodeManager.hh>
4 
5 using namespace fmt::literals;
6 
7 namespace Salsa {
8 Feeder::Feeder(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * pNM) : Distributor(uuid, pipe, pNM)
9 {
13  mpNodeInfo->set_uuid(mUUID);
14 }
15 
17 {
21 }
22 
23 void Feeder::onEnter(Message * /*pInMsg*/, std::vector<std::string> & out, std::string type)
24 {
28 
29  if (type == "WORKER" && mpNodeManager->hasJobs()) {
30  out.push_back("SUB");
31  }
32 }
33 
// Handles the exit of the peer identified by pInMsg->uuid().
//
// If that uuid matches an entry in mpNodeInfo->hosts(), the entry is removed,
// the total slot count is set to the sum of the other hosts' slots, every task
// tracked for that worker in mWorkerTasks is moved from the running/assigned
// queue of its job back to pending (optionally notifying the job's consumer
// with "JOBRESUBMITED"), and the peer is erased from mWorkerTasks and mClients.
// The second parameter (reply frames) is unused.
//
// NOTE(review): this listing skips embedded source lines 92 and 95, and the
// closing braces after the subscribe() call do not balance without them.
// Presumably a conditional opened on line 92 guards subscribe() - confirm
// against the real Feeder.cc before changing this function.
34 void Feeder::onExit(Message * pInMsg, std::vector<std::string> & /*out*/)
35 {
39 
40  SPD_TRACE("::onExit inMSG [{}]", pInMsg->uuid());
41 
42  uint32_t slots = 0;
43  uint32_t iPos = 0;
44  bool found = false;
 // Single pass over the registered hosts: detect the exiting peer and sum the
 // slots of all non-matching hosts. iPos is only advanced for non-matching
 // hosts seen before the match, so it ends up as the index of the match.
45  for (auto & node : mpNodeInfo->hosts()) {
46  if (pInMsg->uuid() == node.uuid()) {
47  found = true;
48  }
49  else {
50  slots += node.slots();
51  if (!found) {
52  iPos++;
53  }
54  }
55  }
56 
57  // TODO: Continue if worker (Better check is needed)
58  if (found) { // if node is found in local reg
 // Drop exactly one host entry at the computed index and publish the new total.
59  mpNodeInfo->mutable_hosts()->DeleteSubrange(iPos, 1);
60  mpNodeInfo->set_slots(slots);
61 
 // operator[] creates an empty entry when the peer had no tracked tasks;
 // the entry is erased again below.
62  for (auto pTask : mWorkerTasks[pInMsg->uuid()]) {
63  SPD_WARN("WORKER [{}] exit. Moving task [{}:{}] to Job::pending", pInMsg->uuid(), pTask->jobid(),
64  pTask->taskid());
65  Job * pJob = mpNodeManager->job(pTask->jobid());
66  if (pJob) {
 // Requeue the task from whichever active queue currently holds it.
67  if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::running)) {
68  pJob->moveTask(pTask->taskid(), Salsa::Job::running, Salsa::Job::pending);
69  }
70  else if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
71  pJob->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::pending);
72  }
73 
 // The consumer notification is skipped whenever the SALSA_FAST
 // environment variable is set (to any value).
74  if (!getenv("SALSA_FAST")) {
75  if (!pJob->consumer().empty()) {
 // NOTE(review): pConsumer is dereferenced without a null check -
 // confirm NodeManager::consumer() cannot return an empty pointer here.
76  std::shared_ptr<Consumer> pConsumer = mpNodeManager->consumer(pJob->consumer());
77  std::vector<std::string> outData;
78  outData.push_back("JOBRESUBMITED");
79  std::string payload;
80  pTask->SerializeToString(&payload);
81  outData.push_back(payload);
 // Frames sent: "JOBRESUBMITED" followed by the serialized task.
82  mpNodeManager->sendWhisper(pConsumer->pipe().get(), pJob->feeder(), outData);
83  }
84  }
85  }
86  }
87 
 // Forget everything tracked for the exited peer.
88  mWorkerTasks[pInMsg->uuid()].clear();
89  mWorkerTasks.erase(pInMsg->uuid());
90  mClients.erase(pInMsg->uuid());
91 
 // NOTE(review): embedded line 92 is absent here; see the header note.
93  subscribe(pInMsg->uuid());
94  }
96  SPD_INFO("Workers [{}] slots [{}]", mpNodeInfo->hosts_size(), mpNodeInfo->slots());
97  }
98 }
99 void Feeder::onWhisper(Message * pInMsg, std::vector<std::string> & out)
100 {
104 
105  std::vector<std::string> inContent = pInMsg->content();
106 
107  SPD_TRACE("::onWhisper inMSG [{}]", inContent[0]);
108  if (inContent[0] == "FREESLOT") { // from consumer
109  // Signal arrives from consumer. Indicates that consumer has free exec slot.
110  SPD_TRACE("Searching for task in one of jobs");
111  TaskInfo * pTask = mpNodeManager->getNextTask();
112  // If there's task available, send it out
113  if (pTask == nullptr) {
114  SPD_TRACE("Sending back NOMORETASKS");
115  out.push_back("NOMORETASKS");
116  out.push_back(inContent[1]);
117  }
118  else {
119  out.push_back("TASK");
120  std::string payload;
121  pTask->SerializeToString(&payload);
122  out.push_back(payload);
123  out.push_back(inContent[1]);
124  mWorkerTasks[pInMsg->uuid()].push_back(pTask);
125  SPD_TRACE("mWorkerTasks[{}] vector size [{}]", pInMsg->uuid(), mWorkerTasks[pInMsg->uuid()].size());
126  }
127  }
128  else if (inContent[0] == "TASK_IS_RUNNING") { // from consumer
129  SPD_TRACE("TASK_IS_RUNNING");
130 
131  std::string payload = inContent[1];
132  TaskInfo * pTask = new TaskInfo();
133  {
134  if (!pTask->ParseFromString(payload)) {
135  SPD_ERROR("Message does not contain ProtoBuf message!");
136  for (auto s : inContent) {
137  SPD_ERROR("::onWhisper inMSG [{}]", s);
138  }
139  return;
140  }
141  }
142 
143  Job * job = mpNodeManager->job(pTask->jobid());
144  if (job) {
145  if (job->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
146  job->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::running);
147  }
148 
149  if (!job->consumer().empty()) {
150  std::shared_ptr<Consumer> pConsumer = mpNodeManager->consumer(job->consumer());
151  std::vector<std::string> output;
152  output.push_back(inContent[0]);
153  output.push_back(inContent[1]);
154  mpNodeManager->sendWhisper(pConsumer->pipe().get(), job->feeder(), output);
155  }
156  else {
157  delete pTask;
158  }
159  }
160  mpNodeManager->print();
161  }
162  else if (inContent[0] == "TASK_RESULT") { // from consumer
163  std::string payload = inContent[1];
164  TaskInfo * pTask = new TaskInfo();
165  {
166  if (!pTask->ParseFromString(payload)) {
167  SPD_ERROR("Message does not contain ProtoBuf message!");
168  for (auto line : inContent) {
169  SPD_ERROR("::onWhisper inMSG [{}]", line);
170  }
171  return;
172  }
173  }
174 
175  removeWorkerTask(pTask, pInMsg->uuid());
176 
177  // task is deleted in next line
178  mpNodeManager->resultTask(pTask);
179  // TODO possible memleak? @mvala
180  }
181  else if (inContent[0] == "NODEINFO") { // from consumer
182  std::string payload = inContent[1];
183  Salsa::NodeInfo * pNI = mpNodeInfo->add_hosts();
184  if (!pNI->ParseFromString(payload)) {
185  SPD_ERROR("[NodeInfo] Message does not contain ProtoBuf message!");
186  }
187  uint32_t slots = 0;
188 
189  for (auto nodeInfo : mpNodeInfo->hosts()) {
190  slots += nodeInfo.slots();
191  }
192  mpNodeInfo->set_slots(slots);
193  SPD_INFO("Workers [{}] slots [{}]", mpNodeInfo->hosts_size(), mpNodeInfo->slots());
194  }
195 }
196 
197 void Feeder::subscribe(std::string uuid)
198 {
202 
203  SPD_INFO("Client [{}] started", uuid);
204  SPD_TRACE("Feeders -> [{}]", mClients.size());
205  for (auto client : mClients) {
206  std::vector<std::string> out;
207  out.push_back("SUB");
208  mpNodeManager->sendWhisper(pipe().get(), client.first, out);
209  }
210 }
211 
212 void Feeder::removeWorkerTask(TaskInfo * pTaskInfo)
213 {
217 
218  for (auto wkTask : mWorkerTasks) {
219  removeWorkerTask(pTaskInfo, wkTask.first);
220  }
221 }
222 
223 void Feeder::removeWorkerTask(TaskInfo * pTaskInfo, std::string uuid)
224 {
228  SPD_TRACE("mWorkerTasks[{}].size() [{}]", uuid, mWorkerTasks[uuid].size());
229  if (mWorkerTasks[uuid].size() == 0) return;
230 
231  int iPos = 0;
232  for (auto pTask : mWorkerTasks[uuid]) {
233  if (pTask->taskid() == pTaskInfo->taskid()) {
234  mWorkerTasks[uuid].erase(mWorkerTasks[uuid].begin() + iPos);
235  return;
236  }
237  iPos++;
238  }
239 }
240 
241 void Feeder::terminateJob(std::string uuid)
242 {
246 
247  auto job = mpNodeManager->job(uuid);
248 
249  std::vector<Salsa::TaskInfo *> tasks;
250  job->tasks(tasks, Job::pending, false);
251  job->tasks(tasks, Job::assigned, false);
252  job->tasks(tasks, Job::running, false);
253  for (auto pTask : tasks) {
254  SPD_TRACE("removeWorkerTask [{}]", pTask->taskid());
255  removeWorkerTask(pTask);
256  }
257 
258  for (auto client : mClients) {
259  std::vector<std::string> out;
260  out.push_back("TERMINATEJOB");
261  out.push_back(uuid);
262  mpNodeManager->sendWhisper(pipe().get(), client.first, out);
263  }
264  SPD_INFO("JOB [{}] has finished", uuid);
265 }
266 
267 } // namespace Salsa
std::string mUUID
Self UUID.
Definition: Distributor.hh:49
NodeInfo * nodeInfo() const
Definition: Distributor.cc:92
std::map< std::string, std::vector< TaskInfo * > > mWorkerTasks
Worker tasks.
Definition: Feeder.hh:33
Base Message class.
Definition: Message.hh:15
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition: Job.cc:47
Base Distributor class.
Definition: Distributor.hh:18
void consumer(std::string uuid)
Definition: Job.cc:210
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
Definition: Feeder.cc:34
void terminateJob(std::string uuid)
Definition: Feeder.cc:241
NodeManager * mpNodeManager
Node Manager.
Definition: Distributor.hh:53
virtual bool haveMoreTasks()
Definition: NodeManager.cc:566
NodeManager class.
Definition: NodeManager.hh:22
Job class.
Definition: Job.hh:17
virtual void resultTask(TaskInfo *task)
Definition: NodeManager.cc:287
void feeder(std::string uuid)
Definition: Job.cc:227
virtual std::string uuid() const =0
Returns node uuid.
virtual ~Feeder()
Definition: Feeder.cc:16
Job * job(std::string uuid)
Definition: NodeManager.cc:481
void subscribe(std::string uuid)
Definition: Feeder.cc:197
bool hasJobs() const
Definition: NodeManager.cc:520
void print(std::string opt="") const
Definition: NodeManager.cc:31
std::map< std::string, std::string > mClients
List of clients.
Definition: Distributor.hh:51
TaskInfo * getNextTask()
Definition: NodeManager.cc:259
std::shared_ptr< Socket > pipe() const
TODO Returns distributor's pipe?
Definition: Distributor.cc:77
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Three horsemen of apocalypse.
Definition: Feeder.cc:23
NodeInfo * mpNodeInfo
Node Info.
Definition: Distributor.hh:54
std::string uuid() const
Returns distributor's UUID.
Definition: Distributor.cc:84
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
Definition: NodeManager.cc:218
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
Definition: Feeder.cc:99
virtual std::vector< std::string > & content()=0
Returns vector of partial messages as strings.
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
Definition: Job.cc:116
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition: Job.cc:131
void removeWorkerTask(TaskInfo *pTI)
Definition: Feeder.cc:212
std::shared_ptr< Consumer > consumer(std::string uuid) const
Definition: NodeManager.cc:456