salsa  0.4.0
NodeManager.cc
1 #include "NodeManager.hh"
2 
3 #include <json/json.h>
4 namespace Salsa {
5 NodeManager::NodeManager() : Object(), mFinishedJobTimeout(24 * 3600)
6 {
10  srand(static_cast<uint32_t>(time(nullptr))); // seed with time since epoch
11 }
12 
14 {
18 
19  for (auto job : mJobs) {
20  if (mpTaskPool) {
21  mpTaskPool->terminateJob(job.second);
22  }
23  delete job.second;
24  }
25  // terminateJobAll();
26  mJobs.clear();
27  delete mpTaskPool;
28  delete mpPublisher;
29 }
30 
31 void NodeManager::print(std::string /*opt*/) const
32 {
36 
37  SPD_TRACE("mFeeders [{}] mConsumers [{}] mWorkers [{}] mJobs [{}] ", mFeeders.size(), mConsumers.size(),
38  mWorkers.size(), mJobs.size());
39 
40  if (mJobs.size() > 0) {
41  SPD_DEBUG("= JOBS =======================");
42  for (auto j : mJobs) {
43  j.second->print();
44  }
45  SPD_DEBUG("==============================");
46  }
47  else {
48  SPD_DEBUG("= NO JOBS ====================");
49  }
50 
51  if (mpTaskPool) {
52  mpTaskPool->print();
53  }
54 }
55 
56 void NodeManager::addConsumer(std::string uuid, std::shared_ptr<Socket> pSocket)
57 {
61 
62  mConsumers.emplace(uuid, std::make_shared<Consumer>(uuid, pSocket, this));
63 }
64 
65 void NodeManager::addFeeder(std::string uuid, std::shared_ptr<Socket> pSocket)
66 {
70 
71  mFeeders.emplace(uuid, std::make_shared<Feeder>(uuid, pSocket, this));
72 }
73 
74 void NodeManager::addWorker(std::string uuid, std::shared_ptr<Socket> pSocket)
75 {
79 
80  mWorkers.emplace(uuid, std::make_shared<Worker>(uuid, pSocket, this));
81 }
82 
83 Socket * NodeManager::onEnter(std::string self, std::string fromType, Message * msg, std::vector<std::string> & out)
84 {
88 
89  SPD_TRACE("NodeManager::onEnter self [{}] from [{}] type [{}] msg [{}]", self, msg->uuid(), fromType,
90  static_cast<void *>(msg));
91 
92  // TODO Implement map<std::string /* self */, Node::ENodeType?>
93  auto pFeeder = feeder(self);
94  if (pFeeder) {
95  SPD_DEBUG("::onEnter FEEDER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
96  SPD_INFO("FEEDER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
97  if (fromType == "CONSUMER") {
98  pFeeder->addClient(msg->uuid(), fromType);
99  pFeeder->onEnter(msg, out, fromType);
100  }
101  else if (fromType == "WORKER") {
102  pFeeder->addClient(msg->uuid(), fromType);
103  pFeeder->onEnter(msg, out, fromType);
104  }
105  else if (fromType == "DISCOVERY") {
106  // We fully ignoring it
107  SPD_DEBUG("DISCOVERY is here");
108  }
109  else {
110  pFeeder->addOther(msg->uuid(), fromType);
111  }
112 
113  return pFeeder->pipe().get();
114  }
115 
116  auto pConsumer = consumer(self);
117  if (pConsumer) {
118  SPD_DEBUG("::onEnter CONSUMER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
119  SPD_INFO("CONSUMER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
120  if (fromType == "FEEDER") {
121  pConsumer->addClient(msg->uuid(), fromType);
122  pConsumer->onEnter(msg, out, fromType);
123  }
124  else {
125  pConsumer->addOther(msg->uuid(), fromType);
126  }
127 
128  return pConsumer->pipe().get();
129  }
130 
131  auto pWorker = worker(self);
132  if (pWorker) {
133  SPD_DEBUG("::onEnter WORKER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
134  SPD_INFO("WORKER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
135  if (fromType == "FEEDER") {
136  pWorker->addClient(msg->uuid(), fromType);
137  pWorker->onEnter(msg, out, fromType);
138  }
139  else {
140  pWorker->addOther(msg->uuid(), fromType);
141  }
142 
143  return pWorker->pipe().get();
144  }
145 
146  return nullptr;
147 }
148 Socket * NodeManager::onExit(std::string self, Message * msg, std::vector<std::string> & out)
149 {
153 
154  SPD_TRACE("NodeManager::onExit self [{}] from [{}] msg [{}]", self, msg->uuid(), static_cast<void *>(msg));
155 
156  auto pWorker = worker(self);
157  if (pWorker) {
158  SPD_DEBUG("::onExit WORKER [{}] client on network [{}] has left", self, msg->uuid());
159  SPD_INFO("WORKER [{}] => [{}]", self, msg->uuid());
160  pWorker->onExit(msg, out);
161  pWorker->removeClient(msg->uuid());
162  return pWorker->pipe().get();
163  }
164 
165  auto pFeeder = feeder(self);
166  if (pFeeder) {
167  SPD_DEBUG("::onExit FEEDER [{}] client on network [{}] has left", self, msg->uuid());
168  SPD_INFO("FEEDER [{}] => [{}]", self, msg->uuid());
169  pFeeder->onExit(msg, out);
170  pFeeder->removeClient(msg->uuid());
171  return pFeeder->pipe().get();
172  }
173 
174  auto pConsumer = consumer(self);
175  if (pConsumer) {
176  SPD_DEBUG("::onExit CONSUMER [{}] client on network [{}] has left", self, msg->uuid());
177  SPD_INFO("CONSUMER [{}] => [{}]", self, msg->uuid());
178  pConsumer->onExit(msg, out);
179  pConsumer->removeClient(msg->uuid());
180  return pConsumer->pipe().get();
181  }
182 
183  return nullptr;
184 }
185 
186 Socket * NodeManager::onWhisper(std::string self, Message * msg, std::vector<std::string> & out)
187 {
191 
192  SPD_TRACE("NodeManager::onWhisper self [{}] from [{}] msg [{}]", self, msg->uuid(), static_cast<void *>(msg));
193 
194  auto pFeeder = feeder(self);
195  if (pFeeder) {
196  SPD_TRACE("::onWhisper() FEEDER [{}] from [{}] has msg", self, msg->uuid());
197  pFeeder->onWhisper(msg, out);
198  return pFeeder->pipe().get();
199  }
200 
201  auto pConsumer = consumer(self);
202  if (pConsumer) {
203  SPD_TRACE("::onWhisper() CONSUMER [{}] from [{}] has msg", self, msg->uuid());
204  pConsumer->onWhisper(msg, out);
205  return pConsumer->pipe().get();
206  }
207 
208  auto pWorker = worker(self);
209  if (pWorker) {
210  SPD_TRACE("::onWhisper() WORKER [{}] from [{}] has msg", self, msg->uuid());
211  pWorker->onWhisper(msg, out);
212  return pWorker->pipe().get();
213  }
214 
215  return nullptr;
216 }
217 
218 bool NodeManager::sendWhisper(Socket * /*s*/, std::string /*to*/, std::vector<std::string> & /*v*/)
219 {
223 
224  return true;
225 }
226 
227 void NodeManager::addTask(TaskInfo * pTaskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType type)
228 {
232 
233  Salsa::Job * pJob = nullptr;
234  auto search = mJobs.find(pTaskInfo->jobid());
235  if (search != mJobs.end()) {
236  // if pJob was found
237  pJob = search->second;
238  }
239  else {
240  // if pJob was not found
241  pJob = new Salsa::Job(pTaskInfo->jobid());
242  pJob->consumer(cuuid);
243  pJob->feeder(fuuid);
244  mJobs.insert(std::make_pair(pTaskInfo->jobid(), pJob));
245  mActiveJobs.push_back(pTaskInfo->jobid());
246  // TODO : now we need to tell all feeders that theat they should subscribe to workers
247  SPD_TRACE("Looping feeders");
248  for (auto feeder : mFeeders) {
250  SPD_TRACE("Subscribe to feeder [{}]", feeder.first);
251  feeder.second->subscribe(pTaskInfo->jobid());
252  }
253  }
254 
255  SPD_TRACE("::addTask from [{}] with task id [{}]", pTaskInfo->jobid(), pTaskInfo->taskid());
256  pJob->addTask(pTaskInfo->taskid(), pTaskInfo, type);
257 }
258 
260 {
264  TaskInfo * pTaskInfo = nullptr;
265 
266  SPD_TRACE("mActiveJobs.size() [{}]", mActiveJobs.size());
267  while (mActiveJobs.size() > 0 && pTaskInfo == nullptr) {
268  size_t index = static_cast<size_t>(rand()) % mActiveJobs.size();
269  std::string jobstr = mActiveJobs[index];
270  auto iJob = mJobs.find(jobstr);
271  if (iJob != mJobs.end()) {
272  pTaskInfo = iJob->second->nextTask();
273  if (pTaskInfo) {
274  SPD_TRACE("getNextTask FEEDER [{}] JOB [{}:{}]", iJob->first, pTaskInfo->jobid(), pTaskInfo->taskid());
275  return pTaskInfo;
276  }
277  }
278 
279  // removing jobstring from mActiveJobs
280  mActiveJobs.erase(std::remove(begin(mActiveJobs), end(mActiveJobs), jobstr), end(mActiveJobs));
281  }
282 
283  SPD_TRACE("::getNextTask No pTaskInfo found");
284  return nullptr;
285 }
286 
287 void NodeManager::resultTask(TaskInfo * pTask)
288 {
292 
293  Job * pJob = job(pTask->jobid());
294  if (pJob == nullptr) {
295  delete pTask;
296  return;
297  }
298 
299  SPD_TRACE("TASK ENDED JOB [{}:{}]", pTask->jobid(), pTask->taskid());
300  // search->second->moveTask(pTask->taskid(), pTask, Salsa::Job::running, Salsa::Job::done);
301 
302  // If job has no consumer we end (assuming that it is SUBMITTER)
303  if (pJob->consumer().empty()) {
304  // TODO : Fix done and failed
305  auto sourceQueue = (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) ? Job::assigned : Job::running;
306  if (pTask->returncode() == 0) {
307  pJob->moveTask(pTask->taskid(), sourceQueue, Salsa::Job::done);
308  }
309  else {
310  pJob->moveTask(pTask->taskid(), sourceQueue, Salsa::Job::failed);
311  }
312 
313  if (pJob->isFinished()) {
315  mFinishedJobs.push_back(pJob->uuid());
316  }
317 
318  resultTaskToExternal(pJob, pTask);
319 
320  print();
321  // TODO we need to think what to do with TaskInfo object in highest level
322  delete pTask;
323  return;
324  }
325 
326  if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
327  SPD_WARN("Task [{}] duplicate found in [assigned] queue!", pTask->taskid());
328  pJob->removeTask(pTask->taskid(), Salsa::Job::assigned);
329  }
330  else {
331  SPD_WARN("Task [{}] duplicate found in [running] queue!", pTask->taskid());
332  pJob->removeTask(pTask->taskid(), Salsa::Job::running);
333  }
334 
335  std::shared_ptr<Consumer> pConsumer = consumer(pJob->consumer());
336  std::vector<std::string> out = {"TASK_RESULT"};
337  // out.push_back("TASK_RESULT");
338  std::string payload;
339  pTask->SerializeToString(&payload);
340  out.push_back(payload);
341  uint32_t slots = nSlots();
342 
343  // delete pTask;
344 
345  // TODO only for testing, REMOVE IT later
346  // - Well, what about #ifdef DEBUG ?
347  if (getenv("SALSA_FAKE")) slots *= 10;
348 
349  if (pJob->size(Job::pending) < slots) {
350  if (pJob->haveMoreTasks()) {
351  SPD_TRACE("We are requesting new tasks [{}] haveMoreTasks [{}]", slots, pJob->haveMoreTasks());
352  out.push_back("&");
353  out.push_back("SENDTASKS");
354  out.push_back(fmt::format("{}", slots));
355  }
356  }
357 
358  sendWhisper(pConsumer->pipe().get(), pJob->feeder(), out);
359 }
360 
361 void NodeManager::terminateJob(std::string uuid)
362 {
366 
367  SPD_TRACE("Terminating job from client [{}]", uuid);
368 
369  auto iJob = mJobs.find(uuid);
370  if (iJob != mJobs.end()) {
371  if (mpTaskPool) {
372  mpTaskPool->terminateJob(iJob->second);
373  }
374 
375  for (auto f : mFeeders) {
376  f.second->terminateJob(uuid);
377  }
378 
379  mFinishedJobs.erase(std::remove(begin(mFinishedJobs), end(mFinishedJobs), uuid), end(mFinishedJobs));
380 
381  SPD_TRACE("Removing job [{}]", uuid);
382  delete iJob->second;
383  iJob->second = nullptr;
384  mJobs.erase(iJob);
385  }
386  SPD_TRACE("NodeManager::terminateJob print()");
387  print();
388 }
389 
391 {
395 
396  if (mFinishedJobs.size() == 0) return false;
397 
398  SPD_DEBUG("Checking finished jobs [{}] to be removed ...", mFinishedJobs.size());
399 
400  std::chrono::time_point<std::chrono::system_clock> curTime = std::chrono::system_clock::now();
401  uint64_t curTimeEpoch = std::chrono::duration_cast<std::chrono::seconds>(curTime.time_since_epoch()).count();
402  std::vector<std::string> cleanUUID;
403  for (auto js : mFinishedJobs) {
404  auto j = job(js);
405  if (j == nullptr) continue;
406  if (curTimeEpoch - j->timeFinished() > mFinishedJobTimeout) {
407  SPD_DEBUG("Terminating finished job. Time : diff[{}] timeout[{}]", curTimeEpoch - j->timeFinished(),
409  cleanUUID.push_back(js);
410  }
411  }
412  if (!cleanUUID.size()) return false;
413 
414  for (auto u : cleanUUID) {
415  terminateJob(u);
416  }
417 
418  return true;
419 }
420 
421 void NodeManager::terminateAllJobs(bool finishedonly)
422 {
426  if (mJobs.size() == 0) return;
427 
428  std::vector<std::string> cleanUUID;
429  if (finishedonly)
430  for (auto job : mFinishedJobs) cleanUUID.push_back(job);
431 
432  else {
433  for (auto job : mJobs) cleanUUID.push_back(job.first);
434  }
435 
436  for (auto u : cleanUUID) {
437  SPD_DEBUG("Terminating [{}]", u);
438  terminateJob(u);
439  }
440 }
441 
442 std::shared_ptr<Feeder> NodeManager::feeder(std::string uuid) const
443 {
448  auto search = mFeeders.find(uuid);
449  if (search != mFeeders.end()) {
450  return search->second;
451  }
452  else {
453  return nullptr;
454  }
455 }
456 std::shared_ptr<Consumer> NodeManager::consumer(std::string uuid) const
457 {
462  auto search = mConsumers.find(uuid);
463  if (search != mConsumers.end()) {
464  return search->second;
465  }
466  return nullptr;
467 }
468 std::shared_ptr<Worker> NodeManager::worker(std::string uuid) const
469 {
474  auto search = mWorkers.find(uuid);
475  if (search != mWorkers.end()) {
476  return search->second;
477  }
478  return nullptr;
479 }
480 
481 Job * NodeManager::job(std::string uuid)
482 {
487  auto search = mJobs.find(uuid);
488  if (search != mJobs.end()) {
489  return search->second;
490  }
491  return nullptr;
492 }
493 
495 {
499  if (mpTaskPool == nullptr) {
500  mpTaskPool = new TaskPool(this);
501  }
502 }
503 
504 bool NodeManager::handleTaskPool(void * /*p*/)
505 {
509  return false;
510 }
511 
513 {
517  return mpTaskPool;
518 }
519 
521 {
525  return mActiveJobs.size() > 0;
526 }
527 
528 void NodeManager::jobs(std::string client_uuid, std::vector<std::string> & jobs) const
529 {
533 
534  for (auto search : mJobs) {
535  if (search.second != nullptr && search.second->feeder() == client_uuid) {
536  jobs.push_back(search.first);
537  }
538  }
539 }
540 int32_t NodeManager::nSlots(double mult) const
541 {
545 
546  int32_t num = 0;
547  for (auto feeder : mFeeders) {
548  num += feeder.second->nodeInfo()->slots();
549  }
550  return num * mult;
551 }
552 
553 void NodeManager::noMoreTasks(std::string /*jobUUID*/)
554 {
558 
559  (void)(0);
560  // auto search = mJobs.find(jobUUID);
561  // if (search != mJobs.end()) {
562  // return search->second->haveMoreTasks();
563  //}
564 }
565 
567 {
571 
572  bool rc = false;
573  for (auto job : mJobs) {
574  if (job.second->Job::size(Job::pending)) {
575  // job.second->haveMoreTasks(true);
576  bool isActiveJob = false;
577  for (auto pActiveJobUUID : mActiveJobs) {
578  if (pActiveJobUUID == job.first) {
579  isActiveJob = true;
580  break;
581  }
582  }
583 
584  if (!isActiveJob) {
585  mActiveJobs.push_back(job.first);
586  }
587 
588  rc = true;
589  }
590  }
591  return rc;
592 }
593 
594 bool NodeManager::haveMoreTasks(std::string jobUUID)
595 {
599  auto found = mJobs.find(jobUUID);
600  if (found != mJobs.end()) {
601  return found->second->haveMoreTasks();
602  }
603  else {
604  return false;
605  }
606 }
607 
609 {
613  mpPublisher = pPublisher;
614 }
615 
617 {
621  return mpPublisher;
622 }
623 
624 void NodeManager::publish(std::string id) const
625 {
629  if (!mpPublisher) return;
630 
631  if (mJobs.size() == 0) {
632  SPD_TRACE("Publish id [{}] with zero tasks ", id);
633  mpPublisher->publish(id, "{\"tasks\":[]}");
634  return;
635  }
636 
637  bool changed = false;
638  for (auto job : mJobs) {
639  if (job.second->changed()) changed = true;
640  }
641 
642  if (!changed) return;
643 
644  Json::Value json;
645  for (auto job : mJobs) {
646  job.second->json(json["tasks"]);
647  }
648 
649  Json::StreamWriterBuilder wBuilder;
650  wBuilder["indentation"] = "";
651  std::string data = Json::writeString(wBuilder, json);
652 
653  // print();
654  SPD_TRACE("Publish id [{}] data [{}] ", id, data);
655  mpPublisher->publish(id, data);
656 
657  for (auto job : mJobs) {
658  job.second->changed(false);
659  }
660 }
661 
662 } // namespace Salsa
std::vector< std::string > mActiveJobs
List of active jobs.
Definition: NodeManager.hh:88
bool removeTask(uint32_t id, EQueueType from)
Definition: Job.cc:84
Base Message class.
Definition: Message.hh:15
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition: Job.cc:47
void consumer(std::string uuid)
Definition: Job.cc:210
uint64_t mFinishedJobTimeout
Finished job timeout in seconds.
Definition: NodeManager.hh:90
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
Definition: NodeManager.cc:528
std::shared_ptr< Worker > worker(std::string uuid) const
Definition: NodeManager.cc:468
bool terminateJob(Job *pJob)
Definition: TaskPool.cc:99
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:148
virtual void terminateJob(std::string uuid)
Definition: NodeManager.cc:361
std::map< std::string, std::shared_ptr< Consumer > > mConsumers
List of Consumers.
Definition: NodeManager.hh:92
virtual void addTaskSlot()
Definition: NodeManager.cc:494
virtual bool haveMoreTasks()
Definition: NodeManager.cc:566
virtual void noMoreTasks(std::string jobUUID)
Definition: NodeManager.cc:553
Job class.
Definition: Job.hh:17
bool isFinished()
Returns whether the job is finished.
Definition: Job.cc:252
bool changed() const
Returns whether the job info has changed.
Definition: Job.hh:69
int32_t nSlots(double mult=1.0) const
Definition: NodeManager.cc:540
EQueueType
Queue types.
Definition: Job.hh:20
std::string uuid() const
Returns the job UUID.
Definition: Job.hh:29
virtual void resultTask(TaskInfo *task)
Definition: NodeManager.cc:287
void feeder(std::string uuid)
Definition: Job.cc:227
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:65
virtual std::string uuid() const =0
Returns node uuid.
Base Publisher class.
Definition: Publisher.hh:14
Job * job(std::string uuid)
Definition: NodeManager.cc:481
bool hasJobs() const
Definition: NodeManager.cc:520
void print(std::string opt="") const
Definition: NodeManager.cc:31
std::map< std::string, Job * > mJobs
List of jobs.
Definition: NodeManager.hh:87
virtual Publisher * publisher() const
Definition: NodeManager.cc:616
std::vector< std::string > mFinishedJobs
List of finished jobs.
Definition: NodeManager.hh:89
TaskInfo * getNextTask()
Definition: NodeManager.cc:259
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:186
std::map< std::string, std::shared_ptr< Worker > > mWorkers
List of Workers.
Definition: NodeManager.hh:91
virtual ~NodeManager()
Definition: NodeManager.cc:13
void print(bool verbose=false) const
Definition: TaskPool.cc:128
virtual void publish(std::string id) const
Definition: NodeManager.cc:624
Base Socket class.
Definition: Socket.hh:15
bool haveMoreTasks() const
Task statuses.
Definition: Job.cc:244
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
Definition: Job.cc:29
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:56
TaskPool * mpTaskPool
Task pool.
Definition: NodeManager.hh:94
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
Definition: NodeManager.cc:218
void json(Json::Value &json)
Definition: Job.cc:159
Publisher * mpPublisher
Publisher.
Definition: NodeManager.hh:95
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:74
virtual bool handleTaskPool(void *p)
Definition: NodeManager.cc:504
TaskPool * taskPool()
Get NM's task pool.
Definition: NodeManager.cc:512
Base Salsa Object class.
Definition: Object.hh:15
virtual bool terminateFinishedJobs()
Definition: NodeManager.cc:390
virtual void publish(std::string id, std::string data)=0
Publishes data under the given id (TODO: document the payload format).
std::map< std::string, std::shared_ptr< Feeder > > mFeeders
List of Feeders.
Definition: NodeManager.hh:93
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:83
Base salsa TaskPool class.
Definition: TaskPool.hh:18
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
Definition: NodeManager.cc:227
std::shared_ptr< Feeder > feeder(std::string uuid) const
Definition: NodeManager.cc:442
size_t size(EQueueType t=all) const
Definition: Job.cc:179
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition: Job.cc:131
virtual void terminateAllJobs(bool finishedonly=false)
Definition: NodeManager.cc:421
std::shared_ptr< Consumer > consumer(std::string uuid) const
Definition: NodeManager.cc:456
virtual void resultTaskToExternal(Job *, TaskInfo *)
Handles a returned task and sends it to the external client.
Definition: NodeManager.hh:66