ndmspc  v1.2.0-0.1.rc7
NTaskStateManager.cxx
1 #include "NTaskStateManager.h"
2 #include <stdexcept>
3 
4 namespace Ndmspc {
5 
6 void NTaskStateManager::AddPending(TaskId id, const TaskPayload & payload)
7 {
8  if (TaskExists(id)) {
9  throw std::runtime_error("Task " + std::to_string(id) + " already exists in state manager");
10  }
11  EnqueuePending(id, payload);
12  fTaskPayloads[id] = payload;
13 }
14 
15 bool NTaskStateManager::AssignToWorker(const WorkerId & worker, TaskId id)
16 {
17  if (fPending.empty() || fPendingIds.find(id) == fPendingIds.end()) {
18  return false;
19  }
20 
21  // Find and remove from pending
22  std::queue<std::pair<TaskId, TaskPayload>> tempQueue;
23  bool found = false;
24 
25  while (!fPending.empty()) {
26  auto [taskId, payload] = std::move(fPending.front());
27  fPending.pop();
28 
29  if (taskId == id && !found) {
30  found = true;
31  // Move to running state
32  fRunning[id] = payload;
33  fWorkerToTasks[worker].insert(id);
34  fTaskToWorker[id] = worker;
35  fPendingIds.erase(id);
36  } else {
37  tempQueue.emplace(taskId, payload);
38  }
39  }
40 
41  fPending = std::move(tempQueue);
42  return found;
43 }
44 
45 bool NTaskStateManager::ClaimNextPendingForWorker(const WorkerId & worker, TaskId & id, TaskPayload & payload)
46 {
47  if (fPending.empty()) {
48  return false;
49  }
50 
51  auto item = std::move(fPending.front());
52  fPending.pop();
53  fPendingIds.erase(item.first);
54 
55  id = item.first;
56  payload = std::move(item.second);
57 
58  fRunning[id] = payload;
59  fWorkerToTasks[worker].insert(id);
60  fTaskToWorker[id] = worker;
61  return true;
62 }
63 
65 {
66  auto it = fRunning.find(id);
67  if (it == fRunning.end()) {
68  return false; // Task not running
69  }
70 
71  // Move from running to done
72  fDone.insert(id);
73  fRunning.erase(it);
74 
75  // Remove from worker assignment
76  auto workerIt = fTaskToWorker.find(id);
77  if (workerIt != fTaskToWorker.end()) {
78  const WorkerId & worker = workerIt->second;
79  fWorkerToTasks[worker].erase(id);
80  if (fWorkerToTasks[worker].empty()) {
81  fWorkerToTasks.erase(worker);
82  }
83  fTaskToWorker.erase(workerIt);
84  }
85 
86  return true;
87 }
88 
90 {
91  auto it = fRunning.find(id);
92  if (it == fRunning.end()) {
93  return false; // Task not running
94  }
95 
96  // Move from running back to pending
97  TaskPayload payload = it->second;
98  fRunning.erase(it);
99  EnqueuePending(id, payload);
100 
101  // Remove from worker assignment
102  auto workerIt = fTaskToWorker.find(id);
103  if (workerIt != fTaskToWorker.end()) {
104  const WorkerId & worker = workerIt->second;
105  fWorkerToTasks[worker].erase(id);
106  if (fWorkerToTasks[worker].empty()) {
107  fWorkerToTasks.erase(worker);
108  }
109  fTaskToWorker.erase(workerIt);
110  }
111 
112  return true;
113 }
114 
116 {
117  // Running -> pending
118  auto runningIt = fRunning.find(id);
119  if (runningIt != fRunning.end()) {
120  TaskPayload payload = runningIt->second;
121  fRunning.erase(runningIt);
122  EnqueuePending(id, payload);
123 
124  auto workerIt = fTaskToWorker.find(id);
125  if (workerIt != fTaskToWorker.end()) {
126  const WorkerId & worker = workerIt->second;
127  auto wkIt = fWorkerToTasks.find(worker);
128  if (wkIt != fWorkerToTasks.end()) {
129  wkIt->second.erase(id);
130  if (wkIt->second.empty()) {
131  fWorkerToTasks.erase(wkIt);
132  }
133  }
134  fTaskToWorker.erase(workerIt);
135  }
136  return true;
137  }
138 
139  // Done -> pending
140  auto doneIt = fDone.find(id);
141  if (doneIt != fDone.end()) {
142  auto payloadIt = fTaskPayloads.find(id);
143  if (payloadIt == fTaskPayloads.end()) {
144  return false;
145  }
146  fDone.erase(doneIt);
147  EnqueuePending(id, payloadIt->second);
148  return true;
149  }
150 
151  // Unknown or already pending.
152  return false;
153 }
154 
155 std::set<NTaskStateManager::TaskId> NTaskStateManager::GetWorkerTasks(const WorkerId & worker) const
156 {
157  auto it = fWorkerToTasks.find(worker);
158  if (it != fWorkerToTasks.end()) {
159  return it->second;
160  }
161  return {};
162 }
163 
164 std::pair<NTaskStateManager::TaskId, NTaskStateManager::TaskPayload> NTaskStateManager::GetNextPending()
165 {
166  if (fPending.empty()) {
167  return {0, {}};
168  }
169 
170  auto [taskId, payload] = fPending.front();
171  return {taskId, payload};
172 }
173 
175 {
176  return !fPending.empty();
177 }
178 
179 std::vector<std::pair<NTaskStateManager::TaskId, NTaskStateManager::TaskPayload>>
181 {
182  std::vector<std::pair<TaskId, TaskPayload>> recovered;
183 
184  auto workerIt = fWorkerToTasks.find(worker);
185  if (workerIt == fWorkerToTasks.end()) {
186  return recovered; // No tasks assigned to this worker
187  }
188 
189  const auto & taskIds = workerIt->second;
190  for (TaskId id : taskIds) {
191  auto runningIt = fRunning.find(id);
192  if (runningIt != fRunning.end()) {
193  // Move back to pending
194  const TaskPayload & payload = runningIt->second;
195  recovered.emplace_back(id, payload);
196  EnqueuePending(id, payload);
197  fRunning.erase(runningIt);
198  }
199  fTaskToWorker.erase(id);
200  }
201 
202  fWorkerToTasks.erase(workerIt);
203  return recovered;
204 }
205 
206 bool NTaskStateManager::RemoveTaskFromWorker(const WorkerId & worker, TaskId id)
207 {
208  auto workerIt = fWorkerToTasks.find(worker);
209  if (workerIt == fWorkerToTasks.end()) {
210  return false;
211  }
212 
213  size_t erased = workerIt->second.erase(id);
214  if (erased > 0) {
215  if (workerIt->second.empty()) {
216  fWorkerToTasks.erase(workerIt);
217  }
218  fTaskToWorker.erase(id);
219  return true;
220  }
221 
222  return false;
223 }
224 
225 NTaskStateManager::WorkerId NTaskStateManager::GetTaskWorker(TaskId id) const
226 {
227  auto it = fTaskToWorker.find(id);
228  if (it != fTaskToWorker.end()) {
229  return it->second;
230  }
231  return {};
232 }
233 
234 bool NTaskStateManager::IsDone(TaskId id) const
235 {
236  return fDone.find(id) != fDone.end();
237 }
238 
240 {
241  fPending = std::queue<std::pair<TaskId, TaskPayload>>();
242  fPendingIds.clear();
243  fRunning.clear();
244  fDone.clear();
245  fWorkerToTasks.clear();
246  fTaskToWorker.clear();
247  fTaskPayloads.clear();
248 }
249 
250 bool NTaskStateManager::TaskExists(TaskId id) const
251 {
252  return fPendingIds.find(id) != fPendingIds.end() ||
253  fRunning.find(id) != fRunning.end() ||
254  fDone.find(id) != fDone.end() ||
255  fTaskPayloads.find(id) != fTaskPayloads.end();
256 }
257 
258 void NTaskStateManager::EnqueuePending(TaskId id, const TaskPayload & payload)
259 {
260  if (fPendingIds.insert(id).second) {
261  fPending.emplace(id, payload);
262  }
263 }
264 
265 } // namespace Ndmspc
std::pair< TaskId, TaskPayload > GetNextPending()
Get the next pending task for dispatch.
void Clear()
Clear all state (for reuse or cleanup)
bool IsDone(TaskId id) const
Check if a task has been completed.
WorkerId GetTaskWorker(TaskId id) const
Get the worker currently executing a task.
bool AssignToWorker(const WorkerId &worker, TaskId id)
Assign a pending task to a worker (transitions to running)
std::vector< std::pair< TaskId, TaskPayload > > RecoverWorkerTasks(const WorkerId &worker)
Recover all tasks from a failed worker.
bool HasPending() const
Check if there are pending tasks.
bool ClaimNextPendingForWorker(const WorkerId &worker, TaskId &id, TaskPayload &payload)
Atomically pop the next pending task and assign it to a worker.
bool RequeueTask(TaskId id)
Requeue a task to pending state from running or done state.
std::set< TaskId > GetWorkerTasks(const WorkerId &worker) const
Get all tasks currently assigned to a worker.
void AddPending(TaskId id, const TaskPayload &payload)
Add a new task to the pending queue.
bool TaskExists(TaskId id) const
Check if task exists in any state.
bool MarkFailed(TaskId id)
Mark a running task as failed (returns to pending for redistribution)
bool MarkDone(TaskId id)
Mark a running task as completed.
bool RemoveTaskFromWorker(const WorkerId &worker, TaskId id)
Remove a specific task ID from a worker (e.g., after ACK but worker later fails)
void EnqueuePending(TaskId id, const TaskPayload &payload)
Push task into pending queue and pending-id index.