1 #include "NodeManager.hh" 10 srand(static_cast<uint32_t>(time(
nullptr)));
37 SPD_TRACE(
"mFeeders [{}] mConsumers [{}] mWorkers [{}] mJobs [{}] ",
mFeeders.size(),
mConsumers.size(),
40 if (
mJobs.size() > 0) {
41 SPD_DEBUG(
"= JOBS =======================");
42 for (
auto j :
mJobs) {
45 SPD_DEBUG(
"==============================");
48 SPD_DEBUG(
"= NO JOBS ====================");
62 mConsumers.emplace(uuid, std::make_shared<Consumer>(uuid, pSocket,
this));
71 mFeeders.emplace(uuid, std::make_shared<Feeder>(uuid, pSocket,
this));
80 mWorkers.emplace(uuid, std::make_shared<Worker>(uuid, pSocket,
this));
89 SPD_TRACE(
"NodeManager::onEnter self [{}] from [{}] type [{}] msg [{}]",
self, msg->
uuid(), fromType,
90 static_cast<void *>(msg));
93 auto pFeeder =
feeder(
self);
95 SPD_DEBUG(
"::onEnter FEEDER [{}] has client on network [{}] type [{}]",
self, msg->
uuid(), fromType);
96 SPD_INFO(
"FEEDER [{}] <= [{}] [{}]",
self, msg->
uuid(), fromType);
97 if (fromType ==
"CONSUMER") {
98 pFeeder->addClient(msg->
uuid(), fromType);
99 pFeeder->onEnter(msg, out, fromType);
101 else if (fromType ==
"WORKER") {
102 pFeeder->addClient(msg->
uuid(), fromType);
103 pFeeder->onEnter(msg, out, fromType);
105 else if (fromType ==
"DISCOVERY") {
107 SPD_DEBUG(
"DISCOVERY is here");
110 pFeeder->addOther(msg->
uuid(), fromType);
113 return pFeeder->pipe().get();
118 SPD_DEBUG(
"::onEnter CONSUMER [{}] has client on network [{}] type [{}]",
self, msg->
uuid(), fromType);
119 SPD_INFO(
"CONSUMER [{}] <= [{}] [{}]",
self, msg->
uuid(), fromType);
120 if (fromType ==
"FEEDER") {
121 pConsumer->addClient(msg->
uuid(), fromType);
122 pConsumer->onEnter(msg, out, fromType);
125 pConsumer->addOther(msg->
uuid(), fromType);
128 return pConsumer->pipe().get();
131 auto pWorker =
worker(
self);
133 SPD_DEBUG(
"::onEnter WORKER [{}] has client on network [{}] type [{}]",
self, msg->
uuid(), fromType);
134 SPD_INFO(
"WORKER [{}] <= [{}] [{}]",
self, msg->
uuid(), fromType);
135 if (fromType ==
"FEEDER") {
136 pWorker->addClient(msg->
uuid(), fromType);
137 pWorker->onEnter(msg, out, fromType);
140 pWorker->addOther(msg->
uuid(), fromType);
143 return pWorker->pipe().get();
154 SPD_TRACE(
"NodeManager::onExit self [{}] from [{}] msg [{}]",
self, msg->
uuid(), static_cast<void *>(msg));
156 auto pWorker =
worker(
self);
158 SPD_DEBUG(
"::onExit WORKER [{}] client on network [{}] has left",
self, msg->
uuid());
159 SPD_INFO(
"WORKER [{}] => [{}]",
self, msg->
uuid());
160 pWorker->onExit(msg, out);
161 pWorker->removeClient(msg->
uuid());
162 return pWorker->pipe().get();
165 auto pFeeder =
feeder(
self);
167 SPD_DEBUG(
"::onExit FEEDER [{}] client on network [{}] has left",
self, msg->
uuid());
168 SPD_INFO(
"FEEDER [{}] => [{}]",
self, msg->
uuid());
169 pFeeder->onExit(msg, out);
170 pFeeder->removeClient(msg->
uuid());
171 return pFeeder->pipe().get();
176 SPD_DEBUG(
"::onExit CONSUMER [{}] client on network [{}] has left",
self, msg->
uuid());
177 SPD_INFO(
"CONSUMER [{}] => [{}]",
self, msg->
uuid());
178 pConsumer->onExit(msg, out);
179 pConsumer->removeClient(msg->
uuid());
180 return pConsumer->pipe().get();
192 SPD_TRACE(
"NodeManager::onWhisper self [{}] from [{}] msg [{}]",
self, msg->
uuid(), static_cast<void *>(msg));
194 auto pFeeder =
feeder(
self);
196 SPD_TRACE(
"::onWhisper() FEEDER [{}] from [{}] has msg",
self, msg->
uuid());
197 pFeeder->onWhisper(msg, out);
198 return pFeeder->pipe().get();
203 SPD_TRACE(
"::onWhisper() CONSUMER [{}] from [{}] has msg",
self, msg->
uuid());
204 pConsumer->onWhisper(msg, out);
205 return pConsumer->pipe().get();
208 auto pWorker =
worker(
self);
210 SPD_TRACE(
"::onWhisper() WORKER [{}] from [{}] has msg",
self, msg->
uuid());
211 pWorker->onWhisper(msg, out);
212 return pWorker->pipe().get();
234 auto search =
mJobs.find(pTaskInfo->jobid());
235 if (search !=
mJobs.end()) {
237 pJob = search->second;
244 mJobs.insert(std::make_pair(pTaskInfo->jobid(), pJob));
247 SPD_TRACE(
"Looping feeders");
250 SPD_TRACE(
"Subscribe to feeder [{}]",
feeder.first);
251 feeder.second->subscribe(pTaskInfo->jobid());
255 SPD_TRACE(
"::addTask from [{}] with task id [{}]", pTaskInfo->jobid(), pTaskInfo->taskid());
256 pJob->
addTask(pTaskInfo->taskid(), pTaskInfo, type);
264 TaskInfo * pTaskInfo =
nullptr;
266 SPD_TRACE(
"mActiveJobs.size() [{}]",
mActiveJobs.size());
267 while (
mActiveJobs.size() > 0 && pTaskInfo ==
nullptr) {
268 size_t index = static_cast<size_t>(rand()) %
mActiveJobs.size();
270 auto iJob =
mJobs.find(jobstr);
271 if (iJob !=
mJobs.end()) {
272 pTaskInfo = iJob->second->nextTask();
274 SPD_TRACE(
"getNextTask FEEDER [{}] JOB [{}:{}]", iJob->first, pTaskInfo->jobid(), pTaskInfo->taskid());
283 SPD_TRACE(
"::getNextTask No pTaskInfo found");
293 Job * pJob =
job(pTask->jobid());
294 if (pJob ==
nullptr) {
299 SPD_TRACE(
"TASK ENDED JOB [{}:{}]", pTask->jobid(), pTask->taskid());
305 auto sourceQueue = (pJob->
isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) ? Job::assigned : Job::running;
306 if (pTask->returncode() == 0) {
307 pJob->
moveTask(pTask->taskid(), sourceQueue, Salsa::Job::done);
310 pJob->
moveTask(pTask->taskid(), sourceQueue, Salsa::Job::failed);
326 if (pJob->
isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
327 SPD_WARN(
"Task [{}] duplicate found in [assigned] queue!", pTask->taskid());
328 pJob->
removeTask(pTask->taskid(), Salsa::Job::assigned);
331 SPD_WARN(
"Task [{}] duplicate found in [running] queue!", pTask->taskid());
332 pJob->
removeTask(pTask->taskid(), Salsa::Job::running);
336 std::vector<std::string> out = {
"TASK_RESULT"};
339 pTask->SerializeToString(&payload);
340 out.push_back(payload);
341 uint32_t slots =
nSlots();
347 if (getenv(
"SALSA_FAKE")) slots *= 10;
349 if (pJob->
size(Job::pending) < slots) {
351 SPD_TRACE(
"We are requesting new tasks [{}] haveMoreTasks [{}]", slots, pJob->
haveMoreTasks());
353 out.push_back(
"SENDTASKS");
354 out.push_back(fmt::format(
"{}", slots));
367 SPD_TRACE(
"Terminating job from client [{}]", uuid);
369 auto iJob =
mJobs.find(uuid);
370 if (iJob !=
mJobs.end()) {
376 f.second->terminateJob(uuid);
381 SPD_TRACE(
"Removing job [{}]", uuid);
383 iJob->second =
nullptr;
386 SPD_TRACE(
"NodeManager::terminateJob print()");
398 SPD_DEBUG(
"Checking finished jobs [{}] to be removed ...",
mFinishedJobs.size());
400 std::chrono::time_point<std::chrono::system_clock> curTime = std::chrono::system_clock::now();
401 uint64_t curTimeEpoch = std::chrono::duration_cast<std::chrono::seconds>(curTime.time_since_epoch()).count();
402 std::vector<std::string> cleanUUID;
405 if (j ==
nullptr)
continue;
407 SPD_DEBUG(
"Terminating finished job. Time : diff[{}] timeout[{}]", curTimeEpoch - j->timeFinished(),
409 cleanUUID.push_back(js);
412 if (!cleanUUID.size())
return false;
414 for (
auto u : cleanUUID) {
426 if (
mJobs.size() == 0)
return;
428 std::vector<std::string> cleanUUID;
433 for (
auto job :
mJobs) cleanUUID.push_back(
job.first);
436 for (
auto u : cleanUUID) {
437 SPD_DEBUG(
"Terminating [{}]", u);
450 return search->second;
464 return search->second;
476 return search->second;
487 auto search =
mJobs.find(uuid);
488 if (search !=
mJobs.end()) {
489 return search->second;
534 for (
auto search :
mJobs) {
535 if (search.second !=
nullptr && search.second->feeder() == client_uuid) {
536 jobs.push_back(search.first);
548 num +=
feeder.second->nodeInfo()->slots();
574 if (
job.second->Job::size(Job::pending)) {
576 bool isActiveJob =
false;
578 if (pActiveJobUUID ==
job.first) {
599 auto found =
mJobs.find(jobUUID);
600 if (found !=
mJobs.end()) {
601 return found->second->haveMoreTasks();
631 if (
mJobs.size() == 0) {
632 SPD_TRACE(
"Publish id [{}] with zero tasks ",
id);
637 bool changed =
false;
642 if (!changed)
return;
646 job.second->
json(json[
"tasks"]);
649 Json::StreamWriterBuilder wBuilder;
650 wBuilder[
"indentation"] =
"";
651 std::string data = Json::writeString(wBuilder, json);
654 SPD_TRACE(
"Publish id [{}] data [{}] ",
id, data);
std::vector< std::string > mActiveJobs
List of active jobs.
bool removeTask(uint32_t id, EQueueType from)
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
void consumer(std::string uuid)
uint64_t mFinishedJobTimeout
Finished job timeout in seconds.
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
std::shared_ptr< Worker > worker(std::string uuid) const
bool terminateJob(Job *pJob)
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
virtual void terminateJob(std::string uuid)
std::map< std::string, std::shared_ptr< Consumer > > mConsumers
List of Consumers.
virtual void addTaskSlot()
virtual bool haveMoreTasks()
virtual void noMoreTasks(std::string jobUUID)
bool isFinished()
Returns if jobs is finished.
bool changed() const
Returns if job info was changed.
int32_t nSlots(double mult=1.0) const
std::string uuid() const
returns UUID
virtual void resultTask(TaskInfo *task)
void feeder(std::string uuid)
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
virtual std::string uuid() const =0
Returns node uuid.
Job * job(std::string uuid)
void print(std::string opt="") const
std::map< std::string, Job * > mJobs
List of jobs.
virtual Publisher * publisher() const
std::vector< std::string > mFinishedJobs
List of finished jobs.
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
std::map< std::string, std::shared_ptr< Worker > > mWorkers
List of Workers.
void print(bool verbose=false) const
virtual void publish(std::string id) const
bool haveMoreTasks() const
Task statuses.
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
TaskPool * mpTaskPool
Task pool.
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
void json(Json::Value &json)
Publisher * mpPublisher
Publisher.
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
virtual bool handleTaskPool(void *p)
TaskPool * taskPool()
Get NM's task pool.
virtual bool terminateFinishedJobs()
virtual void publish(std::string id, std::string data)=0
Publish TODO publish what?
std::map< std::string, std::shared_ptr< Feeder > > mFeeders
List of Feeders.
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
Base salsa TaskPool class.
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
std::shared_ptr< Feeder > feeder(std::string uuid) const
size_t size(EQueueType t=all) const
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
virtual void terminateAllJobs(bool finishedonly=false)
std::shared_ptr< Consumer > consumer(std::string uuid) const
virtual void resultTaskToExternal(Job *, TaskInfo *)
Handle return of task and send it to external client.