salsa 0.7.1
Loading...
Searching...
No Matches
TaskPool.cc
1#include "TaskPool.hh"
2#include "Job.hh"
3#include "NodeManager.hh"
4#include "TaskExecutor.hh"
5namespace Salsa {
6
7TaskPool::TaskPool(NodeManager * pNodeManager) : Object(), mpNodeManager(pNodeManager)
8{
12}
14{
18 for (auto task : mTasks) {
19 delete task.second;
20 }
21 mTasks.clear();
22}
23
24void TaskPool::add(void * pPointer, TaskState * pTaskState)
25{
29 if (pPointer == nullptr) return;
30
31 mTasks.insert(std::make_pair(pPointer, pTaskState));
32 pTaskState->id(mTasks.size());
33}
34
35TaskState * TaskPool::find(void * pPointer) const
36{
41 auto found = mTasks.find(pPointer);
42 if (found != mTasks.end()) {
43 return found->second;
44 }
45 return nullptr;
46}
47
48TaskState * TaskPool::findById(uint32_t id) const
49{
53
54 for (auto task : mTasks) {
55 if (task.second->id() == id) return task.second;
56 }
57
58 return nullptr;
59}
60
62{
66
67 for (auto task : mTasks) {
68 if (task.second->state() == TaskState::idle) return task.second;
69 }
70
71 return nullptr;
72}
73
75{
79 uint32_t nFreeSlots = 0;
80 for (auto task : mTasks) {
81 if (task.second->state() == TaskState::idle) nFreeSlots++;
82 }
83
84 return nFreeSlots;
85}
86
88{
92 if (id == 0) return;
93
94 for (auto task : mTasks) {
95 if (task.second->id() == id) task.second->state(state);
96 }
97}
98
100{
104
105 if (mTasks.size() == 0) return true;
106
107 pJob->print();
108 std::vector<TaskInfo *> runningTasks;
109 pJob->tasks(runningTasks, Job::EQueueType::running);
110
111 for (auto runningTask : runningTasks) {
112 for (auto task : mTasks) {
113 if (task.second->state() == TaskState::killed) {
114 continue;
115 }
116 else if (task.second->task() == runningTask) {
117 task.second->killTask();
118 break;
119 }
120 }
121 }
122 // job->tasks(v, Job::EQueueType::pending);
123 // job->tasks(v, Job::EQueueType::assigned);
124
125 return false;
126}
127
128void TaskPool::print(bool verbose) const
129{
133 uint32_t stat[TaskState::all] = {};
134
135 for (auto task : mTasks) {
136 stat[task.second->state()]++;
137 task.second->print(verbose);
138 }
139 SPD_DEBUG("TaskPool I[{}] A[{}] R[{}]", stat[TaskState::idle], stat[TaskState::assigned], stat[TaskState::running]);
140}
141
142bool TaskPool::handlePipe(void * pPipe)
143{
147
148 TaskState * pTaskState = find(pPipe);
149 if (pTaskState == nullptr) {
150 SPD_ERROR("pTaskState by actor [{}] is null!!!", static_cast<void *>(pPipe));
151 return false;
152 }
153 if (pTaskState->executor() == nullptr) {
154 SPD_ERROR("pTaskState->executor() by actor [{}] is null!!!", static_cast<void *>(pPipe));
155 return false;
156 }
157 if (pTaskState->executor()->pipe() == nullptr) {
158 SPD_ERROR("pTaskState->executor()->pipe() by actor [{}] is null!!!", static_cast<void *>(pPipe));
159 return false;
160 }
161
162 std::vector<std::string> extra;
163 pTaskState->executor()->handlePipe(extra);
164 TaskState::EState state = pTaskState->state();
165 if (state == TaskState::assigned) {
166 pTaskState->state(TaskState::running);
167 // handle mJobs
168 // Nothing for now i think
169 }
170 else if (state == TaskState::running || state == TaskState::killed) {
171 pTaskState->state(TaskState::idle);
172 pTaskState->pid(0);
173
174 std::string wkUUID = extra[0];
175 std::string upstream = extra[1];
176 // handle mJobs
177
178 Job * pJob = mpNodeManager->job(pTaskState->task()->jobid());
179 if (pJob != nullptr) {
180 SPD_TRACE("TASK ENDED JOB [{}:{}]", pTaskState->task()->jobid(), pTaskState->task()->taskid());
181 pJob->removeTask(pTaskState->task()->taskid(), Salsa::Job::running);
182 }
183 std::vector<std::string> out;
184 out.push_back("TASK_RESULT");
185 std::string payload;
186 pTaskState->task()->SerializeToString(&payload);
187 out.push_back(payload);
188 pTaskState = findFreeTask();
189 if (pTaskState && pTaskState->id() > 0) {
190 SPD_TRACE("AFTER TASK_RESULT sending reserving task [{}]", pTaskState->id());
191 out.push_back("&");
192 out.push_back("FREESLOT");
193 out.push_back(fmt::format("{}", pTaskState->id()));
194 pTaskState->state(TaskState::assigned);
195 }
196
197 SPD_TRACE("Searching to worker [{}]", wkUUID);
198 std::shared_ptr<Salsa::Worker> pWorker = mpNodeManager->worker(wkUUID);
199 if (pWorker) {
200 SPD_TRACE("Sending via pWorker [{}] to feeder [{}]", wkUUID, upstream);
201 mpNodeManager->sendWhisper(pWorker->pipe().get(), upstream, out);
202 }
203 }
204 print();
205
206 return true;
207}
208
209} // namespace Salsa
Job class.
Definition Job.hh:16
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
Definition Job.cc:130
void print() const
Definition Job.cc:162
bool removeTask(uint32_t id, EQueueType from)
Definition Job.cc:96
NodeManager class.
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
std::shared_ptr< Worker > worker(std::string uuid) const
Job * job(std::string uuid)
Base Salsa Object class.
Definition Object.hh:15
virtual void * pipe() const
virtual bool handlePipe(std::vector< std::string > &)=0
Handle pipe.
TaskState * find(void *p) const
Definition TaskPool.cc:35
TaskPool(NodeManager *pNM)
Definition TaskPool.cc:7
void print(bool verbose=false) const
Definition TaskPool.cc:128
NodeManager * mpNodeManager
Node manager.
Definition TaskPool.hh:38
void changeState(uint32_t id, TaskState::EState state)
Definition TaskPool.cc:87
uint32_t nSlotFree()
Definition TaskPool.cc:74
void add(void *p, TaskState *t)
Definition TaskPool.cc:24
TaskState * findFreeTask() const
Definition TaskPool.cc:61
std::map< void *, TaskState * > mTasks
List of task slots.
Definition TaskPool.hh:37
bool handlePipe(void *pPipe)
Definition TaskPool.cc:142
bool terminateJob(Job *pJob)
Definition TaskPool.cc:99
virtual ~TaskPool()
Definition TaskPool.cc:13
TaskState * findById(uint32_t id) const
Definition TaskPool.cc:48
Base salsa TaskState class.
Definition TaskState.hh:16
TaskExecutor * executor()
Definition TaskState.cc:83
void state(EState s)
Definition TaskState.cc:36
TaskInfo * task() const
Definition TaskState.cc:66
void id(uint32_t id)
Definition TaskState.cc:21
void pid(uint32_t pid)
Definition TaskState.cc:51
EState
Status of task.
Definition TaskState.hh:19