ndmspc  v1.2.0-0.1.rc7
NTaskStateManager.h
1 #ifndef N_TASK_STATE_MANAGER_H
2 #define N_TASK_STATE_MANAGER_H
3 
4 #include <queue>
5 #include <set>
6 #include <unordered_map>
7 #include <unordered_set>
8 #include <vector>
9 #include <cstddef>
10 #include <string>
11 #include <utility>
12 
13 namespace Ndmspc {
14 
23  public:
24  using TaskId = size_t;
25  using WorkerId = std::string;
26  using TaskPayload = std::vector<int>; // coordinates
27 
28  NTaskStateManager() = default;
29  ~NTaskStateManager() = default;
30 
36  void AddPending(TaskId id, const TaskPayload & payload);
37 
44  bool AssignToWorker(const WorkerId & worker, TaskId id);
45 
53  bool ClaimNextPendingForWorker(const WorkerId & worker, TaskId & id, TaskPayload & payload);
54 
60  bool MarkDone(TaskId id);
61 
67  bool MarkFailed(TaskId id);
68 
74  bool RequeueTask(TaskId id);
75 
81  std::set<TaskId> GetWorkerTasks(const WorkerId & worker) const;
82 
88  std::pair<TaskId, TaskPayload> GetNextPending();
89 
94  bool HasPending() const;
95 
102  std::vector<std::pair<TaskId, TaskPayload>> RecoverWorkerTasks(const WorkerId & worker);
103 
110  bool RemoveTaskFromWorker(const WorkerId & worker, TaskId id);
111 
117  WorkerId GetTaskWorker(TaskId id) const;
118 
124  bool IsDone(TaskId id) const;
125 
127  size_t PendingCount() const { return fPending.size(); }
128 
130  size_t RunningCount() const { return fRunning.size(); }
131 
133  size_t DoneCount() const { return fDone.size(); }
134 
136  size_t TotalCount() const { return PendingCount() + RunningCount() + DoneCount(); }
137 
139  void Clear();
140 
141  private:
142  // State buckets: tasks flow pending → running → done
143  std::queue<std::pair<TaskId, TaskPayload>> fPending; // Not yet dispatched
144  std::unordered_set<TaskId> fPendingIds;
145  std::unordered_map<TaskId, TaskPayload> fRunning; // Assigned to workers
146  std::set<TaskId> fDone; // Successfully completed
147 
148  // Mappings for efficient lookup
149  std::unordered_map<WorkerId, std::set<TaskId>> fWorkerToTasks; // Current assignments
150  std::unordered_map<TaskId, WorkerId> fTaskToWorker; // Reverse mapping
151  std::unordered_map<TaskId, TaskPayload> fTaskPayloads; // All payloads (for recovery)
152 
154  bool TaskExists(TaskId id) const;
155 
157  void EnqueuePending(TaskId id, const TaskPayload & payload);
158 };
159 
160 } // namespace Ndmspc
161 
162 #endif // N_TASK_STATE_MANAGER_H
Manages task lifecycle: pending → running → done/failed.
std::pair< TaskId, TaskPayload > GetNextPending()
Get the next pending task for dispatch.
void Clear()
Clear all state (for reuse or cleanup)
bool IsDone(TaskId id) const
Check if a task has been completed.
size_t DoneCount() const
Get number of completed tasks.
WorkerId GetTaskWorker(TaskId id) const
Get the worker currently executing a task.
bool AssignToWorker(const WorkerId &worker, TaskId id)
Assign a pending task to a worker (transitions to running)
std::vector< std::pair< TaskId, TaskPayload > > RecoverWorkerTasks(const WorkerId &worker)
Recover all tasks from a failed worker.
bool HasPending() const
Check if there are pending tasks.
bool ClaimNextPendingForWorker(const WorkerId &worker, TaskId &id, TaskPayload &payload)
Atomically pop the next pending task and assign it to a worker.
bool RequeueTask(TaskId id)
Requeue a task to pending state from running or done state.
std::set< TaskId > GetWorkerTasks(const WorkerId &worker) const
Get all tasks currently assigned to a worker.
void AddPending(TaskId id, const TaskPayload &payload)
Add a new task to the pending queue.
size_t RunningCount() const
Get number of running tasks.
bool TaskExists(TaskId id) const
Check if task exists in any state.
bool MarkFailed(TaskId id)
Mark a running task as failed (returns to pending for redistribution)
size_t TotalCount() const
Get total tasks tracked (pending + running + done)
size_t PendingCount() const
Get number of pending tasks.
bool MarkDone(TaskId id)
Mark a running task as completed.
bool RemoveTaskFromWorker(const WorkerId &worker, TaskId id)
Remove a specific task ID from a worker (e.g., after ACK but worker later fails)
void EnqueuePending(TaskId id, const TaskPayload &payload)
Push task into pending queue and pending-id index.