3 #include <NodeManager.hh>
4 #include <TaskExecutor.hh>
29 if (pPointer ==
nullptr)
return;
31 mTasks.insert(std::make_pair(pPointer, pTaskState));
41 auto found =
mTasks.find(pPointer);
42 if (found !=
mTasks.end()) {
55 if (task.second->id() == id)
return task.second;
68 if (task.second->state() == TaskState::idle)
return task.second;
79 uint32_t nFreeSlots = 0;
81 if (task.second->state() == TaskState::idle) nFreeSlots++;
95 if (task.second->id() == id) task.second->state(state);
105 if (
mTasks.size() == 0)
return true;
108 std::vector<TaskInfo *> runningTasks;
109 pJob->
tasks(runningTasks, Job::QueueType::running);
111 for (
auto runningTask : runningTasks) {
112 for (
auto task :
mTasks) {
113 if (task.second->state() == TaskState::killed) {
116 else if (task.second->task() == runningTask) {
117 task.second->killTask();
133 uint32_t stat[TaskState::all] = {};
135 for (
auto task :
mTasks) {
136 stat[task.second->state()]++;
137 task.second->print(verbose);
139 SPD_INFO(
"TaskPool I[{}] A[{}] R[{}]", stat[TaskState::idle], stat[TaskState::assigned], stat[TaskState::running]);
149 if (pTaskState ==
nullptr) {
150 SPD_ERROR(
"pTaskState by actor [{}] is null!!!", static_cast<void *>(pPipe));
153 if (pTaskState->
executor() ==
nullptr) {
154 SPD_ERROR(
"pTaskState->executor() by actor [{}] is null!!!", static_cast<void *>(pPipe));
158 SPD_ERROR(
"pTaskState->executor()->pipe() by actor [{}] is null!!!", static_cast<void *>(pPipe));
162 std::vector<std::string> extra;
165 if (state == TaskState::assigned) {
166 pTaskState->
state(TaskState::running);
170 else if (state == TaskState::running || state == TaskState::killed) {
171 pTaskState->
state(TaskState::idle);
174 std::string wkUUID = extra[0];
175 std::string upstream = extra[1];
179 if (pJob !=
nullptr) {
180 SPD_TRACE(
"TASK ENDED JOB [{}:{}]", pTaskState->
task()->jobid(), pTaskState->
task()->taskid());
181 pJob->
removeTask(pTaskState->
task()->taskid(), Salsa::Job::running);
183 std::vector<std::string> out;
184 out.push_back(
"TASK_RESULT");
186 pTaskState->
task()->SerializeToString(&payload);
187 out.push_back(payload);
189 if (pTaskState && pTaskState->
id() > 0) {
190 SPD_TRACE(
"AFTER TASK_RESULT sending reserving task [{}]", pTaskState->
id());
192 out.push_back(
"FREESLOT");
193 out.push_back(fmt::format(
"{}", pTaskState->
id()));
194 pTaskState->
state(TaskState::assigned);
197 SPD_TRACE(
"Searching to worker [{}]", wkUUID);
200 SPD_TRACE(
"Sending via pWorker [{}] to feeder [{}]", wkUUID, upstream);
std::shared_ptr< Worker > worker(std::string uuid) const
bool handlePipe(void *pPipe)
TaskState * find(void *p) const
Base salsa TaskState class.
NodeManager * mpNodeManager
Node manager.
TaskPool(NodeManager *pNM)
bool terminateJob(Job *pJob)
virtual bool handlePipe(std::vector< std::string > &)=0
Handle pipe.
TaskState * findFreeTask() const
void add(void *p, TaskState *t)
Job * job(std::string uuid)
void tasks(std::vector< TaskInfo * > &v, QueueType type, bool clear=true)
TaskExecutor * executor()
virtual void * pipe() const
void changeState(uint32_t id, TaskState::State state)
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
bool removeTask(uint32_t id, QueueType from)
void print(bool verbose=false) const
std::map< void *, TaskState * > mTasks
List of task slots.
TaskState * findById(uint32_t id) const